摘要:并發(fā)設(shè)計模式一模式的使用表示線程本地存儲模式。為不同的任務(wù)創(chuàng)建不同的線程池,這樣能夠有效的避免死鎖問題。兩階段終止,即將線程的結(jié)束分為了兩個階段,第一個階段是一個線程向另一個線程發(fā)送終止指令,第二個階段是線程響應(yīng)終止指令。
Java 并發(fā)設(shè)計模式 一、Thread Local Storage 模式 1. ThreadLocal 的使用
Thread Local Storage 表示線程本地存儲模式。
大多數(shù)并發(fā)問題都是由于變量的共享導(dǎo)致的,多個線程同時讀寫同一變量便會出現(xiàn)原子性,可見性等問題。局部變量是線程安全的,本質(zhì)上也是由于各個線程各自擁有自己的變量,避免了變量的共享。
Java 中使用了 ThreadLocal 來實現(xiàn)避免變量共享的方案。ThreadLocal 保證在線程訪問變量時,會創(chuàng)建一個這個變量的副本,這樣每個線程都有自己的變量值,沒有共享,從而避免了線程不安全的問題。
下面是 ThreadLocal 的一個簡單使用示例:
public class ThreadLocalTest { private static final ThreadLocalthreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); public static SimpleDateFormat safeDateFormat() { return threadLocal.get(); } public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask task1 = new FutureTask<>(ThreadLocalTest::safeDateFormat); FutureTask task2 = new FutureTask<>(ThreadLocalTest::safeDateFormat); Thread t1 = new Thread(task1); Thread t2 = new Thread(task2); t1.start(); t2.start(); System.out.println(task1.get() == task2.get());//返回false,表示兩個對象不相等 } }
程序中構(gòu)造了一個線程安全的 SimpleDateFormat ,兩個線程取到的是不同的示例對象,這樣就保證了線程安全。
2. ThreadLocal 原理淺析線程 Thread 類內(nèi)部有兩個 ThreadLocalMap 類型的變量:
/* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; /* * InheritableThreadLocal values pertaining to this thread. This map is * maintained by the InheritableThreadLocal class. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
其中第二個變量的用途是創(chuàng)建可繼承父線程變量的子線程,只不過這并不常用,主要介紹第一個。
ThreadLocalMap 是一個用于存儲 ThreadLocal 的特殊 HashMap,map 中 key 就是 ThreadLocal,value 是線程變量值。只不過這個 map 并不被 ThreadLocal 持有,而是被 Thread 持有。
當(dāng)調(diào)用 ThreadLocal 類中的 set 方法時,就會創(chuàng)建 Thread 中的 threadLocals 屬性。
//ThreadLocal的set方法 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t);//獲取Thread中的ThreadLocalMap if (map != null) map.set(this, value); else createMap(t, value); }
可以看到,最終的 ThreadLocal 對象和變量值并不是創(chuàng)建在 ThreadLocal 內(nèi)部,而是 Thread 中的 ThreadLocalMap,ThreadLocal 在這里只是充當(dāng)了代理的作用。
3. ThreadLocal 內(nèi)存泄漏問題存儲數(shù)據(jù)的 TheadLocalMap 被 Thread 持有,而不是 ThreadLocal,主要的原因便是 ThreadLocal 的生命周期比 Thread 要長,如果 ThreadLocal 對象一直存在,那么 map 中的線程就不能被回收,容易導(dǎo)致內(nèi)存泄漏。
而 Thread 持有 ThreadLocalMap,并且 ThreadLocalMap 對 ThreadLocal 的引用還是弱引用,這樣當(dāng)線程被回收時,map 也能夠被回收,更加安全。
但是 Java 的這種設(shè)計并沒有完全避免內(nèi)存泄漏問題。如果線程池中的線程存活時間過長,那么其持有的 ThreadLocalMap 一直不會被釋放。ThreadLocalMap 中的 Entry 對其 value 是強引用的(對 ThreadLocal 是弱引用),這樣就算 ThreadLocalMap 的生命周期結(jié)束了,但是 value 值并沒有被回收。
解決的辦法便是手動釋放 ThreadLocalMap 中對 value 的強引用,可以使用 TheadLocal 的 remove 方法。在 finally 語句塊中執(zhí)行。例如下面這個簡單的示例:
public class ThreadLocalTest { private final ThreadLocal二、Immutability 模式 1. 不可變的概念threadLocal = new ThreadLocal<>(); public void test(){ //設(shè)置變量值 threadLocal.set(10); try { System.out.println(threadLocal.get()); } finally { //釋放 threadLocal.remove(); } } }
Immutability,即不變模式。可以理解為只要對象一經(jīng)創(chuàng)建,其狀態(tài)是不能夠被改變的,無法進行寫操作。
要實現(xiàn) Immuatability 模式很簡單,將一個類本身及其所有的屬性都設(shè)為 final ,并且方法都是只讀的,需要注意的是,如果類的屬性也是引用類型,那么其對應(yīng)的類也要滿足不可變的特性。final 應(yīng)該都很熟悉了,用它來修飾類和方法,分別表示類不可繼承、屬性不可改變。
Java 中具備不可變性的類型包括:
String
final 修飾的基本數(shù)據(jù)類型
Integer、Long、Double 等基本數(shù)據(jù)類型的包裝類
Collections 中的不可變集合
具備不可變性的類,如果需要有類似修改這樣的功能,那么它不會像普通的對象一樣改變自己的屬性,而是創(chuàng)建新的對象。
下面是 String 的字符串連接方法 concat() 的源碼,仔細觀察,可以看到最后方法返回的時候,創(chuàng)建了一個新的 Sring 對象:
public String concat(String str) { int otherLen = str.length(); if (otherLen == 0) { return this; } int len = value.length; char buf[] = Arrays.copyOf(value, len + otherLen); str.getChars(buf, len); //創(chuàng)建新的對象 return new String(buf, true); }
而 Collections 工具可以將集合變?yōu)椴豢勺兊模耆箤憽⑿薷牡炔僮鳌J纠缦拢?/p>
//Collections 中構(gòu)建不可變集合的方法 Collections.unmodifiableList(); Collections.unmodifiableSet(); Collections.unmodifiableMap(); Collections.unmodifiableSortedSet(); Collections.unmodifiableSortedMap(); --- List2. 對象池list = Arrays.asList(1, 2, 3, 4, 5); //構(gòu)建不可變集合 List unmodifiableList = Collections.unmodifiableList(list); unmodifiableList.remove(1);//拋出異常
對于一個不可變性的類,如果頻繁的對其進行修改操作,那么一直會創(chuàng)建性新的對象,這樣就比較浪費內(nèi)存空間了,一種解決辦法便是利用對象池。
原理也很簡單,新建對象的時候,去對象池看是否存在對象,如果存在則直接利用,如果不存在才會創(chuàng)建新的對象,創(chuàng)建之后再將對象放到對象池中。
以長整型的包裝類 Long 為例,它緩存了 -128 到 127 的數(shù)據(jù),如果創(chuàng)建的是這個區(qū)間的對象,那么會直接使用緩存中的對象。例如 Long 中的 valueOf 方法就用到了這個緩存,然后直接返回:
public static Long valueOf(long l) { final int offset = 128; //在這個區(qū)間則直接使用緩存中的對象 if (l >= -128 && l <= 127) { // will cache return LongCache.cache[(int)l + offset]; } return new Long(l); }三、Guarded Suspension 模式 1. Guarded Suspension 實現(xiàn)
Guarded Suspension 意為保護性暫停。一個典型的使用場景是:當(dāng)客戶端線程 T 發(fā)送請求后,服務(wù)端這時有大量的請求需要處理,這時候就需要排隊,線程 T 進入等待狀態(tài),直到服務(wù)端處理完請求并且返回結(jié)果。
Guarded Suspension 的實現(xiàn)很簡單,有一個對象 GuardedObject,其內(nèi)部有一個屬性,即被保護的對象,還有兩個方法,客戶端調(diào)用 get() 方法,如果未獲取到結(jié)果,則進入等待狀態(tài),即“保護性暫停”;還有一個 notice() 通知方法,當(dāng)服務(wù)端處理完請求后,調(diào)用這個方法,并且喚醒等待中的線程。示意圖如下:
示例代碼如下:
public class GuardedObject{ private T obj; private final Lock lock = new ReentrantLock(); private final Condition finished = lock.newCondition(); //調(diào)用方線程獲取結(jié)果 T get(){ lock.lock(); try { while (未獲取到結(jié)果){ finished.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return obj; } //執(zhí)行完后通知 void notice(T obj){ lock.lock(); try { this.obj = obj; finished.signalAll(); } finally { lock.unlock(); } } }
從代碼中可以看到,Guarded Suspension 模式本質(zhì)上就是一種等待-通知機制,只不過使用這種模式,在解決實際的問題的時候,需要根據(jù)情況進行程序功能的擴展。
2. 使用示例還是上面提到的那個例子,當(dāng)客戶端發(fā)送請求后,需要等待服務(wù)端的響應(yīng)結(jié)果,這時候就可以使用 Guarded Suspension 來實現(xiàn),下面是代碼示例:
public class SendRequest{ //相當(dāng)于消息隊列 private final BlockingQueue queue = new ArrayBlockingQueue<>(5); //客戶端發(fā)送請求 void send(Request request) throws InterruptedException { //將消息存放至隊列中 queue.put(request); //創(chuàng)建Guarded Suspension模式的對象 GuardedObject guardedObject = GuardedObject.create(request.id); //循環(huán)等待,獲取結(jié)果 Request res = guardedObject.get(Objects::nonNull); } //服務(wù)端處理請求 void handle() throws InterruptedException { //從隊列中獲取請求 Request request = queue.take(); //調(diào)用請求對應(yīng)的GuardedObject,并處理請求 GuardedObject.handleRequest(request.id, request); } //請求類 private static class Request{ private int id; private String content; } }
需要注意的是,這里并不是直接使用 new GuardedObject() 的方式來創(chuàng)建對象,這是因為需要找到每個請求和對象之間的對應(yīng)關(guān)系,所以 GuardedObject 內(nèi)部使用了一個 map 來保存對象,key 是對應(yīng)的請求 id。
GuardedObject 類代碼如下:
public class GuardedObject四、Balking 模式{ private T obj; private final Lock lock = new ReentrantLock(); private final Condition finished = lock.newCondition(); private static final ConcurrentHashMap map = new ConcurrentHashMap<>(); //創(chuàng)建對象 public static GuardedObject create(int id){ GuardedObject guardedObject = new GuardedObject(); //保存對象和請求的對應(yīng)關(guān)系 map.put(id, guardedObject); return guardedObject; } //處理請求 public static void handleRequest(int id, Object obj){ GuardedObject guardedObject = map.remove(id); if (guardedObject != null){ //具體的處理邏輯省略 //處理完后通知 guardedObject.notice(obj); } } //調(diào)用方線程獲取結(jié)果 T get(Predicate p){ lock.lock(); try { while (!p.test(obj)){ finished.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return obj; } //執(zhí)行完后通知 void notice(T obj){ lock.lock(); try { this.obj = obj; finished.signalAll(); } finally { lock.unlock(); } } }
Balking 模式的典型應(yīng)用場景是,業(yè)務(wù)邏輯依賴于某個條件變量的狀態(tài),因此這種模式又可以理解為多線程版本的 if。
public class BalkingTest { private boolean flag = false; public void execute(){ if (!flag){ return; } //具體的執(zhí)行操作省略 flag = false; } public void test(){ //省略業(yè)務(wù)代碼若干 flag = true; } }
例如上面這個例子,一段業(yè)務(wù)邏輯會改變 flag 的值,另一個方法會根據(jù) flag 的值來決定是否繼續(xù)執(zhí)行。
這個程序并不是線程安全的,解決的辦法也很簡單,就是加互斥鎖,然后可以將改變 flag 值的邏輯多帶帶拿出來,如下:
public class BalkingTest { private boolean flag = false; public synchronized void execute(){ if (!flag){ return; } //具體的執(zhí)行操作省略 flag = false; } public void test(){ //省略業(yè)務(wù)代碼若干 change(); } public synchronized void change(){ flag = true; } }
Balking 模式一般可以使用互斥鎖來實現(xiàn),并且可以將對條件變量的改變的邏輯和業(yè)務(wù)邏輯進行分離,這樣能夠減小鎖的粒度,提升性能。Balking 模式大多應(yīng)用于需要快速失敗的場景,即當(dāng)條件變量不滿足,則直接失敗。這也是它和 Guarded Suspension 模式的區(qū)別,因為 Guarded Suspension 模式在條件不滿足的時候,會一直等待條件滿足。
五、Worker - Thread 模式Worker Thread 模式,對應(yīng)到現(xiàn)實世界,類似工廠中的工人做任務(wù),當(dāng)有任務(wù)的時候,工人取出任務(wù)執(zhí)行。
解決的辦法是使用線程池,并且使用一個阻塞隊列來存儲任務(wù),線程池中的線程從隊列中取出任務(wù)執(zhí)行。線程池的使用需要注意幾點:
任務(wù)隊列盡量使用有界隊列,避免任務(wù)過多造成 OOM。
應(yīng)該明確指定拒絕策略,可以根據(jù)實際情況實現(xiàn) RejectedExecutionHandler 接口自定義拒絕策略。
應(yīng)該給線程指定一個有意義的名字,最好和業(yè)務(wù)相關(guān)。
為不同的任務(wù)創(chuàng)建不同的線程池,這樣能夠有效的避免死鎖問題。
六、Two - Phase Termination 模式 1. 兩階段終止概念Two - Phase Termination,即兩階段終止,主要是為解決如何正確的終止一個線程,這里說的是一個線程終止另一個線程,而不是線程終止自己。
Java 中的線程提供了一個 stop() 方法用來終止線程,這不過這個方法會直接將線程殺死,風(fēng)險太高,并且這個方法已經(jīng)被標(biāo)記為廢棄,不建議使用了。
兩階段終止,即將線程的結(jié)束分為了兩個階段,第一個階段是一個線程 T1 向另一個線程 T2 發(fā)送終止指令,第二個階段是線程 T2 響應(yīng)終止指令。
根據(jù) Java 的線程狀態(tài),線程如果要進入 TERMINATED 狀態(tài)則必須先進入 RUNNABLE 狀態(tài),而處于 RUNNABLE 狀態(tài)的線程有可能轉(zhuǎn)換到休眠狀態(tài)。
Java 的線程提供了 interrupt() 方法,這個方法的作用便是將線程的狀態(tài)從休眠狀態(tài)轉(zhuǎn)換到 RUNNABLE 狀態(tài)。
切換到 RUNNABLE 狀態(tài)之后,線程有兩種方式可以終止,一是執(zhí)行完 run() 方法,自動進入終止?fàn)顟B(tài);二是設(shè)置一個標(biāo)志,線程如果檢測到這個標(biāo)志,則退出 run() 方法,這就是兩階段終止的響應(yīng)終止指令。
2. 程序示例下面是一個簡單的使用 interrupt() 方法和中斷標(biāo)志位來終止線程的示例:
public class Test { public static void main(String[] args) { Thread thread = new Thread(() -> { //檢測到中斷則退出 while (!Thread.currentThread().isInterrupted()){ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); //重新設(shè)置中斷標(biāo)志 Thread.currentThread().interrupt(); } System.out.println("I am roseduan"); } }); thread.start(); thread.interrupt(); } }
程序要每隔三秒打印語句,但是線程啟動之后就直接調(diào)用了 interrupt() 方法,所以線程直接退出了。需要注意的是這里在捕獲異常之后,需要重新設(shè)置線程的中斷狀態(tài),因為 JVM 的異常處理會清除線程的中斷狀態(tài)。
在實際的生產(chǎn)中,并不推薦使用這種方式,因為在 Thread 內(nèi)部可能會調(diào)用其他的方法,而其他的方法并不能夠保證正確的處理了線程中斷,解決的辦法便是自定義一個線程的中斷標(biāo)志,如下所示:
public class Test { //自定義中斷標(biāo)志 private volatile boolean isTerminated = false; private Thread thread; public synchronized void start(){ thread = new Thread(() -> { //檢測到中斷則退出 while (!isTerminated) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); //重新設(shè)置中斷狀態(tài) Thread.currentThread().interrupt(); } System.out.println("I am roseduan"); } isTerminated = false; }); thread.start(); } //線程終止方法 public synchronized void stop(){ isTerminated = true; thread.interrupt(); } }3. 終止線程池
Java 中并不太會顯式的創(chuàng)建和終止一個線程,使用更多的是線程池。
Java 中的線程池提供了兩個方法來終止,分別是 shutdown() 和 shutdownNow() ,兩個方法的區(qū)別如下:
shutdown():拒絕新的任務(wù),等待正在執(zhí)行的和已經(jīng)在阻塞隊列中的任務(wù)執(zhí)行完后,再關(guān)閉線程池
shutdownNow():直接關(guān)閉線程池,拒絕新的任務(wù),并且中斷正在執(zhí)行的任務(wù),已經(jīng)在阻塞隊列中的任務(wù)也不會被執(zhí)行了。
七、Producer - Consumer 模式這是較為常用的生產(chǎn)者 - 消費者模式,Java 中的線程池就使用了這種模式,線程的使用方是生產(chǎn)者,提供任務(wù),線程池本身是消費者,取出并執(zhí)行任務(wù)。
生產(chǎn)者 - 消費者模式使用了一個任務(wù)隊列,生產(chǎn)者將任務(wù)添加到隊列中,消費者從隊列中取出任務(wù)執(zhí)行。
這樣的設(shè)計的目的有三個:
解耦,生產(chǎn)者和消費者之間沒有直接的關(guān)聯(lián),而是通過隊列進行通信。
其次可以實現(xiàn)異步,例如生產(chǎn)者可以不用管消費者的行為,直接將任務(wù)添加到隊列中。消費者也可以不在乎生產(chǎn)者,直接從隊列中取任務(wù)。
最后,可以平衡生產(chǎn)者和消費者之間的速度差異。
下面是一個簡單的生產(chǎn)者 - 消費者程序示例:
public class ProducerConsumerTest { private BlockingQueuequeue = new LinkedBlockingQueue<>(100); public void produce() { queue.add(new Task()); } public void consume() { Task task = queue.poll(); while (task != null){ task.execute(); task = queue.poll(); } System.out.println("沒有任務(wù)了"); } public static void main(String[] args) throws InterruptedException { Test test = new Test(); //生產(chǎn)者線程,創(chuàng)建10個任務(wù) Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { test.produce(); } }); producer.start(); producer.join(); //消費者線程 Thread consumer = new Thread(test::consume); consumer.start(); } } class Task{ public void execute(){ System.out.println("執(zhí)行任務(wù)"); } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74809.html
摘要:表示的是兩個,當(dāng)其中任意一個計算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實現(xiàn)機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計 分布式,高可用,和機器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:在本例中,講述的無鎖來自于并發(fā)包我們將這個無鎖的稱為。在這里,我們使用二維數(shù)組來表示的內(nèi)部存儲,如下變量存放所有的內(nèi)部元素。為什么使用二維數(shù)組去實現(xiàn)一個一維的呢這是為了將來進行動態(tài)擴展時可以更加方便。 我們已經(jīng)比較完整得介紹了有關(guān)無鎖的概念和使用方法。相對于有鎖的方法,使用無鎖的方式編程更加考驗一個程序員的耐心和智力。但是,無鎖帶來的好處也是顯而易見的,第一,在高并發(fā)的情況下,它比有鎖...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
閱讀 2186·2020-06-12 14:26
閱讀 2489·2019-08-29 16:41
閱讀 1890·2019-08-29 15:28
閱讀 2458·2019-08-26 13:43
閱讀 757·2019-08-26 13:37
閱讀 2779·2019-08-23 18:13
閱讀 2801·2019-08-23 15:31
閱讀 1020·2019-08-23 14:10