国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

【源起Netty 外傳】ScheduledThreadPoolExecutor源碼解讀

Eastboat / 1300人閱讀

引言

本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來越遠。于是乎,我決定:繼續(xù)保持……

使用

首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋)

</>復制代碼

  1. import java.time.LocalTime;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.ScheduledExecutorService;
  4. import java.util.concurrent.ScheduledFuture;
  5. import java.util.concurrent.TimeUnit;
  6. public class ScheduleExecutorServiceDemo {
  7. private final static ScheduledExecutorService scheduler =
  8. Executors.newScheduledThreadPool(5);
  9. public static void main(String args[]){
  10. final Runnable beeper = new Runnable() {
  11. public void run() {
  12. System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
  13. //TODO 沉睡吧,少年
  14. //try {
  15. // TimeUnit.SECONDS.sleep(3L);
  16. //} catch (InterruptedException e) {
  17. // e.printStackTrace();
  18. //}
  19. }
  20. };
  21. //從0s開始輸出beep,間隔1s
  22. final ScheduledFuture beeperHandle =
  23. scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS);
  24. //10s之后停止beeperHandle的瘋狂輸出行為
  25. scheduler.schedule(new Runnable() {
  26. public void run() {
  27. System.out.println("覺悟吧,beeperHandle!I will kill you!");
  28. beeperHandle.cancel(true);
  29. }
  30. }, 10, TimeUnit.SECONDS);
  31. }
  32. }

scheduleAtFixedRate也是該類常用的打開方式之一,網(wǎng)上很多文章會拿該方法與scheduleWithFixedDelay進行對比,對比結果其實和方法名一致:

</>復制代碼

  1. scheduleAtFixedRate //以固定頻率執(zhí)行
  2. scheduleWithFixedDelay //延遲方式執(zhí)行,間隔時間=間隔時間入?yún)?任務執(zhí)行時間

ScheduleExecutorService實則是Timer的進化版,主要改進了Timer單線程方面的弊端,改進方式自然是線程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor華麗麗登場。其實ScheduledThreadPoolExecutor才是主角,ScheduleExecutorService扮演的是拋磚引玉中的磚……

先看下ScheduledThreadPoolExecutor類的江湖地位:

既然繼承自ThreadPoolExecutor,確乃線程池無疑。

疑問

本文以如下方法作為切入點:
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

方法入?yún)?b>period(譯:周期)就是scheduleAtFixedRate所指的固定頻率嗎?
這個問題很好驗證,把示例中這部分代碼的注釋去掉就能得到答案。

</>復制代碼

  1. final Runnable beeper = new Runnable() {
  2. public void run() {
  3. System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
  4. //TODO 沉睡吧,少年
  5. //try {
  6. // TimeUnit.SECONDS.sleep(3L);
  7. //} catch (InterruptedException e) {
  8. // e.printStackTrace();
  9. //}
  10. }
  11. };

答案就是,如果方法執(zhí)行時間大于間隔周期period,則任務的下次執(zhí)行時間將超過period的設定!

執(zhí)行結果如下,可以看出任務間隔為3s,而不是period設置的1s

不禁好奇,ScheduleExecutorService是怎么實現(xiàn)的多長時間之后執(zhí)行下一個任務?有句話叫源碼之下無秘密,so..let"s do this !

源碼分析 1.初始化

從ScheduleExecutorService的初始化開始:

</>復制代碼

  1. private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

追隨調(diào)用鏈Executors.newScheduledThreadPool -> new ScheduledThreadPoolExecutor(corePoolSize),進入如下方法:

</>復制代碼

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); //注意最后一個參數(shù)
  3. }

線程池中的任務隊列用的new DelayedWorkQueue(),而DelayedWorkQueue是ScheduledThreadPoolExecutor的內(nèi)部類
初始化部分關注到這一點即可,之后會是一些成員變量的賦值,不作解釋。

2.任務封裝

接下來從scheduleAtFixedRate方法開始,進入它的實現(xiàn)方法:

