摘要:因為這個狀態下,是交給一個線程在執行的,見源碼剖析之核心庫鑒賞中的分析。并且允許等行為。上面提到過,允許運行暫停取消等行為。維護和相應的之間的關系。則停止執行并觸發之前的所有。
本文首發于泊浮目的專欄:https://segmentfault.com/blog...前言
在ZStack中,當用戶在UI上發起操作時,前端會調用后端的API對實際的資源發起操作請求。但在一個分布式系統中,我們不能假設網絡是可靠的(同樣要面對的還有單點故障等)——這往往導致API會超時。ZStack有默認的API超時機制,為30mins。但從UI上看來,用戶的體驗不是很好,如下:
如果API遇到什么情況而一直沒有響應,在這里用戶也只能默默等到其超時。因為這個狀態下,API是交給一個線程在執行的,見ZStack源碼剖析之核心庫鑒賞——ThreadFacade中的分析
。最可怕的是,由于隊列的存在,對該資源操作的API將全部處于隊列中而成為等待狀態。
在ZStack 2.3版本開始引入了一個新的概念——LongJob。這基于ZStack的原有設計——FlowChain(我在我的博客中詳細分析過FlowChain,如果不懂的小伙伴可以點這里),依靠FlowChain,我們把業務邏輯拆成一個個個Flow,并設置對應的RollBack。為了避免之后講起來有點迷,先解釋一下技術名詞。
LongJob的狀態是用于被APIQuery的,也提供了進度條。并且允許start、stop、cancel等行為。
名詞 LongJob長任務。以API可操作的概念具現。上面提到過,允許運行、暫停、取消等行為。
LongJobInstance長任務實例。每個作業執行時,都會生成一個實例,實例會存放在LongJobVO這個數據庫表中。便于UI調用API查看各個LongJobInstance的狀態。
Flow最小的一個業務單元。LongJob的組成,前面說過,LongJob基于FlowChain。
LongJob ParametersLongJob參數。用于提交LongJob的參數,不同的參數可以區分不同的Job。
數據結構 LongJobVO@Entity @Table public class LongJobVO extends ResourceVO { @Column private String name; @Column private String description; @Column private String apiId; @Column private String jobName; @Column private String jobData; @Column private String jobResult; @Column @Enumerated(EnumType.STRING) private LongJobState state; @Column private String targetResourceUuid; @Column @ForeignKey(parentEntityClass = ManagementNodeVO.class, onDeleteAction = ForeignKey.ReferenceOption.SET_NULL) private String managementNodeUuid; @Column private Timestamp createDate; @Column private Timestamp lastOpDate; //忽略get set方法 }
該數據結構描述了如下關鍵信息:
targeResourceUuid - 用以描述 job 針對的資源,對于分類查找比較有用。通過 resourceUuid 可以在 ResourceVO 里找到類型。
apiId - 用以查詢該 job 在 TaskProgressVO 中的進度信息。
jobName - 執行該 job 的 class 名字。參見下面的 JobExecution (類似現有的 AbstractSchedulerJob)
jobData - 存放執行該 job 需要的額外參數信息。
LongJobpublic interface LongJob { void start(LongJobVO job, Completion completion); void cancel(LongJobVO job, Completion completion); }
所有LongJob都必須實現該接口,并實現start/cancel等方法。
LongJobFor@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface LongJobFor { Class> value(); }
為具體的LongJob增加該注解,表示該LongJob針對哪個APIMessage。
比如為BackupStorageMigrateImageJob增加注解:@LongJobFor(APIBackupStorageMigrateImageMsg.class)
LongJobDatainterface LongJobData { }
由于LongJob要復用現有邏輯以及保證可維護性,這里處理的代碼和原先邏輯為同一處。handleApiMessage和handleLongJobMessage必須要將所有的參數抽出來再傳到共用的邏輯層。不僅如此,之后定時任務也有可能做成LongJob,故此定義這個接口。
LongJobMessageDatapublic class LongJobMessageData implements LongJobData { protected final NeedReplyMessage needReplyMessage; public LongJobMessageData(NeedReplyMessage msg){ this.needReplyMessage = msg; } public NeedReplyMessage getNeedReplyMessage() { return needReplyMessage; } }
該接口實現了LongJobData(這里LongJobData僅僅用于標識一個類型),用于完成目前的需求——部分LongJob Feature來自于APIMessage的改進。而InnerMessage和APIMessage都繼承于NeedReplyMessage,為加強代碼可讀性,將公用數據結構抽取了出來,方便調用。
LongJobFactory根據jobName獲取LongJob實例。
比如當jobName為APIBackupStorageMigrateImageMsg時,獲取BackupStorageMigrateImageJob實例。
LongJobManager用以處理 Job 相關的 API,比如 APICancelJobMsg,APIRestartJobMsg 等等。維護 jobUuid 和相應的 CancellableSharedFlowChain 之間的關系。
CancellableShareFlowChain繼承 ShareFlowChain,實現 Cancellable。每個 Job 底層邏輯都必須用 CancellableSharedFlowChain 實現。
詳解 LongJob相關的API
在圖中我們可以看到LongJob提供了幾個API,較為重要的是QueryAPI——用戶可以使用它來查詢LongJob的一個進度狀態。
LongJob則是基于FlowChain之上擴展的,首先,每個LongJob調用與原有APIMessage行為相同的內部Message。我們以APIAddImageMsg為例,看一下它的邏輯。
在這里,我們可以看到Msg們將其的參數都Copy到了相應的LongJobData中,并進行傳參,進入了一個統一的入口。這樣便于邏輯的維護,免于和原有的API handle處分為兩段邏輯。
再看調用實例那么是如何調用的呢?按照老規矩,我們來看一個TestCase——AddImageLongJobCase:
void testAddImage() { int oldSize = Q.New(ImageVO.class).list().size() int flag = 0 myDescription = "my-test" env.afterSimulator(SftpBackupStorageConstant.DOWNLOAD_IMAGE_PATH) { Object response -> //DownloadImageMsg LongJobVO vo = Q.New(LongJobVO.class).eq(LongJobVO_.description, myDescription).find() assert vo.state == LongJobState.Running flag += 1 return response } APIAddImageMsg msg = new APIAddImageMsg() msg.setName("TinyLinux") msg.setBackupStorageUuids(Collections.singletonList(bs.uuid)) msg.setUrl("http://192.168.1.20/share/images/tinylinux.qcow2") msg.setFormat(ImageConstant.QCOW2_FORMAT_STRING) msg.setMediaType(ImageConstant.ImageMediaType.RootVolumeTemplate.toString()) msg.setPlatform(ImagePlatform.Linux.toString()) LongJobInventory jobInv = submitLongJob { sessionId = adminSession() jobName = "APIAddImageMsg" jobData = gson.toJson(msg) description = myDescription } as LongJobInventory assert jobInv.getJobName() == "APIAddImageMsg" assert jobInv.state == org.zstack.sdk.LongJobState.Running retryInSecs() { LongJobVO job = dbFindByUuid(jobInv.getUuid(), LongJobVO.class) assert job.state == LongJobState.Succeeded } int newSize = Q.New(ImageVO.class).count().intValue() assert newSize > oldSize assert 1 == flag }
可以看到本質是將原來的APIMsg轉為字符串作為LongJob的Data傳入,調用起來很方便。
實現再來看看它的實現,當APISubmitLongJobMsg被發送出去后,handle的地方做了什么呢?見LongJobManagerImpl
private void handle(APISubmitLongJobMsg msg) { // create LongJobVO LongJobVO vo = new LongJobVO(); if (msg.getResourceUuid() != null) { vo.setUuid(msg.getResourceUuid()); } else { vo.setUuid(Platform.getUuid()); } if (msg.getName() != null) { vo.setName(msg.getName()); } else { vo.setName(msg.getJobName()); } vo.setDescription(msg.getDescription()); vo.setApiId(msg.getId()); vo.setJobName(msg.getJobName()); vo.setJobData(msg.getJobData()); vo.setState(LongJobState.Waiting); vo.setTargetResourceUuid(msg.getTargetResourceUuid()); vo.setManagementNodeUuid(Platform.getManagementServerId()); vo = dbf.persistAndRefresh(vo); logger.info(String.format("new longjob [uuid:%s, name:%s] has been created", vo.getUuid(), vo.getName())); tagMgr.createTagsFromAPICreateMessage(msg, vo.getUuid(), LongJobVO.class.getSimpleName()); acntMgr.createAccountResourceRef(msg.getSession().getAccountUuid(), vo.getUuid(), LongJobVO.class); msg.setJobUuid(vo.getUuid()); // wait in line thdf.chainSubmit(new ChainTask(msg) { @Override public String getSyncSignature() { return "longjob-" + msg.getJobUuid(); } @Override public void run(SyncTaskChain chain) { APISubmitLongJobEvent evt = new APISubmitLongJobEvent(msg.getId()); LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class); vo.setState(LongJobState.Running); vo = dbf.updateAndRefresh(vo); // launch the long job right now ThreadContext.put(Constants.THREAD_CONTEXT_API, vo.getApiId()); ThreadContext.put(Constants.THREAD_CONTEXT_TASK_NAME, vo.getJobName()); LongJob job = longJobFactory.getLongJob(vo.getJobName()); job.start(vo, new Completion(msg) { LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class); @Override public void success() { vo.setState(LongJobState.Succeeded); vo.setJobResult("Succeeded"); dbf.update(vo); logger.info(String.format("successfully run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName())); } @Override public void fail(ErrorCode errorCode) { vo.setState(LongJobState.Failed); vo.setJobResult("Failed : " + errorCode.toString()); dbf.update(vo); logger.info(String.format("failed to run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName())); } }); evt.setInventory(LongJobInventory.valueOf(vo)); logger.info(String.format("longjob [uuid:%s, name:%s] has been started", vo.getUuid(), vo.getName())); bus.publish(evt); chain.next(); } @Override public String getName() { return getSyncSignature(); } }); }
這段邏輯大致為:
創建一個LongJob記錄,以及相關的SystemTag和賬戶資源管理引用
提交至線程池。使用LongJobFactory獲取一個LongJob實例。并執行LongJob對應實現的start,在合適的時機進行狀態變化。
LongJobFactorypublic class LongJobFactoryImpl implements LongJobFactory, Component { private static final CLogger logger = Utils.getLogger(LongJobFactoryImpl.class); /** * Key:LongJobName */ private TreeMapallLongJob = new TreeMap<>(); @Override public LongJob getLongJob(String jobName) { LongJob job = allLongJob.get(jobName); if (null == job) { throw new OperationFailureException(operr("%s has no corresponding longjob", jobName)); } return job; } @Override public boolean start() { LongJob job = null; List longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class); for (Class it : longJobClasses) { LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class); try { job = (LongJob) it.newInstance(); } catch (InstantiationException | IllegalAccessException e) { e.printStackTrace(); } if (null == job) { logger.warn(String.format("[LongJob] class name [%s] but get LongJob instance is null ", at.getClass().getSimpleName())); continue; } logger.debug(String.format("[LongJob] collect class [%s]", job.getClass().getSimpleName())); allLongJob.put(at.value().getSimpleName(), job); } return true; } @Override public boolean stop() { allLongJob.clear(); return true; } }
該FactoryImpl繼承了Component接口。在ZStack Start的時候會利用反射收集帶有LongJobFor這個Annotation的Class。在原先的版本中則是每一次調用的時候利用反射去尋找,會造成一個不必要的開銷。故此這里也是做了一個Cache般的改進,因為在Application起來后是不會動態的去添加一種LongJob的。
回來,還是以AddImageLongJob為例,我們來看看start時會做什么,見AddImageLongJob:
package org.zstack.image; import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Configurable; import org.zstack.core.Platform; import org.zstack.core.cloudbus.CloudBus; import org.zstack.core.cloudbus.CloudBusCallBack; import org.zstack.core.db.DatabaseFacade; import org.zstack.header.core.Completion; import org.zstack.header.image.APIAddImageMsg; import org.zstack.header.image.AddImageMsg; import org.zstack.header.image.ImageConstant; import org.zstack.header.longjob.LongJobFor; import org.zstack.header.longjob.LongJobVO; import org.zstack.header.message.MessageReply; import org.zstack.longjob.LongJob; import org.zstack.utils.gson.JSONObjectUtil; /** * Created by on camile 2018/2/2. */ @LongJobFor(APIAddImageMsg.class) @Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) public class AddImageLongJob implements LongJob { @Autowired protected CloudBus bus; @Autowired protected DatabaseFacade dbf; @Override public void start(LongJobVO job, Completion completion) { AddImageMsg msg = JSONObjectUtil.toObject(job.getJobData(), AddImageMsg.class); bus.makeLocalServiceId(msg, ImageConstant.SERVICE_ID); bus.send(msg, new CloudBusCallBack(null) { @Override public void run(MessageReply reply) { if (reply.isSuccess()) { completion.success(); } else { completion.fail(reply.getError()); } } }); } @Override public void cancel(LongJobVO job, Completion completion) { // TODO completion.fail(Platform.operr("not supported")); } }
這里則是發送了一個inner msg出去,我們看一下handle處的邏輯:
private void handle(AddImageMsg msg) { AddImageReply evt = new AddImageReply(); AddImageLongJobData data = new AddImageLongJobData(msg); BeanUtils.copyProperties(msg, data); handleAddImageMsg(data, evt); }
可以看到這里將msg的參數全部取了出來,放入一個公共結構里,并傳入了真正的handle處。
APIAddImageMsg也是這么做的:
private void handle(final APIAddImageMsg msg) { APIAddImageEvent evt = new APIAddImageEvent(msg.getId()); AddImageLongJobData data = new AddImageLongJobData(msg); BeanUtils.copyProperties(msg, data); handleAddImageMsg(data, evt); }
在前面提到過,為了更好的可維護性,這兩個Msg共用了一段邏輯。
復用Intercepter了解ZStack的同學都知道,任何一條APIMsg發送的時候會進入Intercepter。那么LongJob的submit其實是把APIMsg作為參數傳入了,那么如何復用之前的Intercepter呢?我們來看看LongJobApiInterceptor
public class LongJobApiInterceptor implements ApiMessageInterceptor, Component { private static final CLogger logger = Utils.getLogger(LongJobApiInterceptor.class); /** * Key:LongJobName */ private TreeMap> apiMsgOfLongJob = new TreeMap<>(); @Override public APIMessage intercept(APIMessage msg) throws ApiMessageInterceptionException { if (msg instanceof APISubmitLongJobMsg) { validate((APISubmitLongJobMsg) msg); } else if (msg instanceof APICancelLongJobMsg) { validate((APICancelLongJobMsg) msg); } else if (msg instanceof APIDeleteLongJobMsg) { validate((APIDeleteLongJobMsg) msg); } return msg; } private void validate(APISubmitLongJobMsg msg) { Class apiClass = apiMsgOfLongJob.get(msg.getJobName()); if (null == apiClass) { throw new ApiMessageInterceptionException(argerr("%s is not an API", msg.getJobName())); } // validate msg.jobData Map config = new HashMap<>(); List serviceConfigFolders = new ArrayList<>(); serviceConfigFolders.add("serviceConfig"); config.put("serviceConfigFolders", serviceConfigFolders); ApiMessageProcessor processor = new ApiMessageProcessorImpl(config); APIMessage jobMsg = JSONObjectUtil.toObject(msg.getJobData(), apiClass); jobMsg.setSession(msg.getSession()); jobMsg = processor.process(jobMsg); // may throw ApiMessageInterceptionException msg.setJobData(JSONObjectUtil.toJsonString(jobMsg)); // msg may be changed during validation } private void validate(APICancelLongJobMsg msg) { LongJobState state = Q.New(LongJobVO.class) .select(LongJobVO_.state) .eq(LongJobVO_.uuid, msg.getUuid()) .findValue(); if (state == LongJobState.Succeeded) { throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is succeeded")); } if (state == LongJobState.Canceled) { throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is already canceled")); } if (state == LongJobState.Failed) { throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is failed")); } } private void validate(APIDeleteLongJobMsg msg) { LongJobState state = Q.New(LongJobVO.class) .select(LongJobVO_.state) .eq(LongJobVO_.uuid, msg.getUuid()) .findValue(); if (state != LongJobState.Succeeded && state != LongJobState.Canceled && state != LongJobState.Failed) { throw new ApiMessageInterceptionException(argerr("delete longjob only when it"s succeeded, canceled, or failed")); } } @Override public boolean start() { Class apiClass = null; List longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class); for (Class it : longJobClasses) { LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class); try { apiClass = (Class ) Class.forName(at.value().getName()); } catch (ClassNotFoundException | ClassCastException e) { //ApiMessage and LongJob are not one by one corresponding ,so we skip it e.printStackTrace(); continue; } logger.debug(String.format("[LongJob] collect api class [%s]", apiClass.getSimpleName())); apiMsgOfLongJob.put(at.value().getSimpleName(), apiClass); } return true; } @Override public boolean stop() { apiMsgOfLongJob.clear(); return true; } }
邏輯很簡單,通過LongJob的name找出了對應的APIMsg,并將APIMsg發向了對應Intercepter。
在查找APIMsg這一步也是采用了Cache的思想,在Start的時候就進行了收集。
展望在前面的定義中,我們提到了LongJob是允許暫停和取消行為的。這在接口中也可以看到類似的期許:
public interface LongJob { void start(LongJobVO job, Completion completion); void cancel(LongJobVO job, Completion completion); }
那么該如何實現它呢?在這里我們僅僅做一個展望,到時還是以釋放出來的代碼為準。
Stop首先,在CancellableSharedFlowChain 定義一個必須被實現的接口。如`stop
Condition`,返回一個boolean。在每個Flow執行前會判斷該boolean是否為true,如果為true。則保存context到db,并停止執行。
同樣,也是在CancellableSharedFlowChain 定義一個必須被實現的接口。如cancelCondition,返回一個boolean。在每個Flow執行前會判斷該boolean是否為true,如果為true。則停止執行并觸發之前的所有rollback。
Rollback的特殊技巧那么可能會有同學問了,在這樣的設計下,如果發生了如斷電的情況,必然導致無法Rollback。這種情況如果發生在一個數據中心,可以說是災難也不為過。但是我們可以考慮一下如何實現更具有原子性Rollback。
淺談數據庫事務的實現數據庫的事務主要是通過Undo日志來實現。在一條記錄更新前(更新到硬盤),一定要把相關的Undo日志寫入硬盤;而“提交事務”這種記錄,要在記錄更新完畢后再寫入硬盤。所謂的Undo日志,就是沒有操作前的日志。如果同學們聽完還是覺得有點迷,可以看這篇文章:
碼農翻身:愛炫耀的數據庫老頭兒
可以考慮的方案在了解了數據庫事務的實現后,我們可以大致設計出一種方案,用于保證斷電后Rollback的完整性:
在一個FlowChain執行前,在DB里存入一個類似Start FlowChain的標記
定義每一個Flow的Number號,如第一個Flow為1。在Flow執行前,記錄當前Flow Number到數據庫,寫Flow1開始執行。Flow執行完之前,寫Flow1執行完畢。
Flow執行完了,在DB里存入一個類似Done FlowChian的標記。這里我們把Done的那部分也看做一個Flow。
那么在任何以步驟出問題的時候,基本都可以完成一個Rollback。我們來看一看:
還沒執行Flow的時候斷電DB中的記錄為Start FlowChain,那么是不需要Rollback的。
執行一個Flow的時候斷電DB中的最新記錄為Flow1開始執行的話,不需要Rollback。這種分布式場景下如果需要做到強一致性,只能對每行代碼做類似Undo日志的記錄了。
但是如果記錄為Flow1執行完畢,開始Rollback。
之后執行幾個Flow都是參考這里的一個做法。
小結在本文中,筆者和大家了解了ZStack在2.3引入的新模塊——LongJob。并對其的出現的背景、解決的痛點和實現進行了分析,最后展望了一下接下來版本中可能會增強的功能。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/68745.html
摘要:但在實際的二次開發中,這些做法未必能夠完全滿足需求。在源碼剖析之核心庫鑒賞一文中,我們了解到是的基礎設施之一,同時也允許通過顯示聲明的方式來聲明。同理,一些也可以使用繼承進行擴展。 本文首發于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系統中,官方提出了幾個較為經典的擴展方式。但在實際的二次開發中,這些做法未必...
摘要:但新增模塊的結構卻還是大致相同,此即是的經典設計模式這套模式也被開發者稱為三駕馬車。領域層定義負責表達業務概念,業務狀態信息以及業務規則。 本文首發于泊浮目的專欄:https://segmentfault.com/blog... 前言 隨著ZStack的版本迭代,其可以掌管的資源也越來越多。但新增模塊的結構卻還是大致相同,此即是ZStack的經典設計模式——這套模式也被開發者稱為ZS...
摘要:本文首發于泊浮目的專欄在語言中,有一個關鍵字叫做其作用是在函數前執行。一般有兩種用法在該函數拋出異常時執行。在該函數返回前執行。這里的放入來自系統啟動時利用反射所做的一個行為。因此并不會影響使用時的性能。 本文首發于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個關鍵字叫做defer——其作用是在函數return前執行。在ZStac...
摘要:本文首發于泊浮目的專欄在語言中,有一個關鍵字叫做其作用是在函數前執行。一般有兩種用法在該函數拋出異常時執行。在該函數返回前執行。這里的放入來自系統啟動時利用反射所做的一個行為。因此并不會影響使用時的性能。 本文首發于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個關鍵字叫做defer——其作用是在函數return前執行。在ZStac...
摘要:能夠整體地替換算法,能讓我們輕松地以不同的算法去解決一個問題,這種模式就是模式。這個類是在發布前常在中被使用的一個類,代碼如下以為例,從語義上來說就是為了中的每個元素調用函數。 本文首發于泊浮目的專欄:https://segmentfault.com/blog... 前言 無論什么程序,其目的都是解決問題。而為了解決問題,我們又需要編寫特定的算法。使用Strategy模式可以整體地替...
閱讀 856·2023-04-25 21:21
閱讀 3237·2021-11-24 09:39
閱讀 3082·2021-09-02 15:41
閱讀 2009·2021-08-26 14:13
閱讀 1840·2019-08-30 11:18
閱讀 2791·2019-08-29 16:25
閱讀 517·2019-08-28 18:27
閱讀 1590·2019-08-28 18:17