摘要:每個(gè)消息都會(huì)被一個(gè)線程消費(fèi),同時(shí)最大并發(fā)量為。然后提交一個(gè)任務(wù)到線程池中,這個(gè)任務(wù)的內(nèi)容是從等待隊(duì)列中取出一個(gè),如果等待隊(duì)列為空,則刪除這個(gè)等待隊(duì)列的。小結(jié)本文分析了的久經(jīng)生產(chǎn)考驗(yàn)的核心組件線程池。
本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog...前言
在ZStack中,最基本的執(zhí)行單位不僅僅是一個(gè)函數(shù),也可以是一個(gè)任務(wù)(Task。其本質(zhì)實(shí)現(xiàn)了Java的Callable接口)。通過大小合理的線程池調(diào)度來(lái)并行的消費(fèi)這些任務(wù),使ZStack這個(gè)Iaas軟件有條不紊運(yùn)行在大型的數(shù)據(jù)中心里。
對(duì)線程池不太了解的同學(xué)可以先看我的一篇博客:Java多線程筆記(三):線程池演示代碼
在這里,將以ZStack中ThreadFacade最常用的方法為例進(jìn)行演示。
syncSubmit提交同步任務(wù),線程將會(huì)等結(jié)果完成后才繼續(xù)下一個(gè)任務(wù)。
這里先參考ZStack中ApiMediatorImpl ,其中有一段用于API消息調(diào)度的邏輯。
@Override public void handleMessage(final Message msg) { thdf.syncSubmit(new SyncTask
每個(gè)API消息都會(huì)被一個(gè)線程消費(fèi),同時(shí)最大并發(fā)量為5(apiWorkerNum=5)。每個(gè)線程都會(huì)等著API消息的回復(fù),等到回復(fù)后便給用戶。
chainSubmit提交異步任務(wù),這里的任務(wù)執(zhí)行后將會(huì)執(zhí)行隊(duì)列中的下一個(gè)任務(wù),不會(huì)等待結(jié)果。
參考VmInstanceBase關(guān)于虛擬機(jī)啟動(dòng)、重啟、暫停相關(guān)的代碼:
//暫停虛擬機(jī) protected void handle(final APIStopVmInstanceMsg msg) { thdf.chainSubmit(new ChainTask(msg) { @Override public String getName() { return String.format("stop-vm-%s", self.getUuid()); } @Override public String getSyncSignature() { return syncThreadName; } @Override public void run(SyncTaskChain chain) { stopVm(msg, chain); } }); } //重啟虛擬機(jī) protected void handle(final APIRebootVmInstanceMsg msg) { thdf.chainSubmit(new ChainTask(msg) { @Override public String getName() { return String.format("reboot-vm-%s", self.getUuid()); } @Override public String getSyncSignature() { return syncThreadName; } @Override public void run(SyncTaskChain chain) { rebootVm(msg, chain); } }); } //啟動(dòng)虛擬機(jī) protected void handle(final APIStartVmInstanceMsg msg) { thdf.chainSubmit(new ChainTask(msg) { @Override public String getName() { return String.format("start-vm-%s", self.getUuid()); } @Override public String getSyncSignature() { return syncThreadName; } @Override public void run(SyncTaskChain chain) { startVm(msg, chain); } }); }通用特性
getSyncSignature則指定了其隊(duì)列的key,這個(gè)任務(wù)隊(duì)列本質(zhì)一個(gè)Map。根據(jù)相同的k,將任務(wù)作為v按照順序放入map執(zhí)行。單從這里的業(yè)務(wù)邏輯來(lái)看,可以有效避免虛擬機(jī)的狀態(tài)混亂。
chainTask的默認(rèn)并發(fā)度為1,這意味著它是同步的。在稍后的源碼解析中我們將會(huì)看到。它的實(shí)現(xiàn)
先從接口ThreadFacade了解一下方法簽名:
public interface ThreadFacade extends Component {Future submit(Task task);//提交一個(gè)任務(wù) Future syncSubmit(SyncTask task); //提交一個(gè)有返回值的任務(wù) Future chainSubmit(ChainTask task); //提交一個(gè)沒有返回值的任務(wù) Future submitPeriodicTask(PeriodicTask task, long delay); //提交一個(gè)周期性任務(wù),將在一定時(shí)間后執(zhí)行 Future submitPeriodicTask(PeriodicTask task); //提交一個(gè)周期性任務(wù) Future submitCancelablePeriodicTask(CancelablePeriodicTask task); //提交一個(gè)可以取消的周期性任務(wù) Future submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay); //提交一個(gè)可以取消的周期性任務(wù),將在一定時(shí)間后執(zhí)行 void registerHook(ThreadAroundHook hook); //注冊(cè)鉤子 void unregisterHook(ThreadAroundHook hook); //取消鉤子 ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay); //提交一個(gè)過了一定時(shí)間就算超時(shí)的任務(wù) void submitTimerTask(TimerTask task, TimeUnit unit, long delay); //提交一個(gè)timer任務(wù) }
以及幾個(gè)方法邏輯實(shí)現(xiàn)類DispatchQueueImpl中的幾個(gè)成員變量。
private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class); @Autowired ThreadFacade _threadFacade; private final HashMapsyncTasks = new HashMap (); private final HashMap chainTasks = new HashMap (); private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class); public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";
關(guān)鍵就是syncTasks(同步隊(duì)列)和chainTasks(異步隊(duì)列) ,用于存儲(chǔ)兩種類型的任務(wù)隊(duì)列。
因此當(dāng)我們提交chainTask時(shí),要注意記得顯示的調(diào)用next方法,避免后面的任務(wù)調(diào)度不到。
接著,我們從最常用的幾個(gè)方法開始看它的代碼。
chainSubmit方法從ThreadFacadeImpl作為入口
@Override public FuturechainSubmit(ChainTask task) { return dpq.chainSubmit(task); }
DispatchQueue中的邏輯
//公有方法,即入口之一 @Override public FuturechainSubmit(ChainTask task) { return doChainSyncSubmit(task); }
//內(nèi)部邏輯 privateFuture doChainSyncSubmit(final ChainTask task) { assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???"; DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least ")); synchronized (chainTasks) { final String signature = task.getSyncSignature(); ChainTaskQueueWrapper wrapper = chainTasks.get(signature); if (wrapper == null) { wrapper = new ChainTaskQueueWrapper(); chainTasks.put(signature, wrapper); } ChainFuture cf = new ChainFuture(task); wrapper.addTask(cf); wrapper.startThreadIfNeeded(); return cf; } }
這段邏輯大致為:
斷言syncSignature不為空,并且必須并行度必須大于等于1。因?yàn)?會(huì)被做成隊(duì)列,由一個(gè)線程完成這些任務(wù)。而1以上則指定了可以有幾個(gè)線程來(lái)完成同一個(gè)signature的任務(wù)。
加鎖HashMap
接下來(lái)就是startThreadIfNeeded。所謂ifNeeded就是指給這個(gè)隊(duì)列的線程數(shù)尚有空余。然后提交一個(gè)任務(wù)到線程池中,這個(gè)任務(wù)的內(nèi)容是:從等待隊(duì)列中取出一個(gè)Feture,如果等待隊(duì)列為空,則刪除這個(gè)等待隊(duì)列的Map。
private class ChainTaskQueueWrapper { LinkedList pendingQueue = new LinkedList(); final LinkedList runningQueue = new LinkedList(); AtomicInteger counter = new AtomicInteger(0); int maxThreadNum = -1; String syncSignature; void addTask(ChainFuture task) { pendingQueue.offer(task); if (maxThreadNum == -1) { maxThreadNum = task.getSyncLevel(); } if (syncSignature == null) { syncSignature = task.getSyncSignature(); } } void startThreadIfNeeded() { //如果運(yùn)行線程數(shù)量已經(jīng)大于等于限制,不start if (counter.get() >= maxThreadNum) { return; } counter.incrementAndGet(); _threadFacade.submit(new TasksyncSubmit方法() { @Override public String getName() { return "sync-chain-thread"; } // start a new thread every time to avoid stack overflow @AsyncThread private void runQueue() { ChainFuture cf; synchronized (chainTasks) { // remove from pending queue and add to running queue later cf = (ChainFuture) pendingQueue.poll(); if (cf == null) { if (counter.decrementAndGet() == 0) { //并且線程只有一個(gè)(跑完就沒了),則將相關(guān)的signature隊(duì)列移除,避免占用內(nèi)存 chainTasks.remove(syncSignature); } //如果為空,則沒有任務(wù),返回 return; } } synchronized (runningQueue) { // add to running queue runningQueue.offer(cf); } //完成以后將任務(wù)挪出運(yùn)行隊(duì)列 cf.run(new SyncTaskChain() { @Override public void next() { synchronized (runningQueue) { runningQueue.remove(cf); } runQueue(); } }); } //這個(gè)方法將會(huì)被線程池調(diào)用,作為入口 @Override public Void call() throws Exception { runQueue(); return null; } }); } }
syncSubmit的內(nèi)部邏輯與我們之前分析的chainSubmit極為相似,只是放入了不同的隊(duì)列中。
同樣,也是從ThreadFacadeImpl作為入口
@Override publicFuture syncSubmit(SyncTask task) { return dpq.syncSubmit(task); }
然后是DispatchQueue中的實(shí)現(xiàn)
@Override publicFuture syncSubmit(SyncTask task) { if (task.getSyncLevel() <= 0) { return _threadFacade.submit(task); } else { return doSyncSubmit(task); } }
內(nèi)部邏輯-私有方法
privatesubmitPeriodicTaskFuture doSyncSubmit(final SyncTask syncTask) { assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???"; SyncTaskFuture f; synchronized (syncTasks) { SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature()); if (wrapper == null) { wrapper = new SyncTaskQueueWrapper(); //放入syncTasks隊(duì)列。 syncTasks.put(syncTask.getSyncSignature(), wrapper); } f = new SyncTaskFuture(syncTask); wrapper.addTask(f); wrapper.startThreadIfNeeded(); } return f; }
提交一個(gè)定時(shí)任務(wù)本質(zhì)上是通過了線程池的scheduleAtFixedRate來(lái)實(shí)現(xiàn)。這個(gè)方法用于對(duì)任務(wù)進(jìn)行周期性調(diào)度,任務(wù)調(diào)度的頻率是一定的,它以上一個(gè)任務(wù)開始執(zhí)行時(shí)間為起點(diǎn),之后的period時(shí)間后調(diào)度下一次任務(wù)。如果任務(wù)的執(zhí)行時(shí)間大于調(diào)度時(shí)間,那么任務(wù)就會(huì)在上一個(gè)任務(wù)結(jié)束后,立即被調(diào)用。
調(diào)用這個(gè)方法時(shí)將會(huì)把任務(wù)放入定時(shí)任務(wù)隊(duì)列。當(dāng)任務(wù)出現(xiàn)異常時(shí),將會(huì)取消這個(gè)Futrue,并且挪出隊(duì)列。
public FuturesubmitCancelablePeriodicTasksubmitPeriodicTask(final PeriodicTask task, long delay) { assert task.getInterval() != 0; assert task.getTimeUnit() != null; ScheduledFuture ret = (ScheduledFuture ) _pool.scheduleAtFixedRate(new Runnable() { public void run() { try { task.run(); } catch (Throwable e) { _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e); final Map > periodicTasks = getPeriodicTasks(); final ScheduledFuture> ft = periodicTasks.get(task); if (ft != null) { ft.cancel(true); periodicTasks.remove(task); } else { _logger.warn("Not found feature for task " + task.getName() + ", the exception happened too soon, will try to cancel the task next time the exception happens"); } } } }, delay, task.getInterval(), task.getTimeUnit()); _periodicTasks.put(task, ret); return ret; }
而submitCancelablePeriodicTask則是會(huì)在執(zhí)行時(shí)檢測(cè)ScheduledFuture是否被要求cancel,如果有要求則取消。
@Override public Future初始化操作submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) { ScheduledFuture ret = (ScheduledFuture ) _pool.scheduleAtFixedRate(new Runnable() { private void cancelTask() { ScheduledFuture> ft = cancelablePeriodicTasks.get(task); if (ft != null) { ft.cancel(true); cancelablePeriodicTasks.remove(task); } else { _logger.warn("cannot find feature for task " + task.getName() + ", the exception happened too soon, will try to cancel the task next time the exception happens"); } } public void run() { try { boolean cancel = task.run(); if (cancel) { cancelTask(); } } catch (Throwable e) { _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e); cancelTask(); } } }, delay, task.getInterval(), task.getTimeUnit()); cancelablePeriodicTasks.put(task, ret); return ret; }
不同與通常的ZStack組件,它雖然實(shí)現(xiàn)了Component接口。但是其start中的邏輯并不全面,初始化邏輯是基于spring bean的生命周期來(lái)做的。見ThreadFacade。
再讓回頭看看ThreadFacadeImpl的init與destory操作。
//init 操作 public void init() { //根據(jù)全局配置讀入線程池最大線程數(shù)量 totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM; if (totalThreadNum < 10) { _logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM)); totalThreadNum = 10; } // 構(gòu)建一個(gè)支持延時(shí)任務(wù)的線程池 _pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this); _logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum)); //構(gòu)建一個(gè)DispatchQueue dpq = new DispatchQueueImpl(); jmxf.registerBean("ThreadFacade", this); }
//destory public void destroy() { _pool.shutdownNow(); }
看了這里可能大家會(huì)有疑問,這種關(guān)閉方式未免關(guān)于暴力(執(zhí)行任務(wù)的線程會(huì)全部被中斷)。在此之前,我們?cè)岬竭^,它實(shí)現(xiàn)了Component接口。這個(gè)接口分別有一個(gè)start和stop方法,使一個(gè)組件的生命周期能夠方便的在ZStack中注冊(cè)相應(yīng)的鉤子。
//stop 方法 @Override public boolean stop() { _pool.shutdown(); timerPool.stop(); return true; }線程工廠
ThreadFacadeImpl同時(shí)也實(shí)現(xiàn)了ThreadFactory,可以讓線程在創(chuàng)建時(shí)做一些操作。
@Override public Thread newThread(Runnable arg0) { return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement())); }
在這里可以看到ZStack為每一個(gè)新的線程賦予了一個(gè)名字。
線程池ZStack對(duì)JDK中的線程池進(jìn)行了一定的擴(kuò)展,對(duì)一個(gè)任務(wù)執(zhí)行前后都有相應(yīng)的鉤子函數(shù),同時(shí)也開放注冊(cè)鉤子。
package org.zstack.core.thread; import org.apache.logging.log4j.ThreadContext; import org.zstack.utils.logging.CLogger; import org.zstack.utils.logging.CLoggerImpl; import java.util.ArrayList; import java.util.List; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor { private static final CLogger _logger =CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class); List_hooks = new ArrayList (8); public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, threadFactory, handler); this.setMaximumPoolSize(corePoolSize); } public void registerHook(ThreadAroundHook hook) { synchronized (_hooks) { _hooks.add(hook); } } public void unregisterHook(ThreadAroundHook hook) { synchronized (_hooks) { _hooks.remove(hook); } } @Override protected void beforeExecute(Thread t, Runnable r) { ThreadContext.clearMap(); ThreadContext.clearStack(); ThreadAroundHook debugHook = null; List tmpHooks; synchronized (_hooks) { tmpHooks = new ArrayList (_hooks); } for (ThreadAroundHook hook : tmpHooks) { debugHook = hook; try { hook.beforeExecute(t, r); } catch (Exception e) { _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e); } } } @Override protected void afterExecute(Runnable r, Throwable t) { ThreadContext.clearMap(); ThreadContext.clearStack(); ThreadAroundHook debugHook = null; List tmpHooks; synchronized (_hooks) { tmpHooks = new ArrayList (_hooks); } for (ThreadAroundHook hook : tmpHooks) { debugHook = hook; try { hook.afterExecute(r, t); } catch (Exception e) { _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e); } } } }
另外,ScheduledThreadPoolExecutorExt是繼承自ScheduledThreadPoolExecutor。本質(zhì)上是一個(gè)任務(wù)調(diào)度線程池,用的工作隊(duì)列也是一個(gè)延時(shí)工作隊(duì)列。
小結(jié)本文分析了ZStack的久經(jīng)生產(chǎn)考驗(yàn)的核心組件——線程池。通過線程池,使并行編程變得不再那么復(fù)雜。
當(dāng)然,其中也有一些可以改進(jìn)的地方:
一些加鎖的地方(synchronized),可以通過使用并發(fā)容器解決。這樣可以有效提升吞吐量,節(jié)省因?yàn)楦?jìng)爭(zhēng)鎖而導(dǎo)致的開銷。
在提交大量任務(wù)的情況下,HashMap會(huì)因?yàn)閿U(kuò)容而導(dǎo)致性能耗損。可以考慮換一種Map或在不同的策略下使HashMap的初始大小有個(gè)較為合理的設(shè)置。
隊(duì)列是無(wú)界的。在大量任務(wù)請(qǐng)求時(shí),會(huì)對(duì)內(nèi)存造成極大的負(fù)擔(dān)。
任務(wù)隊(duì)列無(wú)超時(shí)邏輯判斷。ZStack中的調(diào)用絕大多數(shù)都是由MQ完成,每一個(gè)msg有著對(duì)應(yīng)的超時(shí)時(shí)間。但是每一個(gè)任務(wù)卻沒有超時(shí)判定,這意味著一個(gè)任務(wù)執(zhí)行時(shí)間過長(zhǎng)時(shí),后面的任務(wù)有可能進(jìn)入了超時(shí)狀態(tài),而卻沒有挪出隊(duì)列,配合之前提到的無(wú)界隊(duì)列,就是一場(chǎng)潛在的災(zāi)難。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/70783.html
摘要:本文首發(fā)于泊浮目的專欄在語(yǔ)言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來(lái)自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會(huì)影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語(yǔ)言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...
摘要:本文首發(fā)于泊浮目的專欄在語(yǔ)言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來(lái)自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會(huì)影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語(yǔ)言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...
摘要:因?yàn)檫@個(gè)狀態(tài)下,是交給一個(gè)線程在執(zhí)行的,見源碼剖析之核心庫(kù)鑒賞中的分析。并且允許等行為。上面提到過,允許運(yùn)行暫停取消等行為。維護(hù)和相應(yīng)的之間的關(guān)系。則停止執(zhí)行并觸發(fā)之前的所有。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,當(dāng)用戶在UI上發(fā)起操作時(shí),前端會(huì)調(diào)用后端的API對(duì)實(shí)際的資源發(fā)起操作請(qǐng)求。但在一個(gè)分布式系統(tǒng)中...
摘要:但在實(shí)際的二次開發(fā)中,這些做法未必能夠完全滿足需求。在源碼剖析之核心庫(kù)鑒賞一文中,我們了解到是的基礎(chǔ)設(shè)施之一,同時(shí)也允許通過顯示聲明的方式來(lái)聲明。同理,一些也可以使用繼承進(jìn)行擴(kuò)展。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系統(tǒng)中,官方提出了幾個(gè)較為經(jīng)典的擴(kuò)展方式。但在實(shí)際的二次開發(fā)中,這些做法未必...
摘要:下面將開始分析它的源碼。僅僅定義了一個(gè)最小應(yīng)有的行為。更好的選擇由于該庫(kù)是為定制而生,故此有一些防御性判斷,源碼顯得略為。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack(或者說產(chǎn)品化的IaaS軟件)中的任務(wù)通常有很長(zhǎng)的執(zhí)行路徑,錯(cuò)誤可能發(fā)生在路徑的任意一處。為了保證系統(tǒng)的正確性,需提供一種較為完善的回滾機(jī)制——在ZSt...
閱讀 1452·2019-08-29 17:14
閱讀 1653·2019-08-29 12:12
閱讀 733·2019-08-29 11:33
閱讀 3270·2019-08-28 18:27
閱讀 1446·2019-08-26 10:19
閱讀 910·2019-08-23 18:18
閱讀 3532·2019-08-23 16:15
閱讀 2545·2019-08-23 14:14