</>復制代碼

  1. public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
  2. if (command == null || unit == null)
  3. throw new NullPointerException();
  4. if (period <= 0)
  5. throw new IllegalArgumentException();
  6. ScheduledFutureTask sft = new ScheduledFutureTask(command,
  7. null,
  8. triggerTime(initialDelay, unit),
  9. unit.toNanos(period));
  10. RunnableScheduledFuture t = decorateTask(command, sft);
  11. sft.outerTask = t;
  12. delayedExecute(t);
  13. return t;
  14. }

Runnable command被封裝成了ScheduledFutureTask類,無獨有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另外一個內(nèi)部類。看下它的類關系圖:

有沒有發(fā)現(xiàn)ScheduledFutureTask實現(xiàn)了Comparable接口?眾所周知這個接口是以某種規(guī)則用來比較大小的,這里的規(guī)則就是任務的開始執(zhí)行時間——ScheduledFutureTask的一個屬性:

</>復制代碼

  1. /** The time the task is enabled to execute in nanoTime units */
  2. private long time;

compareTo方法就是明證:

</>復制代碼

  1. public int compareTo(Delayed other) {
  2. if (other == this) // compare zero if same object
  3. return 0;
  4. if (other instanceof ScheduledFutureTask) {
  5. ScheduledFutureTask x = (ScheduledFutureTask)other;
  6. long diff = time - x.time; //focus這里啊喂!!!
  7. if (diff < 0)
  8. return -1;
  9. else if (diff > 0)
  10. return 1;
  11. else if (sequenceNumber < x.sequenceNumber)
  12. return -1;
  13. else
  14. return 1;
  15. }
  16. long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  17. return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  18. }

一般來說,這些比較(compare)放在集合中才有意義,那ScheduledFutureTask之后會放在哪個集合中嗎?有些朋友可能已經(jīng)猜到了,沒錯,ScheduledFutureTask后續(xù)會置于前文提到的DelayedWorkQueue中。

3.延時執(zhí)行

繼續(xù)ScheduledThreadPoolExecutor.scheduleAtFixedRate方法:

</>復制代碼

  1. ScheduledFutureTask sft = new ScheduledFutureTask(command,
  2. null,
  3. triggerTime(initialDelay, unit),
  4. unit.toNanos(period));
  5. RunnableScheduledFuture t = decorateTask(command, sft);
  6. sft.outerTask = t;
  7. delayedExecute(t); //醒醒,該你出場了

進入delayedExecute方法:

</>復制代碼

  1. private void delayedExecute(RunnableScheduledFuture task) {
  2. if (isShutdown())
  3. reject(task);
  4. else {
  5. super.getQueue().add(task); //代碼一 - 任務加入DelayedWorkQueue
  6. if (isShutdown() &&
  7. !canRunInCurrentRunState(task.isPeriodic()) &&
  8. remove(task))
  9. task.cancel(false);
  10. else
  11. ensurePrestart(); //代碼二 - 任務開始
  12. }
  13. }

追蹤 代碼一 位置的調(diào)用鏈:
-> DelayedWorkQueue.add -> offer -> siftUp(int k, RunnableScheduledFuture key)

</>復制代碼

  1. private void siftUp(int k, RunnableScheduledFuture key) {
  2. while (k > 0) {
  3. int parent = (k - 1) >>> 1;
  4. RunnableScheduledFuture e = queue[parent];
  5. if (key.compareTo(e) >= 0)
  6. break;
  7. queue[k] = e;
  8. setIndex(e, k);
  9. k = parent;
  10. }
  11. queue[k] = key;
  12. setIndex(key, k);
  13. }

可以看到,siftUp方法實現(xiàn)了向DelayedWorkQueue添加任務時(add),開始時間靠后的任務(ScheduledFutureTask)會放在后面

ok,回到 代碼二 位置的ensurePrestart方法,接著追:
ensurePrestart -> addWorker(Runnable firstTask, boolean core)

