摘要:免費領取驗證碼內容安全短信發送直播點播體驗包及云服務器等套餐更多網易技術產品運營經驗分享請訪問網易云社區。文章來源網易云社區
本文由作者林洋港授權網易云社區發布。
作為服務端程序,我們總是需要向外界報告一些統計數據,以助于了解系統的運行情況,比如某個接口的調用時間、系統處理的請求數等等。當我們的程序以Storm Topology的形式運行時同樣需要輸出這些統計數據。Storm為我們提供了Metric接口,可以方便的把一些統計指標輸出到指定的地方。Storm Metric的統計方式為每隔指定的時間間隔輸出統計內容。本文首先介紹Storm Metric相關的接口以及它們之間的關系,然后以實際應用中的一個例子來說明如何使用Metric接口。本文使用的Storm版本為0.9.1-incubating。
IMetric是Storm用于保存統計數據的接口
public interface IMetric {
public Object getValueAndReset();
}
接口只有一個getValueAndReset方法,當需要輸出統計內容時,Storm就會調用這個方法。值得注意的是getValueAndReset方法返回的是Object類型,這為統計內容的形式提供了靈活性,我們可以返回任意的類型作為統計信息,這一點在后面的例子中我們會再提到。另一個引起我們注意的地方是IMetric接口并沒有聲明更新統計數據的方法,這樣當我們實現IMetric接口的時候就更加靈活了——參數類型、參數個數都沒有限制。Storm自身提供了6個IMetric實現:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。這里只介紹CountMetric和MultiCountMetric的使用方式,以印證前面說的IMetric接口統計數據更新方式的靈活性以及getValueAndReset返回Object類型的靈活性。CountMetric就是一個簡單的計數器,有兩個方法incr()和incrBy(long incrementBy),其getValueAndReset方法返回一個long類型的值:
public Object getValueAndReset() { long ret = _value; _value = 0; return ret; }
MultiCountMetric,顧名思義,就是多個指標的計數器,維護著一個Map,只有一個方法CountMetric scope(String key)。因此MultiCountMetric的更新方式為MultiCountMetric.scope(key).incr()或MultiCountMetric.scope(key).incrBy(long incrementBy)。它的getValueAndReset返回的是一個Map:
public Object getValueAndReset() { Map ret = new HashMap(); for(Map.Entry e : _value.entrySet()) { ret.put(e.getKey(), e.getValue().getValueAndReset()); } return ret; }
除了IMetric接口,還有另外一個接口IMetricsConsumer,它負責向外輸出統計信息,即把IMetric getValueAndReset方法返回的數據輸出到外面。IMetricsConsumer有三個方法
void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter); void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); void cleanup();
其中prepare是初始化,cleanup是生命周期結束時的清理工作,handleDataPoints才是真正的統計信息輸出方法,taskInfo參數存儲當前task的信息(host、port、component id、task id等等),dataPoints存儲的是IMetric返回的統計信息,可能是出于性能考慮,dataPoints是一個集合,包含了多個IMetric返回的數據。讓我們來具體看看DataPoint這個類:
public static class DataPoint { @Override public String toString() { return "[" + name + " = " + value + "]"; } public String name; public Object value; }
name是IMetric注冊時的名字,value就是IMetric getValueAndReset返回的那個Object。
Storm只提供了一個IMetricsConsumer實現——LoggingMetricsConsumer。LoggingMetricsConsumer做的事情很簡單,就是把dataPoints輸出到日志文件metrics.log,下面是其handleDataPoints方法的部分代碼:
for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.name) .append(padding).delete(header.length()+23,sb.length()).append(" ") .append(p.value); LOG.info(sb.toString()); }
可以看到它通過調用DataPoint的value的toString方法把統計信息輸出到日志里面的,所以如果你的IMetric實現返回的是自己定義的類型,記得重載toString()方法,讓統計信息以可讀的格式輸出。
到這里Storm的Metric接口和自帶的實現基本介紹完了,接下來我們來看看怎么使用Storm自帶的這些實現。首先,Storm默認的配置是關掉Metric功能的,可以有兩種方式開啟Metric功能: 1)在storm.yaml里面配置,這種是集群級別的設置,個人不建議這么做,所以就不多介紹了 2)conf.registerMetricsConsumer(Class klass, long parallelismHint);這是topology級別的,klass是IMetricsConsumer的實現類,parallelismHint這個參數Storm代碼里面沒注釋我也沒深入看底層的實現,這里結合自己的實驗談談它的意義:topology是在1個或多個worker上面以多個task的方式跑的嘛,parallelismHint就是指定多少個并發來輸出統計信息。這里我也不知道parallelismHint指的是多個task、worker還是supervisor,反正parallelismHint=1的時候只在特定的一個supervisor下面的metrics.log有統計信息,parallelismHint>1時可能取決于worker的數量,我測試的時候由于是在多個supervisor上跑的,因此觀察到多個supervisor都有metrics.log的輸出。個人經驗是parallelismHint設為1,這樣可以在一個supervisor下面的metrics.log就能看到所有task的統計信息。
由于我建議采用第二種方法,所以示例代碼為:
//客戶端注冊IMetricsConsumer
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
//我們假設要統計spout某段代碼的調用次數
//注冊IMetric
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { ... metric=new CountMetric(); context.registerMetric("spout time cost", metric, 60); //因此DataPoint的name為spout time cost,60表示1分鐘統計一次 ... } //更新統計數據
@Override
public void nextTuple() { if(...)... else{ ... metric.incr(); } } 這樣就可以了,然后你就能在metrics.log看到統計數據了。
現在,假設我們的需求跟上面不太一樣:1)metrics.log只打印我們自己維護的統計信息,屏蔽__system、__fail-count這種系統自己的統計信息;2)不只統計代碼的調用次數,還要統計調用時間——最小時間、最大時間、平均時間。
第一點可以通過重載LoggingMetricsConsumer的方法來實現:
public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {
@Override public void handleDataPoints(TaskInfo taskInfo, CollectiondataPoints) { if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith("__")) return; if (dataPoints == null || dataPoints.isEmpty()) return; List list = new ArrayList (); for (DataPoint p : dataPoints) { if (p.name == null || p.name.startsWith("__")) continue; list.add(p); } if (list.isEmpty()) return; super.handleDataPoints(taskInfo, list); }
}
第二點需要開發我們自己的IMetric接口實現類TimeCostMetric,以下是其主要代碼:
@Override public Object getValueAndReset() { TimeCost timeCost=new TimeCost(); timeCost.count=count; if(timeCost.count>0){ timeCost.min=min; timeCost.max=max; timeCost.mean=all*1.0/timeCost.count; } init(); return timeCost; }
public void update(long time){
count++; all+=time; if(min>time)min=time; if(max
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/25377.html
摘要:項目地址前言大數據技術棧思維導圖大數據常用軟件安裝指南一分布式文件存儲系統分布式計算框架集群資源管理器單機偽集群環境搭建集群環境搭建常用命令的使用基于搭建高可用集群二簡介及核心概念環境下的安裝部署和命令行的基本使用常用操作分區表和分桶表視圖 項目GitHub地址:https://github.com/heibaiying... 前 言 大數據技術棧思維導圖 大數據常用軟件安裝指...
摘要:關于三者的一些概括總結離線分析框架,適合離線的復雜的大數據處理內存計算框架,適合在線離線快速的大數據處理流式計算框架,適合在線的實時的大數據處理我是一個以架構師為年之內目標的小小白。 整理自《架構解密從分布式到微服務》第七章——聊聊分布式計算.做了相應補充和修改。 [TOC] 前言 不管是網絡、內存、還是存儲的分布式,它們最終目的都是為了實現計算的分布式:數據在各個計算機節點上流動,同...
摘要:源碼版本簡介是下的一個監控項目,用于進行容器集群的監控和性能分析?;镜墓δ芗案拍罱榻B可以回顧我之前的一篇文章監控之介紹。在源碼分析之前我們先介紹的實現流程,由上圖可以看出會從各個上獲取相關的監控信息,然后進行匯總發送給后臺數據庫。 源碼版本 heapster version: release-1.2 簡介 Heapster是Kubernetes下的一個監控項目,用于進行容器集群的監控...
摘要:本文所闡述的時間序列數據庫,系筆者所負責產品對性能指標進行聚合分組過濾過程中的梳理和總結。而帶有標志的,則是數據采集源,將數據發給服務。左面的則是的特點之一,其規則為以上屬性值均為對應名稱的。 【編者按】 劉斌,OneAPM后端研發工程師,擁有10多年編程經驗,參與過大型金融、通信以及Android手機操作系的開發,熟悉Linux及后臺開發技術。曾參與翻譯過《第一本Docker書》、《...
閱讀 2854·2023-04-25 17:59
閱讀 685·2023-04-25 15:05
閱讀 674·2021-11-25 09:43
閱讀 3036·2021-10-12 10:13
閱讀 3540·2021-09-27 13:59
閱讀 3587·2021-09-23 11:21
閱讀 3884·2021-09-08 09:35
閱讀 569·2019-08-29 17:12