摘要:叫做回環是因為當所有等待線程都被釋放以后,可以被重用。我們暫且把這個狀態就叫做,當調用方法之后,線程就處于了。
CountDownLatch
CountDownLatch 類位于 java.util.concurrent 包下,利用它可以實現類似計數器的功能。比如有一個任務A,它要等待其他4個任務執行完畢之后才能執行,此時就可以利用CountDownLatch來實現這種功能了。
CountDownLatch類只提供了一個構造器:
public CountDownLatch(int count) { }; //參數count為計數值
然后下面這3個方法是CountDownLatch類中最重要的方法:
public void await() throws InterruptedException { }; //調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行 public void countDown() { }; //將count值減1
代碼實現
package sychronized; import static net.mindview.util.Print.*; import java.util.concurrent.*; class Task implements Runnable{ private static int count = 0; private final int id = count++; final CountDownLatch latch ; public Task(CountDownLatch latch){ this.latch = latch; } @Override public void run(){ try { print(this+"正在執行"); TimeUnit.MILLISECONDS.sleep(3000); print(this+"執行完畢"); latch.countDown(); } catch (InterruptedException e) { print(this + " 被中斷"); } } @Override public String toString() { return "Task-"+id; } } public class Test { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Task(latch)); exec.execute(new Task(latch)); try { print("等待2個子線程執行完畢..."); long start = System.currentTimeMillis(); latch.await(); long end = System.currentTimeMillis(); print("2個子線程已經執行完畢 "+(end - start)); print("繼續執行主線程"); }catch (InterruptedException e){ print("主線程被中斷"); } exec.shutdown(); } } #輸出結果: 等待2個子線程執行完畢... Task-0正在執行 Task-1正在執行 Task-0執行完畢 Task-1執行完畢 2個子線程已經執行完畢 3049 繼續執行主線程CyclicBarrier
字面意思回環柵欄,通過它可以實現讓一組線程等待至某個狀態之后再全部同時執行。叫做回環是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。我們暫且把這個狀態就叫做barrier,當調用await()方法之后,線程就處于barrier了。
CyclicBarrier類位于java.util.concurrent包下,CyclicBarrier提供2個構造器:
參數parties指讓多少個線程或者任務等待至barrier狀態
參數barrierAction為當這些線程都達到barrier狀態時會執行的內容
public CyclicBarrier(int parties, Runnable barrierAction) {} public CyclicBarrier(int parties) {}
然后CyclicBarrier中最重要的方法就是 await 方法,它有2個重載版本:
第一個版本比較常用,用來掛起當前線程,直至所有線程都到達barrier狀態再同時執行后續任務;
第二個版本是讓這些線程等待至一定的時間,如果還有線程沒有到達barrier狀態就直接讓到達barrier的線程執行后續任務。
public int await() throws InterruptedException, BrokenBarrierException { }; public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
代碼展示
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class WriteTask implements Runnable{ private static int count = 0; private final int id = count++; private CyclicBarrier barrier ; private static Random random = new Random(47); public WriteTask(CyclicBarrier cyclicBarrier) { this.barrier = cyclicBarrier; } @Override public void run() { print(this+"開始寫入數據..."); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(5000)); //以睡眠來模擬寫入數據操作 print(this+"寫入數據完畢,等待其他線程寫入完畢"+" "+System.currentTimeMillis()); barrier.await(); } catch (InterruptedException e) { print(this + "is interrupted!"); }catch(BrokenBarrierException e){ throw new RuntimeException(e); } print("所有任務寫入完畢,繼續處理其他任務... "+System.currentTimeMillis()); } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < N; ++i){ exec.execute(new WriteTask(barrier)); } exec.shutdown(); } } #輸出結果: WriteTask-3 開始寫入數據... WriteTask-2 開始寫入數據... WriteTask-1 開始寫入數據... WriteTask-0 開始寫入數據... WriteTask-2 寫入數據完畢,等待其他線程寫入完畢 1512048648904 WriteTask-1 寫入數據完畢,等待其他線程寫入完畢 1512048650042 WriteTask-0 寫入數據完畢,等待其他線程寫入完畢 1512048650209 WriteTask-3 寫入數據完畢,等待其他線程寫入完畢 1512048652606 所有任務寫入完畢,繼續處理其他任務... 1512048652607 所有任務寫入完畢,繼續處理其他任務... 1512048652607 所有任務寫入完畢,繼續處理其他任務... 1512048652607 所有任務寫入完畢,繼續處理其他任務... 1512048652607
**
如果說想在所有線程寫入操作完之后,進行額外的其他操作可以為CyclicBarrier提供Runnable參數:
**
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class WriteTask implements Runnable{ private static int count = 0; private final int id = count++; private CyclicBarrier barrier ; private static Random random = new Random(47); public WriteTask(CyclicBarrier cyclicBarrier) { this.barrier = cyclicBarrier; } @Override public void run() { print(this+" 開始寫入數據..."); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(5000)); //以睡眠來模擬寫入數據操作 print(this+" 寫入數據完畢,等待其他線程寫入完畢"+" "+System.currentTimeMillis()); barrier.await(); TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { print(this + "is interrupted!"); }catch(BrokenBarrierException e){ throw new RuntimeException(e); } print("所有任務寫入完畢,繼續處理其他任務... "+System.currentTimeMillis()+Thread.currentThread()); } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() { @Override public void run() { print(Thread.currentThread()); } }); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < N; ++i){ exec.execute(new WriteTask(barrier)); } exec.shutdown(); } } #輸出結果為: WriteTask-3 開始寫入數據... WriteTask-1 開始寫入數據... WriteTask-2 開始寫入數據... WriteTask-0 開始寫入數據... WriteTask-1 寫入數據完畢,等待其他線程寫入完畢 1512049061954 WriteTask-2 寫入數據完畢,等待其他線程寫入完畢 1512049063092 WriteTask-0 寫入數據完畢,等待其他線程寫入完畢 1512049063261 WriteTask-3 寫入數據完畢,等待其他線程寫入完畢 1512049065657 Thread[pool-1-thread-4,5,main] 所有任務寫入完畢,繼續處理其他任務... 1512049065668Thread[pool-1-thread-2,5,main] 所有任務寫入完畢,繼續處理其他任務... 1512049065668Thread[pool-1-thread-1,5,main] 所有任務寫入完畢,繼續處理其他任務... 1512049065668Thread[pool-1-thread-4,5,main] 所有任務寫入完畢,繼續處理其他任務... 1512049065668Thread[pool-1-thread-3,5,main]
從結果可以看出,當四個線程都到達barrier狀態后,會從四個線程中選擇一個線程去執行Runnable。
另外CyclicBarrier是可以重用的,看下面這個例子:
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class WriteTask implements Runnable{ private static int count = 0; private final int id = count++; private CyclicBarrier barrier ; private static Random random = new Random(47); public WriteTask(CyclicBarrier cyclicBarrier) { this.barrier = cyclicBarrier; } @Override public void run() { while (!Thread.interrupted()){ print(this+" 開始寫入數據..."); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(5000)); //以睡眠來模擬寫入數據操作 print(this+" 寫入數據完畢,等待其他線程寫入完畢"+" "+System.currentTimeMillis()); barrier.await(); TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { print(this + "is interrupted!"); }catch(BrokenBarrierException e){ throw new RuntimeException(e); } print("所有任務寫入完畢,繼續處理其他任務... "+System.currentTimeMillis()); } } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } class CyclicBarrierManager implements Runnable{ private CyclicBarrier barrier ; private ExecutorService exec; public CyclicBarrierManager(CyclicBarrier barrier, ExecutorService exec,int N){ this.barrier = barrier ; this.exec = exec; for (int i = 0; i < N-1; ++i){ exec.execute(new WriteTask(barrier)); } } @Override public void run(){ while (!Thread.interrupted()){ try { barrier.await(); }catch (InterruptedException e){ print(getClass().getSimpleName()+" 被中斷了!"); }catch (BrokenBarrierException e){ throw new RuntimeException(e); } } } } public class CyclicBarrierTest { public static void main(String[] args) throws Exception{ int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new CyclicBarrierManager(barrier,exec,N)); exec.shutdown(); } } #輸出結果: WriteTask-1 開始寫入數據... WriteTask-2 開始寫入數據... WriteTask-0 開始寫入數據... WriteTask-2 寫入數據完畢,等待其他線程寫入完畢 1512051484365 WriteTask-0 寫入數據完畢,等待其他線程寫入完畢 1512051485503 WriteTask-1 寫入數據完畢,等待其他線程寫入完畢 1512051488068 所有任務寫入完畢,繼續處理其他任務... 1512051488078 所有任務寫入完畢,繼續處理其他任務... 1512051488078 WriteTask-2 開始寫入數據... 所有任務寫入完畢,繼續處理其他任務... 1512051488078 WriteTask-1 開始寫入數據... WriteTask-0 開始寫入數據... WriteTask-0 寫入數據完畢,等待其他線程寫入完畢 1512051488513 WriteTask-1 寫入數據完畢,等待其他線程寫入完畢 1512051489045 WriteTask-2 寫入數據完畢,等待其他線程寫入完畢 1512051489945 所有任務寫入完畢,繼續處理其他任務... 1512051489955 WriteTask-0 開始寫入數據... 所有任務寫入完畢,繼續處理其他任務... 1512051489955 所有任務寫入完畢,繼續處理其他任務... 1512051489955 WriteTask-2 開始寫入數據... WriteTask-1 開始寫入數據... WriteTask-2 寫入數據完畢,等待其他線程寫入完畢 1512051490155 WriteTask-1 寫入數據完畢,等待其他線程寫入完畢 1512051494477 WriteTask-0 寫入數據完畢,等待其他線程寫入完畢 1512051494823 所有任務寫入完畢,繼續處理其他任務... 1512051494833 所有任務寫入完畢,繼續處理其他任務... 1512051494833 WriteTask-0 開始寫入數據... 所有任務寫入完畢,繼續處理其他任務... 1512051494833 WriteTask-1 開始寫入數據... WriteTask-2 開始寫入數據... WriteTask-2 寫入數據完畢,等待其他線程寫入完畢 1512051494961 WriteTask-0 寫入數據完畢,等待其他線程寫入完畢 1512051496040 WriteTask-1 寫入數據完畢,等待其他線程寫入完畢 1512051498121 所有任務寫入完畢,繼續處理其他任務... 1512051498132 所有任務寫入完畢,繼續處理其他任務... 1512051498132 WriteTask-1 開始寫入數據... 所有任務寫入完畢,繼續處理其他任務... 1512051498132Semaphore
Semaphore翻譯成字面意思為 信號量,Semaphore 可以同時讓多個線程同時訪問共享資源,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
Semaphore類位于java.util.concurrent包下,它提供了2個構造器:
public Semaphore(int permits) { //參數permits表示許可數目,即同時可以允許多少線程進行訪問 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //這個多了一個參數fair表示是否是公平的,即等待時間越久的越先獲取許可 sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()、release()方法:
public void acquire() throws InterruptedException { } //獲取一個許可 public void acquire(int permits) throws InterruptedException { } //獲取permits個許可 public void release() {} //釋放一個許可 public void release(int permits) {} //釋放permits個許可
acquire()用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。
release()用來釋放許可。
注意,在釋放許可之前,必須先獲獲得許可。
這4個方法都會被阻塞,如果想立即得到執行結果,可以使用下面幾個方法:
public boolean tryAcquire() { }; //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false
package sychronized; import java.util.Random; import java.util.concurrent.*; import static net.mindview.util.Print.*; class Worker implements Runnable{ private static int count = 0; private final int id = count++; private int finished = 0; private Random random = new Random(47); private Semaphore semaphore; public Worker(Semaphore semaphore){ this.semaphore = semaphore; } @Override public void run(){ try { while (!Thread.interrupted()){ semaphore.acquire(); print(this+" 占用一個機器在生產... "); TimeUnit.MILLISECONDS.sleep(random.nextInt(2000)); synchronized (this){ print(" 已經生產了"+(++finished)+"個產品,"+"釋放出機器"); } semaphore.release(); } } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return getClass().getSimpleName()+"-"+id; } } public class SemaphoreTest { public static void main(String[] args) { int N = 8; //工人數 Semaphore semaphore = new Semaphore(5); //機器數目 ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < N; ++i){ exec.execute(new Worker(semaphore)); } exec.shutdown(); } }總結
CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同:
CountDownLatch 一般用于某個線程A等待若干個其他線程執行完任務之后,它才執行;
CyclicBarrier 一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行;
CountDownLatch 是不能夠重用的,而 CyclicBarrier 是可以重用的。
Semaphore 其實和鎖有點類似,它一般用于控制對 某組 資源的訪問權限,而鎖是控制對 某個 資源的訪問權限。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70668.html
摘要:前言之前學多線程的時候沒有學習線程的同步工具類輔助類。而其它線程完成自己的操作后,調用使計數器減。信號量控制一組線程同時執行。 前言 之前學多線程的時候沒有學習線程的同步工具類(輔助類)。ps:當時覺得暫時用不上,認為是挺高深的知識點就沒去管了.. 在前幾天,朋友發了一篇比較好的Semaphore文章過來,然后在瀏覽博客的時候又發現面試還會考,那還是挺重要的知識點。于是花了點時間去了解...
摘要:所有示例代碼請見下載于基本概念并發同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內存這些線程是同時存在的,每個線程都處于執行過程中的某個狀態,如果運行在多核處理器上此時,程序中的每個線程都 所有示例代碼,請見/下載于 https://github.com/Wasabi1234... showImg(https://upload-images.jians...
摘要:倒計時鎖,線程中調用使進程進入阻塞狀態,當達成指定次數后通過繼續執行每個線程中剩余的內容。實現分階段的的功能測試代碼拿客網站群三產創建于年月日。 同步器 為每種特定的同步問題提供了解決方案 Semaphore Semaphore【信號標;旗語】,通過計數器控制對共享資源的訪問。 測試類: package concurrent; import concurrent.th...
摘要:對于,我們僅僅需要關心兩個方法,一個是方法,另一個是方法。首先,我們來看方法,它代表線程阻塞,等待的值減為。首先,的源碼實現和大相徑庭,基于的共享模式的使用,而基于來實現。 前言 本文先用 CountDownLatch 將共享模式說清楚,然后順著把其他 AQS 相關的類 CyclicBarrier、Semaphore 的源碼一起過一下。 CountDownLatch CountDown...
閱讀 3465·2021-09-08 09:36
閱讀 2556·2019-08-30 15:54
閱讀 2355·2019-08-30 15:54
閱讀 1768·2019-08-30 15:44
閱讀 2391·2019-08-26 14:04
閱讀 2444·2019-08-26 14:01
閱讀 2880·2019-08-26 13:58
閱讀 1330·2019-08-26 13:47