濃縮版addWorker方法如下:

</>復制代碼

  1. private boolean addWorker(Runnable firstTask, boolean core){
  2. ... //省略很多的驗證邏輯
  3. boolean workerStarted = false;
  4. boolean workerAdded = false;
  5. Worker w = null;
  6. try{
  7. w = new Worker(firstTask); //代碼三 - 封裝成worker,new Worker會從線程池中獲取線程
  8. final Thread t = w.thread;
  9. if (t != null){
  10. final ReentrantLock mainLock = this.mainLock;
  11. mainLock.lock();
  12. ... //省略部分狀態(tài)控制邏輯
  13. if (workerAdded){
  14. t.start(); //代碼四 - 執(zhí)行Worker的run方法
  15. workerStarted = true;
  16. }
  17. }
  18. }finally {
  19. if (! workerStarted)
  20. addWorkerFailed(w);
  21. }
  22. return workerStarted;
  23. }

這里發(fā)現(xiàn)firstTask(ScheduledFutureTask)再次被封裝成了Worker(代碼三),那么t.start()(代碼四),自然會執(zhí)行Worker的run方法,跟下Worker.run方法:Worker.run -> runWorker(Worker w)

濃縮后的runWorker

</>復制代碼

  1. final void runWorker(Worker w){
  2. ... //省略部分代碼
  3. try{
  4. while (task != null || (task = getTask()) != null){ //代碼五 - getTask()獲取任務
  5. ... //省略部分代碼
  6. task.run(); //代碼六 - 任務執(zhí)行
  7. ... //省略部分代碼
  8. }
  9. completedAbruptly = false;
  10. }finally{
  11. processWorkerExit(w, completedAbruptly);
  12. }
  13. }

老規(guī)矩,五、六兩處關鍵代碼分別看一下:

代碼五 getTask最終定位到DelayedWorkQueue.take方法,這里只分析延時任務的執(zhí)行情況

</>復制代碼

  1. public RunnableScheduledFuture take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. for (;;) {
  6. RunnableScheduledFuture first = queue[0];
  7. if (first == null)
  8. available.await();
  9. else {
  10. long delay = first.getDelay(NANOSECONDS);
  11. if (delay <= 0)
  12. return finishPoll(first);
  13. first = null; // don"t retain ref while waiting
  14. if (leader != null) //代碼八 - leader線程就是下一次的工作線程
  15. available.await();
  16. else {
  17. Thread thisThread = Thread.currentThread(); //代碼七 - 指定leader線程
  18. leader = thisThread;
  19. try {
  20. available.awaitNanos(delay); //等待
  21. } finally {
  22. if (leader == thisThread)
  23. leader = null;
  24. }
  25. }
  26. }
  27. }
  28. } finally {
  29. if (leader == null && queue[0] != null)
  30. available.signal();
  31. lock.unlock();
  32. }
  33. }

對于延時任務來說,線程池中第一個調(diào)用take的線程進來會作為leader線程(代碼七),然后等待。結束等待的位置在哪?在ScheduledFutureTask.run的調(diào)用中!(我作斷點調(diào)試的時候,這個等待時間總是很大,一般兩個小時以上,似乎直接用await就成?這一點確有疑問)。
而線程池中的其它線程調(diào)用take時,發(fā)現(xiàn)leader已經(jīng)被第一個線程搶了,只能等著(代碼八)

回到 代碼六 位置,task.run()也就是ScheduledFutureTask.run

</>復制代碼

  1. public void run() {
  2. boolean periodic = isPeriodic();
  3. if (!canRunInCurrentRunState(periodic))
  4. cancel(false);
  5. else if (!periodic)
  6. ScheduledFutureTask.super.run();
  7. else if (ScheduledFutureTask.super.runAndReset()) { //對于延時任務,會進入這個分支
  8. setNextRunTime();
  9. reExecutePeriodic(outerTask);
  10. }
  11. }

