摘要:下面就來講講第一個初始化操作拓撲分配。如果沒有舊的分配信息,說明拓撲分配類型為。到這里,預分配,創(chuàng)建拓撲分配上下文就完成了。集群下的分配,見下文講解資源準備首先第一步是判斷拓撲分配的類型是否符合要求,不符合則拋出異常。
??寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關注或者收藏,轉發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本。上一篇博客,JStorm源碼分析系列--01--Nimbus啟動分析筆者講解了Nimbus啟動過程中做的一些基本的操作,在initFollowerThread方法中,如果當前的Nimbus變成Leader之后,這個方法內會負責執(zhí)行一些初始化init操作。下面就來講講第一個初始化操作--拓撲分配。本文將詳細(非常長,所以慢慢看)的講解如何去為一個拓撲分配相應的資源。
??從方法initTopologyAssign開始,TopologyAssign是一個單例對象,在這個類的init方法內,做了簡單的賦值操作之后,并初始化一個調度器實例對象之后,就建立一個守護線程,這個守護線程的目的是不斷從TopologyAssign內部維護的一個阻塞隊列中讀取系統(tǒng)提交的拓撲任務,并調用相應的方法doTopologyAssignment進行分配操作。代碼都比較簡單,就不浪費版面去貼了。
??下面是doTopologyAssignment方法的源碼,
protected boolean doTopologyAssignment(TopologyAssignEvent event) { Assignment assignment; try { Assignment oldAssignment = null; boolean isReassign = event.isScratch(); if (isReassign) { //如果存在舊的分配信息,需要先將舊的分配信息存儲下來 oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null); } //調用方法執(zhí)行新的分配 assignment = mkAssignment(event); //將task添加到集群的metrics中 pushTaskStartEvent(oldAssignment, assignment, event); if (!isReassign) { //如果是新建的拓撲,需要把拓撲設置為active狀態(tài) setTopologyStatus(event); } } catch (Throwable e) { LOG.error("Failed to assign topology " + event.getTopologyId(), e); event.fail(e.getMessage()); return false; } if (assignment != null) //將拓撲備份到ZK上 backupAssignment(assignment, event); event.done(); return true; }
??所以,最重要的方法還是mkAssignment,這里執(zhí)行了實際的分配操作。下面就來詳細的介紹這個方法。
prepareTopologyAssign??prepareTopologyAssign這個方法總體的目的為了初始化拓撲分配的上下文信息,生成一個TopologyAssignContext的實例對象。這個上下文對象需要存下拓撲的很多關鍵信息,包括拓撲的組件信息(用StormTopology對象保存,下文在添加acker的時候會詳細介紹這個類),拓撲的配置信息,拓撲上所有的task id,以及死掉的task id,unstopped task id(這里的解釋是,那些supervisor死掉但是worker還繼續(xù)運行的稱為unstopworker,而包含在unstopworker內的task則稱為unstoppedTask)。以及這個拓撲能分配到的worker,以上提及的這些信息都會在這個方法內慢慢的初始化。下面一步步來看吧。prepareTopologyAssign方法的源碼比較長,一部分一部分來講解。
//創(chuàng)建一個上下文的實例對象 TopologyAssignContext ret = new TopologyAssignContext(); String topologyId = event.getTopologyId(); ret.setTopologyId(topologyId); int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId(); ret.setTopologyMasterTaskId(topoMasterId); LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId); Map
??緊接著,根據(jù)目前集群的狀態(tài),初始化一份集群上所有的supervisor,并獲取所有可用的worker
StormClusterState stormClusterState = nimbusData.getStormClusterState(); // get all running supervisor, don"t need callback to watch supervisor MapsupInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null); // init all AvailableWorkerPorts for (Entry supInfo : supInfos.entrySet()) { SupervisorInfo supervisor = supInfo.getValue(); if (supervisor != null) //設置全部的端口都為可用,后面通過HB去除掉那些已經被使用的worker //supervisor是一個k-v,k是supervisorid,v是保存實例信息 supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts()); } //這個方法就是利用HB去掉那些掛掉的supervisor //判斷的方法是獲取每個supervisor最近的HB時間, //由當前時間減去最近HB時間和超時時間做對比。 getAliveSupervsByHb(supInfos, nimbusConf);
??接下來獲取拓撲中定義的taskid對應上組件,這里要解釋下,對于一個拓撲而言,taskid總是從1開始分配的,并且,相同的組件taskid是相鄰的。比如你定義了一個SocketSpout(并行度5),一個PrintBolt(并行度4,那么SocketSpout的taskid可能是1-5,PrintBolt的taskid可能是6-9。
//這個k-v,k是taskid,v是拓撲內定義的組件的id。 //寫過應用的同學都應該知道,TopologyBuilder在setSpout或者Bolt的時候,需要指定<組件id,對象,和并行度>。 //eg:builder.setSpout("integer", new ReceiverSpout(), 2); MaptaskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null); ret.setTaskToComponent(taskToComponent); //獲取所有的taskid。 Set allTaskIds = taskToComponent.keySet(); ret.setAllTaskIds(allTaskIds);
??如果原來存在舊的拓撲分配信息,還需要設置unstoppedTasks,deadTasks,unstoppedWorkers等信息。然后調用getFreeSlots方法負責去除那些已經分配出去的worker。處理過程比較直觀,獲取集群上所有的拓撲分配信息,然后根據(jù)每個分配信息中保存的worker信息,從原先supInfos中移除那些被分配出去的worker。
??如果沒有舊的分配信息,說明拓撲分配類型為ASSIGN_TYPE_NEW。如果存在同名的拓撲,也會把同名的拓撲設置舊的分配信息,放到上下文中。如果存在舊的分配信息,需要把舊的分配信息放入到上下文中,此外還要判斷是ASSIGN_TYPE_REBALANCE還是ASSIGN_TYPE_MONITOR,因為還需要設置unstoppedWorkers的信息。到這里,預分配,創(chuàng)建拓撲分配上下文就完成了。目前我們帶有比較重要的信息是拓撲所有的taskid,以及拓撲基本的組件信息。
??在完成拓撲上下文初始化之后,開始實際給拓撲分配相應的worker,不過這里需要判斷是本地模式還是集群模式,本地模式下比較簡單,找個一個合適的端口,然后新建一個worker的資源對象ResourceWorkerSlot,將一些關鍵信息如hostname,port,allTaskId配置好。因為local模式下比較簡單,所以,即使設置多個worker也不會啟動多個jvm。而在集群模式下,一個worker表示的是一個jvm進程。下面就重點講解集群下的分配情況。我把集群上的分配過程(assignTasks這個方法)分成三個主要的部分,分別是資源準備,worker分配,task分配。
Set資源準備assignments = null; if (!StormConfig.local_mode(nimbusData.getConf())) { IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); //集群下的分配,見下文講解 assignments = scheduler.assignTasks(context); } else { assignments = mkLocalAssignment(context); }
??首先第一步是判斷拓撲分配的類型是否符合要求,不符合則拋出異常。緊接著,根據(jù)上一個方法生成的拓撲分配上下文來生成一個默認的拓撲分配上下文實例對象,DefaultTopologyAssignContext這個類的構造方法執(zhí)行了很多很細節(jié)的操作。包括為拓撲添加附加的組件,存儲下taskid和組件的對應信息,計算拓撲需要的worker數(shù)目,計算unstopworker的數(shù)目等。
//根據(jù)之前的上下文,初始化一個分配的上下文對象 DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context); if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) { freeUsed(defaultContext); }
??下面代碼是DefaultTopologyAssignContext的構造方法
public DefaultTopologyAssignContext(TopologyAssignContext context){ super(context); try { sysTopology = Common.system_topology(stormConf, rawTopology); } catch (Exception e) { throw new FailedAssignTopologyException("Failed to generate system topology"); } sidToHostname = generateSidToHost(); hostToSid = JStormUtils.reverse_map(sidToHostname); if (oldAssignment != null && oldAssignment.getWorkers() != null) { oldWorkers = oldAssignment.getWorkers(); } else { oldWorkers = new HashSet(); } refineDeadTasks(); componentTasks = JStormUtils.reverse_map(context.getTaskToComponent()); for (Entry > entry : componentTasks.entrySet()) { List componentTaskList = entry.getValue(); Collections.sort(componentTaskList); } totalWorkerNum = computeWorkerNum(); unstoppedWorkerNum = computeUnstoppedAssignments(); }
??從上面的代碼可以看出在DefaultTopologyAssignContext的構造方法中,第一句是調用父類構造方法先去初始化一些參數(shù),然后調用system_topology這個方法。下面來看看這個方法的內部。第一個方法就是添加一個acker到原來的拓撲中去。拓撲作為JStrom處理的一個邏輯模型,對用戶提供了非常簡單且強大的編程原語,只要分別繼承兩大組件,就可以構造一個拓撲模型,但是實際上,一個實際運行的拓撲模型遠遠不止用戶定義的用于處理輸入的spout和用于處理業(yè)務的bolt,JStorm為了保證消息的可靠性,拓撲Metrics管理,拓撲HB管理,再拓撲實際模型中添加了幾個非常重要的bolt,下面就詳細的介紹acker,用于保證消息的可靠性。
public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException { StormTopology ret = topology.deepCopy(); add_acker(storm_conf, ret); addTopologyMaster(storm_conf, ret); add_metrics_component(ret); add_system_components(ret); return ret; }
??這里先來介紹下StormTopology這個類,才能往下理解。StormTopology這個類用于存儲拓撲的組件信息,在這個類內部,有三個非常重要的成員變量,分別存儲spout和bolt以及state_spout,第三個筆者暫時沒有弄清楚其作用,但是前兩個就非常明顯,分別存儲拓撲的兩大組件,spout和bolt
private Mapspouts; // required private Map bolts; // required private Map state_spouts; // required
??Map中的key表示我們定義的組件的id,上文提到過的id。SpoutSpec和Bolt中有兩個重要的成員變量。
private ComponentObject spout_object; // required private ComponentCommon common; // required
??ComponentObject用于存儲序列化后的代碼信息,第二個ComponentCommon用于存儲很重要的配置信息,包括輸入的流,輸出的流和分組信息。有三個重要的成員變量
//GlobalStreamId有兩個String成員變量,componentId表示這個輸入組件的流來源的那個組件id, //streamId表示componentId所輸出的特定的流 private Mapinputs; // 輸入的來源和分組情況 //StreamInfo有個重要的成員變量List output_fields,表示輸出的域。 private Map streams; // 輸出的流 private int parallelism_hint; // 并行度
??根據(jù)上述的結構,StormTopology能夠完整的表示拓撲中每個組件輸出之后的流所流向的位置。
??這一小節(jié)筆者不打算先從源碼的角度入手,先來將一個acker的作用以及從一個小例子來講解acker是怎樣工作的。我們都知道作為一個流式處理框架,消息的可靠性是一個非常特性之一。除開更加高級的事務框架能保證消息只被處理一次(exactly-once),JStorm本身也提供了at-least-once,這個機制能保證消息一定會被處理。下面從一個例子的角度來講解,這是如何實現(xiàn)的。
??如上圖所示,integer作為輸入的spout,sliding和printer都是負責處理的bolt,F(xiàn)ield表示之間輸出的元組內的元素對應的key。StreamID為默認,不指定數(shù)據(jù)流分組的形式,則默認情況下shuffle。上述是一個非常簡單的拓撲邏輯結構,然后在經過add_acker這個方法之后,實際的拓撲結構發(fā)生了一些變化,如下圖
??JStrom為原來的拓撲結構添加了一個_ack的bolt,負責維護拓撲的可靠性,大致的情況可以從上圖中看出,每當一個元組被發(fā)送到拓撲下游bolt中去的時候,也會發(fā)送到_ack中去保存下來,然后后續(xù)處理的每個bolt每次調用ack函數(shù)都會發(fā)送給_ack(bolt),在指定時間間隔內收到最后處理的ack,那么_ack(bolt)就發(fā)送一個消息給最初的spout,則保證了一個元組的可靠性。所以綜上,_ack這個Bolt就是維護了整個拓撲的可靠性,那么讀者可能會問,_ack里面保存了那么多的消息,如果某個元組經過的組件非常多,是否會造成該元組的拓撲樹變的很大。這里阿里利用異或,實現(xiàn)了一個非常簡單且高效低耗的判斷方法。
??其實在_ack中存儲的內容非常簡單,就是一個k-v鍵值對,k是一個隨機無重復的id(root_id),且在元組被處理的整個過程中保持不變,將消息存儲為
??后續(xù)的幾個方法如addTopologyMaster,add_metrics_component,add_system_components都是添加了相應的控件(bolt)來進行協(xié)同操作。比如topology master可以負責metrics,也可以負責baskpressure(反壓)機制。筆者還沒深入解讀,相應部分后續(xù)再做相應的添加,這里先挖個坑。
??在DefaultTopologyAssignContext的構造函數(shù)中,添加完附加的組件之后,緊接著獲取supervisorid和hostname對應的鍵值對,如果存在舊的分配信息,則獲取原先所有的worker,如果沒有,則新建一個worker的集合。去除deadtaskid中那些在unstopworker內的task(這里的目的是分開處理,如果是new的情況下,這兩個都是空集)。然后計算需要的worker數(shù)目。看下面的源碼,
private int computeWorkerNum() { //獲取拓撲設置的worker數(shù)目 Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS)); // int ret = 0, hintSum = 0, tmCount = 0; Mapworker分配components = ThriftTopologyUtils.getComponents(sysTopology); for (Entry entry : components.entrySet()) { String componentName = entry.getKey(); Object component = entry.getValue(); ComponentCommon common = null; if (component instanceof Bolt) { common = ((Bolt) component).get_common(); } if (component instanceof SpoutSpec) { common = ((SpoutSpec) component).get_common(); } if (component instanceof StateSpoutSpec) { common = ((StateSpoutSpec) component).get_common(); } //獲取每個組件中設置的并行度 int hint = common.get_parallelism_hint(); if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { //如果是屬于TM組件,則加到tmCount tmCount += hint; continue; } //這個變量存下所有組件并行度的和 hintSum += hint; } //ret存下較小的值 if (settingNum == null) { ret = hintSum; } else { ret = Math.min(settingNum, hintSum); } //這里還需要判斷主TM是否需要一個獨立的worker節(jié)點用于處理 Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf); if (isTmSingleWorker != null) { if (isTmSingleWorker == true) { ret += tmCount; setAssignSingleWorkerForTM(true); } } else { if (ret >= 10) { ret += tmCount; setAssignSingleWorkerForTM(true); } } return ret; }
??實例化完DefaultTopologyAssignContext之后,如果是rebalance類型,則還需要先將原先占用的那些worker給釋放掉,具體做法就是將worker使用的端口放回可用端口集合中。幾個變量的含義,needAssignTasks:就是指需要分配的task,也就是除去unstopworker中的那些task。allocWorkerNum:等于原先計算好的worker的數(shù)目-減去unstopworker的數(shù)目再減去keepAssigns(只有在拓撲類型是ASSIGN_TYPE_MONITOR才有的)的數(shù)目。實際worker分配中,最重要是方法WorkerScheduler.getAvailableWorkers。下面就來詳細講解這個方法內部怎么實現(xiàn)。
int workersNum = getAvailableWorkersNum(context); if (workersNum < allocWorkerNum) { throw new FailedAssignTopologyException("there"s no enough worker.allocWorkerNum="+ allocWorkerNum + ", availableWorkerNum="+ workersNum); } workersNum = allocWorkerNum; ListassignedWorkers = new ArrayList (); getRightWorkers(context,needAssign,assignedWorkers,workersNum,getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));
??首先得知集群上可用的全部worker,如果可用的worker小于需要分配的worker數(shù),則需要拋出異常。如果足夠,則會分配足量的worker給指定的拓撲。調用getRightWorkers這個方法來獲取合適的worker,這里所謂right的worker是指用戶自定義的worker,可以指定worker的資源分配情況。
??分為兩部分來講解這個方法,首先是準備工作--getUserDefineWorkers這個方法,這個方法需要兩個參數(shù),拓撲的上下文信息context,用戶自定義的worker列表workers。看下面的源碼:
private ListgetUserDefineWorkers( DefaultTopologyAssignContext context, List workers) { List ret = new ArrayList (); //如果沒有用戶自定義的worker,則沒必要任何操作 if (workers == null) return ret; Map > componentToTask = (HashMap >) ((HashMap >) context .getComponentTasks()).clone(); //如果分配類型不是NEW,則還是從workers資源分配信息列表中去除unstopworker。 //這里是用戶有指定某些worker資源屬于unstopworker才能去掉。 if (context.getAssignType() != context.ASSIGN_TYPE_NEW) { checkUserDefineWorkers(context, workers, context.getTaskToComponent()); } //遍歷用戶定義的worker,去除那些沒有分配task的worker //用戶定義的worker中已經指定哪些task該分配到哪個worker中 for (WorkerAssignment worker : workers) { ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,componentToTask); if (workerSlot.getTasks().size() != 0) { ret.add(workerSlot); } } return ret; }
??去除那些沒有指定task的worker之后,真正進入getRightWorkers方法內部。源碼如下,這里解釋下五個參數(shù)的含義,context表示之前準備的拓撲上下文信息,needAssign表示這個拓撲需要分配的各個taskid,assignedWorkers表示用來存儲那些在這個方法內分配到的worker資源,workersNum表示需要拓撲需要分配的worker數(shù)目,workers表示上個方法中用戶自定義的可用的worker資源。簡而言之,這個方法就是從workers中選出已經分配了指定的task的worker,然后存到assignedWorkers中去。
private void getRightWorkers(DefaultTopologyAssignContext context, SetneedAssign, List assignedWorkers, int workersNum, Collection workers) { Set assigned = new HashSet (); List users = new ArrayList (); if (workers == null) return; for (ResourceWorkerSlot worker : workers) { boolean right = true; Set tasks = worker.getTasks(); if (tasks == null) continue; for (Integer task : tasks) { if (!needAssign.contains(task) || assigned.contains(task)) { right = false; break; } } if (right) { assigned.addAll(tasks); users.add(worker); } } if (users.size() + assignedWorkers.size() > workersNum) { LOG.warn( "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}", users, assignedWorkers, workersNum); return; } assignedWorkers.addAll(users); needAssign.removeAll(assigned); }
??上面代碼主要的處理邏輯是在for循環(huán)中,在這個循環(huán)會去判斷worker內是否存有本拓撲內的taskid,如果有則把worker存儲起來,并且從taskid列表中移除掉那些分配出去的task,沒有則直接退出了。
??回到getAvailableWorkers方法內,看下面這段代碼。
//如果配置指定要使用舊的分配,則從舊的分配中選出合適的worker。 if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers()); } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) { //如果是rebalance,且可以使用原來的worker,將原來使用的worker存儲起來。 int cnt = 0; for (ResourceWorkerSlot worker : context.getOldWorkers()) { if (cnt < workersNum) { ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); resFreeWorker.setPort(worker.getPort()); resFreeWorker.setHostname(worker.getHostname()); resFreeWorker.setNodeId(worker.getNodeId()); assignedWorkers.add(resFreeWorker); cnt++; } else { break; } } } // 計算TM bolt的個數(shù) int workersForSingleTM = 0; if (context.getAssignSingleWorkerForTM()) { for (Integer taskId : needAssign) { String componentName = context.getTaskToComponent().get(taskId); if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { workersForSingleTM++; } } } int restWokerNum = workersNum - assignedWorkers.size(); if (restWokerNum < 0) throw new FailedAssignTopologyException( "Too much workers are needed for user define or old assignments. workersNum=" + workersNum + ", assignedWokersNum=" + assignedWorkers.size());
??筆者一開始覺得上述的代碼可能是在判斷restWokerNum < 0是很可能會成立而導致拋出異常的,因為如果用戶一開始就指定了worker分配信息,然后rebalance情況下,不斷去添加舊的worker到assignedWorkers內,這樣就會導致assignedWorkers的大小比實際需要的worker數(shù)目workersNum大。但是還沒來得及用實際集群去測試,只是在github問了官方的人,如果有更新解決方案會后續(xù)再這里說明。
//restWokerNum是剩下需要的worker的數(shù)目,直接添加ResourceWorkerSlot實例對象。 for (int i = 0; i < restWokerNum; i++) { assignedWorkers.add(new ResourceWorkerSlot()); } //這里是獲取那些專門指定運行拓撲的supervisor節(jié)點。 ListisolationSupervisors = this.getIsolationSupervisors(context); if (isolationSupervisors.size() != 0) { putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(isolationSupervisors)); } else { putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(context.getCluster())); } this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); LOG.info("Assigned workers=" + assignedWorkers); return assignedWorkers;
??上述代碼中的isolationSupervisors存放的是那些指定給這個拓撲的supervisor節(jié)點的id。如果有指定,則在這些特定的節(jié)點上分配,如果沒有指定,那么,就在全局內分配。所以實際剩下的分配任務的是putAllWorkerToSupervisor這個方法,getResAvailSupervisors這個方法負責剔除那些無法分配worker的supervisor節(jié)點,因為節(jié)點上分配的worker已經滿了。下面來介紹putAllWorkerToSupervisor這個方法的作用。
??putAllWorkerToSupervisor需要兩個參數(shù),第一個是已經分配的worker,包含那些還沒有設定運行在那個節(jié)點的worker(上面直接新建的那些worker),第二個參數(shù)是目前可用的supervisor節(jié)點。下面是這個方法的代碼
private void putAllWorkerToSupervisor( ListassignedWorkers, List supervisors) { for (ResourceWorkerSlot worker : assignedWorkers) { if (worker.getHostname() != null) { for (SupervisorInfo supervisor : supervisors) { if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) { putWorkerToSupervisor(supervisor, worker); break; } } } } supervisors = getResAvailSupervisors(supervisors); Collections.sort(supervisors, new Comparator () { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { // TODO Auto-generated method stub return -NumberUtils.compare( o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size()); } }); putWorkerToSupervisor(assignedWorkers, supervisors); }
??進入方法的第一步,首先要做的事情,就是對于那些已經分配好節(jié)點的worker,從supervisor節(jié)點上給該worker分配一個合適的端口。putWorkerToSupervisor這方法主要的操作是從supervisor節(jié)點上獲取一個可用的端口,然后設置worker的端口,并將該端口從supervisor節(jié)點的可用端口列表中移除。代碼結構非常簡單,如下:
private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) { int port = worker.getPort(); if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) { port = supervisor.getAvailableWorkerPorts().iterator().next(); } worker.setPort(port); supervisor.getAvailableWorkerPorts().remove(port); worker.setNodeId(supervisor.getSupervisorId()); }
??設置好了一部分已經分配好的worker之后,繼續(xù)分配那些沒有指定supervisor的worker。根據(jù)supervisor中可用端口逆序,從大到小排。然后調用putWorkerToSupervisor這個方法。
??putWorkerToSupervisor方法內部首先統(tǒng)計所有已經使用的端口,然后計算出一個理論的負載平均值{(所有使用掉的+將要分配的)/supervisor的個數(shù),就會得到分配后,集群的一個理論上的負載值theoryAveragePorts,可以平攤到每個supervisor身上}。然后通過遍歷需要分配worker的list,進行第一次分配,可以將worker依次分配到那些負載值(跟理論值的計算方式一樣)小于理論平均負載的supervisor上。而超過負載的,則放進到負載列表中。經過一輪分配之后,如果還存在沒有分配的worker(源碼這里先進行排序再進行判斷,很明顯造成排序時間浪費的可能性)。根據(jù)supervisor中可用端口逆序,從大到小排序。再不斷將worker分配進去。
??到這里,worker的分配就順利結束了,總結一下,首先是根據(jù)拓撲信息初始化上下文信息,然后計算出實際使用的worker數(shù)目,如果這些worker有指定運行在某個supervisor節(jié)點上,那么就在節(jié)點上分配合適的worker。如果沒有指定,那么就根據(jù)節(jié)點的負載情況,盡量平均的分配到每個supervisor節(jié)點上。如果大家的負載都比較大的情況下,再分配到哪些具有比較多的可用端口的節(jié)點,完成分配。
??getAvailableWorkers方法完成了worker的分配,以及如果用戶指定了特定的worker上運行指定的task,剩下的taskid將會在接下來的方法中說明如何去分配。主要在TaskScheduler的構造函數(shù)中,這里需要三個參數(shù),第一個是拓撲的上下文信息defaultContext,第二個是需要分配的task的列表needAssignTasks,以及上文中獲取到的合適的worker列表availableWorkers。(ps:記住,前文如果沒有指定特定的worker資源分配的信息,則沒有taskid被分配到worker中去,也就是worker內部僅有supervisorid,內存,cpu,端口等信息,不存在tasks信息)。接下來看看TaskScheduler的構造函數(shù)。
public TaskScheduler(DefaultTopologyAssignContext context, Settasks, List workers) { this.tasks = tasks; LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers); this.context = context; this.taskContext = new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent()); this.componentSelector = new ComponentNumSelector(taskContext); this.inputComponentSelector = new InputComponentNumSelector(taskContext); this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext); if (tasks.size() == 0) return; if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){ // warning ! it doesn"t consider HA TM now!! if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) { assignForTopologyMaster(); } } int taskNum = tasks.size(); Map workerSlotIntegerMap = taskContext.getWorkerToTaskNum(); Set preAssignWorkers = new HashSet (); for (Entry worker : workerSlotIntegerMap.entrySet()) { if (worker.getValue() > 0) { taskNum += worker.getValue(); preAssignWorkers.add(worker.getKey()); } } setTaskNum(taskNum, workerNum); // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers. // Remove the workers which have been assigned with enough workers. for (ResourceWorkerSlot worker : preAssignWorkers) { if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){ Set doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker); if (doneWorkers != null) { for (ResourceWorkerSlot doneWorker : doneWorkers) { taskNum -= doneWorker.getTasks().size(); workerNum--; } } } } setTaskNum(taskNum, workerNum); // For Scale-out case, the old assignment should be kept. if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) { keepAssignment(taskNum, context.getOldAssignment().getWorkers()); } }
??在這個構造函數(shù)中,首先是構造一個task分配的上下文信息。這個對象主要需要維護的幾個重要信息是
taskToComponent:一個Map,Key表示taskid,Value表示所對應的組件id。
supervisorToWorker:也是一個Map,Key表示這個拓撲分配的supervisorid,Value表示節(jié)點上分配到的worker列表。
relationship:維護這個拓撲的一個結構信息,依然是個Map,Key表示組件bolt/spout的組件id,Value表示的是,如果Key對應組件是一個bolt,則Value存下是所有輸入到組件的對應組件的id。如果Key對應組件是一個spout,則Value存下是這個組件所有輸出到的組件id。舉個例子,integer(spout)輸出到sliding(bolt),sliding(bolt)輸出到printer(bolt)。則relationship存下的是[{integer,[sliding]},{sliding,[integer]},{printer,[sliding]}]。
workerToTaskNum:Map,Key表示一個worker,Value表示實際在這個worker上運行的task的總數(shù)目。
workerToComponentNum:Map,Key表示一個worker,Value表示一個Map,存下的是組件id,以及對應的數(shù)目。
??緊接著初始化三個selector,第一個是ComponentNumSelector(內部定義了一二WorkerComparator,負責對worker進行比對,對比worker內某個組件的task數(shù)目。以及對比每個supervisor上所有worker內某個組件的總task和),第二個是InputComponentNumSelector(內部也是定義了兩個比對函數(shù),一個是獲取worker內某個組件的全部輸入的task個數(shù),以及在整個supervisor上的全部輸入task個數(shù)),第三個是TotalTaskNumSelector(worker內全部task的個數(shù),和supervisor上全部task的個數(shù))。這三個selector的目的都是為了后續(xù)合理的將task分配到這些worker上做的準備。
??如果集群資源足夠,用戶定義TM需要多帶帶分配到一個獨立的worker上,則需要調用assignForTopologyMaster進行多帶帶分配。
private void assignForTopologyMaster() { int taskId = context.getTopologyMasterTaskId(); ResourceWorkerSlot workerAssigned = null; int workerNumOfSuperv = 0; for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){ Listworkers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId()); if (workers != null && workers.size() > workerNumOfSuperv) { for (ResourceWorkerSlot worker : workers) { Set tasks = worker.getTasks(); if (tasks == null || tasks.size() == 0) { workerAssigned = worker; workerNumOfSuperv = workers.size(); break; } } } } if (workerAssigned == null) throw new FailedAssignTopologyException("there"s no enough workers for the assignment of topology master"); updateAssignedTasksOfWorker(taskId, workerAssigned); taskContext.getWorkerToTaskNum().remove(workerAssigned); assignments.add(workerAssigned); tasks.remove(taskId); workerNum--; LOG.info("assignForTopologyMaster, assignments=" + assignments); }
??這個方法首先是找出某個最合適的worker,這個worker符合兩個條件,一是沒有分配其他的task,第二,worker所在的supervisor相對分配了最多的worker,第二點的目的是保證負載均衡。如果找不到合適的worker,那么就拋出異常。如果能找到的話,就把負責TM的task分配給這個worker。updateAssignedTasksOfWorker這個方法的目的就是更新新的分配情況。
??接下來獲取全部的task數(shù)目,以及已經分配出去的worker列表preAssignWorkers。根據(jù)獲得的總task數(shù)目來計算每個worker上平均的task數(shù)目avgTaskNum,以及剩下多少還沒有分配出去的task(總task%總worker,求得余數(shù)leftTaskNum)。然后遍歷preAssignWorkers,調用方法removeWorkerFromSrcPool來判斷一個worker是否分配了足夠的task,并且移除那些已經合理分配的task和worker。
for (ResourceWorkerSlot worker : preAssignWorkers) { if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){ SetdoneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker); if (doneWorkers != null) { for (ResourceWorkerSlot doneWorker : doneWorkers) { taskNum -= doneWorker.getTasks().size(); workerNum--; } } } }
??removeWorkerFromSrcPool這個方法挺有趣的,第一次看的時候有點懵逼,但是其實仔細看下就很明確了。下面我簡單講解下:
private SetremoveWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) { Set ret = new HashSet (); if (leftTaskNum <= 0) { if (taskNum >= avgTaskNum) { taskContext.getWorkerToTaskNum().remove(worker); assignments.add(worker); ret.add(worker); } } else { if (taskNum > avgTaskNum ) { taskContext.getWorkerToTaskNum().remove(worker); leftTaskNum = leftTaskNum -(taskNum -avgTaskNum); assignments.add(worker); ret.add(worker); } if (leftTaskNum <= 0) { List needDelete = new ArrayList (); for (Entry entry : taskContext.getWorkerToTaskNum().entrySet()) { if (avgTaskNum != 0 && entry.getValue() == avgTaskNum) needDelete.add(entry.getKey()); } for (ResourceWorkerSlot workerToDelete : needDelete) { taskContext.getWorkerToTaskNum().remove(workerToDelete); assignments.add(workerToDelete); ret.add(workerToDelete); } } } return ret; }
??ret保存的是需要返回給調用者需要移除的worker集合。看這個方法,首先判斷,在剩余數(shù)小于等于0的情況,如果當前worker內的task數(shù)目大于等于平均數(shù),說明這個worker的確分配了合理的task。(原因是,如果leftTaskNum小于等于0,是不是就看成,平均數(shù)會比正常情況下加1。舉個例子,有3個盒子,10個球放進去,那么,平均數(shù)為3的情況下,余數(shù)為1,如果平均數(shù)為4,那么余數(shù)就是-2了)。如果leftTaskNum大于0,判斷就復雜一點,首先如果數(shù)目taskNum大于平均的avgTaskNum,說明這個worker多分配了一些task,那么這些多分配的就必須從leftTaskNum減去。甚至可能taskNum的數(shù)目大于avgTaskNum+leftTaskNum的數(shù)目,那么直接導致leftTaskNum小于等于0。在leftTaskNum小于等于0的情況下,找出分配上下文中worker分配的task數(shù)目剛好是平均數(shù)的worker,存在needDelete列表中。然后遍歷這個列表,把這些worker從加到需要移除的集合ret中,并返回。(因為如果有某個worker分配的數(shù)目多于avgTaskNum+leftTaskNum的數(shù)目,那么那些分配數(shù)是平均數(shù)的worker肯定是合理的,剩下那些分配小于平均數(shù)的才是需要調整的)。
??在執(zhí)行完上述的操作之后,更新下目前的平均數(shù)avgTaskNum和分配剩余的task數(shù)目leftTaskNum。(此刻還有一些task尚未實際分配),完成分配的調度是在assign方法中。在這個方法內,如果已經沒有需要分配的task,則將原來已經分配好的返回就行了。如果還存在需要分配的task,遍歷這個需要分配的task列表,如果task對應的組件屬于系統(tǒng)組件(組件id為__acker或者__topology_master的組件),則存下來,如果是一般的task,則調用chooseWorker方法選擇一個合適的worker,然后將task分配到worker上。(當然這里還需要做一些額外的操作,比如清除那些已經合理的分配的worker,通過調用removeWorkerFromSrcPool這個方法去清除)。而chooseWorker這個方法利用的就是前文提到的三個selector來選擇最佳的supervisor,選擇最佳的worker(需要考慮這個task接收的input,需要考慮supervisor節(jié)點的負載情況和worker內的負載情況)。分配完普通的task之后,在分配系統(tǒng)組件,分配方式也是一樣的。
??至此,task的分配也完成,總結一下,除開那些已經指定的分配外,比較重要的是,定義合理的selector(綜合考慮節(jié)點負載,worker負載,已經input輸入,考慮本地化)。分配的同時不斷去檢測是否已經有worker已經合理分配了,就不要在繼續(xù)分配到那個worker上。
??上述完成task和worker的分配之后,回到mkAssignment方法。剩下的操作就是設置task的HB起始時間和超時時間。這些比較簡單就不再細說了。
結束語??解讀拓撲分配的過程可以讓我們更加清楚,我們寫的一個邏輯拓撲,實際上是如何變成一個可以實際運行在集群的拓撲。以及拓撲如何保證負載均衡等問題。筆者后續(xù)還會更新JStorm幾個比較重要的特性的源碼分析。包括如何實現(xiàn)反壓機制,如何實現(xiàn)nimbus和supervisor容錯,supervisor啟動的時候需要執(zhí)行那些操作。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67029.html
摘要:方法首先初始化一個回調函數(shù),這是當一個成為之后就會調用的一個用于初始化一系列變量的方法,包括拓撲如何在集群上分配,拓撲狀態(tài)更新,清除函數(shù),還有監(jiān)控線程等。 寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關注或者收藏,轉發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本 概述 ?...
摘要:第二個問題就是說業(yè)務團隊之間沒有擴大管理,預算和審核是無頭緒的。支持一些高優(yōu)先級的比如說支持以及窗口等特性包括說。到現(xiàn)在為止,整體遷移完了,還剩下十個左右的作業(yè)沒有遷移完。 作者:張光輝 本文將為大家展示字節(jié)跳動公司怎么把Storm從Jstorm遷移到Flink的整個過程以及后續(xù)的計劃。你可以借此了解字節(jié)跳動公司引入Flink的背景以及Flink集群的構建過程。字節(jié)跳動公司是如何兼容以...
摘要:即使是容器已經退出的也可以看到,所以可以通過這種方式來分析非預期的退出。也可以直接通過在容器內啟動一個更方便地調試容器,不必一條條執(zhí)行。和獲得容器中進程的狀態(tài)和在容器里執(zhí)行的效果類似。通過查看容器的詳細信息飯后鏡像和容器的詳細信息。 『重用』容器名 但我們在編寫/調試Dockerfile的時候我們經常會重復之前的command,比如這種docker run --name jstorm-...
摘要:在每臺主機上我們執(zhí)行列出主機和網絡接口。其它的應用服務容器每個容器有兩個地址,一個屬于子網,另一個屬于的子網。雖然這會帶來一些性能上的影響,但是可以確保的網絡默認是安全的。 本文中,我們首先將Rancher部署到EC2實例上,并且添加新的主機,之后用Rancher的Catalog啟動了RocketChat應用,緊接著對運行中的容器的網絡接口和其他屬性的進行了分析。 同時,我們簡要介紹了...
閱讀 1529·2021-11-22 09:34
閱讀 3329·2021-09-29 09:35
閱讀 576·2021-09-04 16:40
閱讀 2919·2019-08-30 15:53
閱讀 2594·2019-08-30 15:44
閱讀 2591·2019-08-30 14:10
閱讀 1335·2019-08-29 18:43
閱讀 2215·2019-08-29 13:26