對于延時任務,會執(zhí)行ScheduledFutureTask.super.runAndReset()

</>復制代碼

  1. protected boolean runAndReset() {
  2. if (state != NEW ||
  3. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4. null, Thread.currentThread()))
  5. return false;
  6. boolean ran = false;
  7. int s = state;
  8. try {
  9. Callable c = callable;
  10. if (c != null && s == NEW) {
  11. try {
  12. //代碼九 - 阻塞式等待beeper完成
  13. c.call(); // don"t set result
  14. ran = true;
  15. } catch (Throwable ex) {
  16. setException(ex);
  17. }
  18. }
  19. } finally {
  20. // runner must be non-null until state is settled to
  21. // prevent concurrent calls to run()
  22. runner = null;
  23. // state must be re-read after nulling runner to prevent
  24. // leaked interrupts
  25. s = state;
  26. if (s >= INTERRUPTING)
  27. handlePossibleCancellationInterrupt(s);
  28. }
  29. return ran && s == NEW;
  30. }

runAndReset方法會等待最初定義的beeper邏輯執(zhí)行完成(代碼九),這也解釋了為什么scheduleAtFixedRate的下次任務執(zhí)行時間會有可能超過參數(shù)period的設定!

然后調(diào)用reExecutePeriodic

</>復制代碼

  1. void reExecutePeriodic(RunnableScheduledFuture task) {
  2. if (canRunInCurrentRunState(true)) {
  3. super.getQueue().add(task); //隊列中再次加入任務
  4. if (!canRunInCurrentRunState(true) && remove(task))
  5. task.cancel(false);
  6. else
  7. ensurePrestart(); //再次回到ensurePrestart方法
  8. }
  9. }

reExecutePeriodic方法看上去是不是似曾相識,與本小節(jié)(3.延時執(zhí)行)開端的delayedExecute方法對比下:

</>復制代碼

  1. private void delayedExecute(RunnableScheduledFuture task) {
  2. if (isShutdown())
  3. reject(task);
  4. else {
  5. super.getQueue().add(task); //任務加入DelayedWorkQueue
  6. if (isShutdown() &&
  7. !canRunInCurrentRunState(task.isPeriodic()) &&
  8. remove(task))
  9. task.cancel(false);
  10. else
  11. ensurePrestart(); //任務開始
  12. }
  13. }

都是加入隊列,然后任務開始!

DelayedWorkQueue.add中到底做了什么?之前沒有分析,在這里看一下:DelayedWorkQueue.add -> offer

</>復制代碼

  1. public boolean offer(Runnable x) {
  2. if (x == null)
  3. throw new NullPointerException();
  4. RunnableScheduledFuture e = (RunnableScheduledFuture)x;
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. try {
  8. int i = size;
  9. if (i >= queue.length)
  10. grow();
  11. size = i + 1;
  12. if (i == 0) {
  13. queue[0] = e;
  14. setIndex(e, 0);
  15. } else {
  16. siftUp(i, e);
  17. }
  18. if (queue[0] == e) {
  19. leader = null; //將leader賦值清除
  20. available.signal(); //代碼十 - 通知線程
  21. }
  22. } finally {
  23. lock.unlock();
  24. }
  25. return true;
  26. }

可以看到,就是在offer方法(代碼十),將DelayedWorkQueue.take中的available.awaitNanos(delay)喚醒了!

總結

是不是已經(jīng)繞暈了?很正常,因為源碼終歸是需要自己去讀個幾遍才能理清整個脈絡。所以老鐵們,加油!

最后的總結還是不能缺少的,一個定時任務的執(zhí)行流程是這樣的:

1.任務開始時,將任務ScheduledFutureTask放入隊列DelayedWorkQueue。任務放入過程會計算該任務的開始執(zhí)行時間,執(zhí)行時間靠前的放入隊列的前端,執(zhí)行時間靠后的放入隊列的后端。

2.之后的ensurePrestart方法,先從線程池中獲取線程,該線程會從隊列DelayedWorkQueue中獲取ScheduledFutureTask

獲取過程DelayedWorkQueue.take先計算任務的延時時間delay ,有兩種情況:

delay<=0 已不需要延時,立即獲取任務

delay>0 需要延時,出現(xiàn)如下局面:

第一個進入的線程成為leader

其它線程等待

</>復制代碼

  1. long delay = first.getDelay(NANOSECONDS); //計算延時時間delay
  2. //已不需要延時,立即獲取任務
  3. if (delay <= 0)
  4. return finishPoll(first);
  5. first = null; // don"t retain ref while waiting
  6. //需要延時的任務(與此同時有任務正在執(zhí)行)
  7. if (leader != null) //其它線程進來時,有l(wèi)eader線程存在了,等待
  8. available.await();
  9. else {
  10. Thread thisThread = Thread.currentThread(); //第一個進入這里的線程會成為leader
  11. leader = thisThread;
  12. try {
  13. available.awaitNanos(delay); //等待
  14. } finally {
  15. if (leader == thisThread)
  16. leader = null;
  17. }
  18. }

3.獲取任務后,進入執(zhí)行環(huán)節(jié)Worker.run -> ScheduledFutureTask.run。執(zhí)行過程會阻塞式等待任務完成,這也是任務執(zhí)行時間可能會超過period的原因!任務執(zhí)行結束會再次放入任務,這樣又回到步驟1,反復執(zhí)行。

感謝

分析Java延遲與周期任務的實現(xiàn)原理描述

文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76305.html

相關文章

  • 源起Netty 外傳ScheduledThreadPoolExecutor源碼解讀

    引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來越遠。于是乎,我決定:繼續(xù)保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    funnyZhang 評論0 收藏0
  • 源起Netty 外傳ScheduledThreadPoolExecutor源碼解讀

    引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經(jīng)在偏離主題的道路上越來越遠。于是乎,我決定:繼續(xù)保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    Martin91 評論0 收藏0
  • 源起Netty 外傳】FastThreadLocal怎么Fast?

    摘要:實現(xiàn)原理淺談幫助理解的示意圖中有一屬性,類型是的靜態(tài)內(nèi)部類。剛剛說過,是一個中的靜態(tài)內(nèi)部類,則是的內(nèi)部節(jié)點。這個會在線程中,作為其屬性初始是一個數(shù)組的索引,達成與類似的效果。的方法被調(diào)用時,會根據(jù)記錄的槽位信息進行大掃除。 概述 FastThreadLocal的類名本身就充滿了對ThreadLocal的挑釁,快男FastThreadLocal是怎么快的?源碼中類注釋坦白如下: /** ...

    gxyz 評論0 收藏0
  • 源起Netty 外傳】ServiceLoader詳解

    摘要:答曰摸索直譯為服務加載器,最終目的是獲取的實現(xiàn)類。代碼走起首先,要有一個接口形狀接口介紹然后,要有該接口的實現(xiàn)類。期具體實現(xiàn)依靠的內(nèi)部類,感性趣的朋友可以自己看一下。總結重點在于可跨越包獲取,這一點筆者通過多模塊項目親測延時加載特性 前戲 netty源碼注釋有云: ... If a provider class has been installed in a jar file tha...

    MoAir 評論0 收藏0
  • 源起Netty 外傳】System.getPropert()詳解

    摘要:閱讀源碼時,發(fā)現(xiàn)很多,理所當然會想翻閱資料后,該技能,姿勢如下環(huán)境中的全部屬性全部屬性注意如果將本行代碼放在自定義屬性之后,會不會打出把自定義屬性也給獲取到可以結論會獲取目前環(huán)境中全部的屬性值,無論系統(tǒng)提供還是個人定義系統(tǒng)提供屬性代碼中定義 閱讀源碼時,發(fā)現(xiàn)很多System.getProperty(xxx),理所當然會想:whats fucking this? 翻閱資料后,Get該技能...

    lixiang 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<