摘要:第三個就是比較重點的內容,在有贊的實踐。第四部分是將實時計算化,界面化的一些實踐。二有贊實時平臺架構有贊的實時平臺架構呢有幾個主要的組成部分。實時平臺提供了集群管理,項目管理,任務管理和報警監控的功能。。
一、前言
這篇主要由五個部分來組成:
首先是有贊的實時平臺架構。
其次是在調研階段我們為什么選擇了 Flink。在這個部分,主要是 Flink 與 Spark 的 structured streaming 的一些對比和選擇 Flink 的原因。
第三個就是比較重點的內容,Flink 在有贊的實踐。這其中包括了我們在使用 Flink 的過程中碰到的一些坑,也有一些具體的經驗。
第四部分是將實時計算 SQL 化,界面化的一些實踐。
最后的話就是對 Flink 未來的一些展望。這塊可以分為兩個部分,一部分是我們公司接下來會怎么去更深入的使用 Flink,另一部分就是 Flink 以后可能會有的的一些新的特性。
有贊的實時平臺架構呢有幾個主要的組成部分。
首先,對于實時數據來說,一個消息中間件肯定是必不可少的。在有贊呢,除了業界常用的 Kafka 以外,還有 NSQ。與 Kafka 有別的是,NSQ 是使用 Go 開發的,所以公司封了一層 Java 的客戶端是通過 push 和 ack 的模式去保證消息至少投遞一次,所以 Connector 也會有比較大的差距,尤其是實現容錯的部分。在實現的過程中呢,參考了 Flink 官方提供的 Rabbit MQ 的連接器,結合 NSQ client 的特性做了一些改造。
接下來就是計算引擎了,最古老的就是 Storm 了,現在依然還有一些任務在 Storm 上面跑,至于新的任務基本已經不會基于它來開發了,因為除了開發成本高以外,語義的支持,SQL 的支持包括狀態管理的支持都做得不太好,吞吐量還比較低,將 Storm 的任務遷移到 Flink 上也是我們接下來的任務之一。還有呢就是 Spark Streaming 了,相對來說 Spark 有一個比較好的生態,但是 Spark Streaming 是微批處理的,這給它帶來了很多限制,除了延遲高以外還會比較依賴外部存儲來保存中間狀態。 Flink 在有贊是比較新的引擎,為什么在有了 Spark 和 Storm 的情況下我們還要引入 Flink 呢,下一個部分我會提到。
存儲引擎,除了傳統的 MySQL 以外,我們還使用 HBase ,ES 和 ZanKV。ZanKV 是我們公司開發的一個兼容 Redis 協議的分布式 KV 數據庫,所以姑且就把它當成 Redis 來理解好了。
實時 OLAP 引擎的話基于 Druid,在多維的統計上面有非常好的應用。
最后是我們的實時平臺。實時平臺提供了集群管理,項目管理,任務管理和報警監控的功能。。
關于實時平臺的架構就簡單介紹到這里,接下來是 Flink 在有贊的探索階段。在這個部分,我主要會對比的 Spark Structured Streaming。
至于為什么和 Spark Structured Streaming(SSS) 進行對比呢?因為這是實時SQL化這個大背景下比較有代表性的兩個引擎。
首先是性能上,從幾個角度來比較一下。首先是延遲,毫無疑問,Flink 作為一個流式引擎是優于 SSS 的微批引擎的。雖然說 Spark 也引入了一個連續的計算引擎,但是不管從語義的保證上,還是從成熟度上,都是不如 Flink 的。據我所知,他們是通過將 rdd 長期分配到一個結點上來實現的。
其次比較直觀的指標就是吞吐了,這一點在某些場景下 Flink 略遜于 Spark 。但是當涉及到中間狀態比較大的任務呢,Flink 基于 RocksDB 的狀態管理就顯示出了它的優勢。
?
Flink 在中間狀態的管理上可以使用純內存,也可以使用 RocksDB 。至于 RocksDB ,簡單點理解的話就是一個帶緩存的嵌入式數據庫。借助持久化到磁盤的能力,Flink 相比 SSS 來說可以保存的狀態量大得多,并且不容易OOM。并且在做 checkpoint 中選用了增量模式,應該是只需要備份與上一次 checkpoint 時不同的 sst 文件。使用過程中,發現 RocksDB 作為狀態管理性能也是可以滿足我們需求的。
聊完性能,接下來就說一說 SQL 化,這也是現在的一個大方向吧。我在開始嘗試 SSS 的時候,嘗試了一個 SQL 語句中有多個聚合操作,但是卻拋了異常。 后面仔細看了文檔,發現確實這在 SSS 中是不支持的。第二個是 distinct 也是不支持的。這兩點 Flink 是遠優于 SSS 的。所以從實時 SQL 的角度,Flink 又為自己贏得了一票。除此之外,Flink 有更靈活的窗口。還有輸出的話,同樣參考的是 DataFlow 模型,Flink 實現支持刪除并更新的操作,SSS 僅支持更新的操作。(這邊 SSS 是基于 Spark 的 2.3版本)
API 的靈活性。在 SSS 中,誠然 table 帶來了比較大的方便,但是對于有一些操作依然會想通過 DStream 或者 rdd 的形式來操作,但是 SSS 并沒有提供這樣的轉換,只能編寫一些 UDF。但是在 Flink 中,Table 和 DataStream 可以靈活地互相轉換,以應對更復雜的場景。
在真正開始使用 Flink 之前呢,第一個要考慮的就是部署的問題。因為現有的技術棧,所以選擇了部署在 Yarn 上,并且使用的是 Single Job 的模式,雖然會有更多的 ApplicationMaster,但無疑是增加了隔離性的。
4.1 問題一: FLINK-9567在開始部署的時候我遇到了一個比較奇怪的問題。先講一下背景吧,因為還處于調研階段,所以使用的是 Yarn 的默認隊列,優先級比較低,在資源緊張的時候也容易被搶占。
有一個上午,我起了一個任務,申請了5個 Container 來運行 TaskExecutor ,一個比較簡單地帶狀態的流式任務,想多跑一段時間看看穩定不穩定。這個 Flink 任務最后占了100多個 container,還在不停增加,但是只有五個 Container 在工作,其他的 container 都注冊了 slot,并且 slot 都處于閑置的狀態。以下兩張圖分別代表正常狀態下的任務,和出問題的任務。
出錯后
在涉及到這個問題細節之前,我先介紹一下 Flink 是如何和 Yarn 整合到一塊的。根據下圖,我們從下往上一個一個介紹這些組件是做什么的。
TaskExecutor 是實際任務的執行者,它可能有多個槽位,每個槽位執行一個具體的子任務。每個 TaskExecutor 會將自己的槽位注冊到 SlotManager 上,并匯報自己的狀態,是忙碌狀態,還是處于一個閑置的狀態。
SlotManager 既是 Slot 的管理者,也負責給正在運行的任務提供符合需求的槽位。還記錄了當前積壓的槽位申請。當槽位不夠的時候向Flink的ResourceManager申請容器。
Pending slots 積壓的 Slot 申請及計數器
Flink 的 ResourceManager 則負責了與 Yarn 的 ResourceManager 進行交互,進行一系列例如申請容器,啟動容器,處理容器的退出等等操作。因為采用的是異步申請的方式,所以還需要記錄當前積壓的容器申請,防止接收過多容器。
Pending container request 積壓容器的計數器
AMRMClient 是異步申請的執行者,CallbackHandler 則在接收到容器和容器退出的時候通知 Flink 的 ResourceManager。
Yarn 的 ResourceManager 則像是一個資源的分發器,負責接收容器請求,并為 Client 準備好容器。
這邊一下子引入的概念有點多,下面我用一個簡單地例子來描述一下這些組件在運行中起到的角色。
首先,我們的配置是3個 TaskManager,每個 TaskManager 有兩個 Slot,也就是總共需要6個槽位。當前已經擁有了4個槽位,任務的調度器向 Slot 申請還需要兩個槽位來運行子任務。
這時 SlotManager 發現所有的槽位都已經被占用了,所以它將這個 slot 的 request 放入了 pending slots 當中。所以可以看到 pending slots 的那個計數器從剛才的0跳轉到了現在的2. 之后 SlotManager 就向 Flink 的 ResourceManager 申請一個新的 TaskExecutor,正好就可以滿足這兩個槽位的需求。于是 Flink 的 ResourceManager 將 pending container request 加1,并通過 AMRM Client 去向 Yarn 申請資源。
當 Yarn 將相應的 Container 準備好以后,通過 CallbackHandler 去通知 Flink 的 ResourceManager。Flink 就會根據在每一個收到的 container 中啟動一個 TaskExecutor ,并且將 pending container request 減1,當 pending container request 變為0之后,即使收到新的 container 也會馬上退回。
當 TaskExecutor 啟動之后,會向 SlotManager 注冊自己的兩個 Slot 可用,SlotManager 便會將兩個積壓的 SlotRequest 完成,通知調度器這兩個子任務可以到這個新的 TaskExecutor 上執行,并且 pending requests 也被置為0. 到這兒一切都符合預期。
那這個超發的問題又是如何出現的呢?首先我們看一看這就是剛剛那個正常運行的任務。它占用了6個 Slot。
如果在這個時候,出現了一些原因導致了 TaskExecutor 非正常退出,比如說 Yarn 將資源給搶占了。這時 Yarn 就會通知 Flink 的 ResourceManager 這三個 Container 已經異常退出。所以 Flink 的 ResourceManager 會立即申請三個新的 container。在這兒會討論的是一個 worst case,因為這個問題其實也不是穩定復現的。
CallbackHandler 兩次接收到回調發現 Container 是異常退出,所以立即申請新的 Container,pending container requests 也被置為了3.
如果在這時,任務重啟,調度器會向 SlotManager 申請6個 Slot,SlotManager 中也沒有可用 Slot,就會向 Flink 的 ResourceManager 申請3個 Container,這時 pending container requests 變為了6.
最后呢結果就如圖所示,起了6個 TaskExecutor,總共12個 Slot,但是只有6個是被正常使用的,還有6個一直處于閑置的狀態。
在修復這個問題的過程中,我有兩次嘗試。第一次嘗試,在 Container 異常退出以后,我不去立即申請新的 container。但是問題在于,如果 Container 在啟動 TaskExecutor 的過程中出錯,那么失去了這種補償的機制,有些 Slot Request 會被一直積壓,因為 SlotManager 已經為它們申請了 Container。
?
第二次嘗試是在 Flink 的 ResourceManager 申請新的 container 之前先去檢查 pending slots,如果當前的積壓 slots 已經可以被積壓的 container 給滿足,那就沒有必要申請新的 container 了。
我們使用過程中踩到的第二個坑,其實是跟延遲監控相關的。例子是一個很簡單的任務,兩個 source,兩個除了 source 之外的 operator,并行度都是2. 每個 source 和 operator 它都有兩個子任務。
任務的邏輯是很簡單,但是呢當我們打開延時監控。即使是這么簡單的一個任務,它會記錄每一個 source 的子任務到每一個算子的子任務的延遲數據。這個延遲數據里還包含了平均延遲,最大延遲,百分之99的延遲等等等等。那我們可以得出一個公式,延遲數據的數量是 source 的子任務數量乘以的 source 的數量乘以算子的并行度乘以算子的數量。N = n(subtasks per source) n(sources) n(subtasks per operator) * n(operator)
這邊我做一個比較簡單地假設,那就是 source 的子任務數量和算則的子任務數量都是 p - 并行度。從下面這個公式我們可以看出,監控的數量隨著并行度的上升呈平方增長。N = p^2 n(sources) n(operator)
如果我們把上個任務提升到10個并行度,那么就會收到400份的延遲數據。這可能看起來還沒有太大的問題,這貌似并不影響組件的正常運行。
但是,在 Flink 的 dev mailing list 當中,有一個用戶反饋在開啟了延遲監控之后,JobMaster 很快就會掛掉。他收到了24000+的監控數據,并且包含這些數據的 ConcurrentHashMap 在內存中占用了1.6 G 的內存。常規情況 Flink 的 JobMaster 時會給到多少內存,我一般會配1-2 g,最后會導致長期 FullGC 和 OOM 的情況。
那怎么去解決這個問題呢?當延遲監控已經開始影響到系統的正常工作的時候,最簡單的辦法就是把它給關掉。可是把延時監控關掉,一方面我們無法得知當前任務的延時,另一方面,又沒有辦法去針對延時做一些報警的功能。
?
所以另一個解決方案就如下。首先是 Flink-10243,它提供了更多的延遲監控粒度的選項,從源頭上減少數量。比如說我們使用了 Single 模式去采集這些數據,那它只會記錄每個 operator 的子任務的延遲,忽略是從哪個 source 或是 source 的子任務中來。這樣就可以得出這樣一個公式,也能將之前我們提到的十個并行度的任務產生的400個延時監控降低到了40個。這個功能發布在了1.7.0中,并且 backport 回了1.5.5和1.6.2.
?
此外,Flink-10246 提出了改進 MetricQueryService。它包含了幾個子任務,前三個子任務為監控服務建立了一個專有的低優先級的 ActorSystem,在這里可以簡單的理解為一個獨立的線程池提供低優先級的線程去處理相關任務。它的目的也是為了防止監控任務影響到主要的組件。這個功能發布在了1.7.0中。
?
還有一個就是 Flink-10252,它還依舊處于 review 和改進當中,目的是為了控制監控消息的大小。
接下來會談一下 Flink 在有贊的一些具體應用。
?
首先是 Flink 結合 Spring。為什么要將這兩者做結合呢,首先在有贊有很多服務都只暴露了 Dubbo 的接口,而用戶往往都是通過 Spring 去獲取這個服務的 client,在實時計算的一些應用中也是如此。
?
另外,有不少數據應用的開發也是 Java 工程師,他們希望能在 Flink 中使用 Spring 以及生態中的一些組件去簡化他們的開發。用戶的需求肯定得得到滿足。接下來我會講一些錯誤的典型,以及最后是怎么去使用的。
第一個錯誤的典型就是在 Flink 的用戶代碼中啟動一個 Spring 環境,然后在算子中取調用相關的 bean。但是事實上,最后這個 Spring Context 是啟動在 client 端的,也就是提交任務的這一端,在圖中有一個紅色的方框中間寫著 Spring Context 表示了它啟動的位置。可是用戶在實際調用時確實在 TaskManager 的 TaskSlot 中,它們都處在不同的 jvm,這明顯是不合理的。所以呢我們又遇到了第二個錯誤。
第二個錯誤比第一個錯誤看起來要好多了,我們在算子中使用了 RichFunction,并且在 open 方法中通過配置文件獲取了一個 Spring Context。但是先不說一個 TaskManager 中啟動幾個 Spring Context 是不是浪費,一個 Jvm 中啟動兩個 Spring Context 就會出問題。可能有用戶就覺得,那還不簡單,把 TaskSlot 設為1不就行了。可是還有 OperatorChain 這個機制將幾個窄依賴的算子綁定到一塊運行在一個 TaskSlot 中。那我們關閉 OperatorChain 不就行了?還是不行,Flink可能會做基于 CoLocationGroup 的優化,將多個 subtask 放到一個 TaskSlot 中輪番執行。
但其實最后的解決方案還是比較容易的,無非是使用單例模式來封裝 SpringContext,確保每個jvm中只有一個,在算子函數的 open 方法中通過這個單例來獲取相應的 Bean。
可是在調用 Dubbo 服務的時候,一次響應往往最少也要在10 ms 以上。一個 TaskSlot 最大的吞吐也就在一千,可以說對性能是大大的浪費。那么解決這個問題的話可以通過異步和緩存,對于多次返回同一個值的調用可以使用緩存,提升吞吐我們可以使用異步。
4.4 具體實踐二可是如果想同時使用異步和緩存呢?剛開始我覺得這是一個挺容易實現的功能,但在實際寫 RichAsyncFunction 的時候我發現并沒有辦法使用 Flink 托管的 KeyedState。所以最初想到的方法就是做一個類似 LRU 的 Cache 去緩存數據。但是這完全不能借助到 Flink 的狀態管理的優勢。所以我研究了一下實現。
為什么不支持呢?
當一條記錄進入算子的時候,Flink 會先將 key 提取出來并將 KeyedState 指向與這個 key 關聯的存儲空間,圖上就指向了 key4 相關的存儲空間。但是如果此時 key1 關聯的異步操作完成了,希望把內容緩存起來,會將內容寫入到 key4 綁定的存儲空間。當下一次 key1 相關的記錄進入算子時,回去 key1 關聯的存儲空間查找,可是根本找不到數據,只好再次請求。
所以解決的方法是定制一個算子,每條記錄進入系統,都讓它指向同一個公用 key 的存儲空間。在這個空間使用 MapState 來做緩存。最后算子運行的 function 繼承 AbstractRichFunction 在 open 方法中來獲取 KeyedState,實現 AsyncFunction 接口來做異步操作。
最早我們使用 SDK 的方式來簡化 SQL 實時任務的開發,但是這對用戶來說也不算非常友好,所以現在講 SQL 實時任務界面化,用 Flink 作為底層引擎去執行這些任務。
在做 SQL 實時任務時,首先是外部系統的抽象,將數據源和數據池抽象為流資源,用戶將它們數據的 Schema 信息和元信息注冊到平臺中,平臺根據用戶所在的項目組管理讀寫的權限。在這里消息源的格式如果能做到統一能降低很多復雜度。比如在有贊,想要接入的用戶必須保證是 Json 格式的消息,通過一條樣例消息可以直接生成 Schema 信息。
接下來是根據用戶選擇的數據源和數據池,獲取相應的 Schema 信息和元信息,在 Flink 任務中注冊相應的外部系統 Table 連接器,再執行相應的 SQL 語句。
在 SQL 語義不支持的功能上盡量使用 UDF 的方式來拓展。
有數據源和數據池之間的元信息,還可以獲取實時任務之間可能存在的依賴關系,并且能做到整個鏈路的監控
Flink 的批處理和 ML 模塊的嘗試,會跟 Spark 進行對比,分析優劣勢。目前還處于調研階段,目前比較關注的是 Flink 和 Hive的結合,對應 FLINK-10566 這個 issue。
從 Flink 的發展來講呢,我比較關注并參與接下來對于調度和資源管理的優化。現在 Flink 的調度和任務執行圖是耦合在一塊的,使用比較簡單地調度機制。通過將調度器隔離出來,做成可插拔式的,可以應用更多的調度機制。此外,基于新的調度器,還可以去做更靈活的資源補充和減少機制,實現 Auto Scaling。這可能在接下來的版本中會是一個重要的特性。對應 FLINK-10404 和 FLINK-10429 這兩個 issue。
最后打個小廣告,有贊大數據團隊基礎設施團隊,主要負責有贊的數據平臺(DP), 實時計算(Storm, Spark Streaming, Flink),離線計算(HDFS,YARN,HIVE, SPARK SQL),在線存儲(HBase),實時 OLAP(Druid) 等數個技術產品,歡迎感興趣的小伙伴聯系 yangshimin@youzan.com
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/11438.html
摘要:第三個就是比較重點的內容,在有贊的實踐。第四部分是將實時計算化,界面化的一些實踐。二有贊實時平臺架構有贊的實時平臺架構呢有幾個主要的組成部分。實時平臺提供了集群管理,項目管理,任務管理和報警監控的功能。。 一、前言 這篇主要由五個部分來組成: 首先是有贊的實時平臺架構。 其次是在調研階段我們為什么選擇了 Flink。在這個部分,主要是 Flink 與 Spark 的 structure...
摘要:在有贊的技術演進。業務數據量正在不斷增大,這些任務會影響業務對外服務的承諾。監控需要收集上執行的的審計信息,包括提交者執行的具體,開始結束時間,執行完成狀態。還有一點是詳細介紹了的原理,實踐中設置了的比默認的減少了以上的時間。 前言 有贊數據平臺從2017年上半年開始,逐步使用 SparkSQL 替代 Hive 執行離線任務,目前 SparkSQL 每天的運行作業數量5000個,占離線...
摘要:在有贊的技術演進。業務數據量正在不斷增大,這些任務會影響業務對外服務的承諾。監控需要收集上執行的的審計信息,包括提交者執行的具體,開始結束時間,執行完成狀態。還有一點是詳細介紹了的原理,實踐中設置了的比默認的減少了以上的時間。 前言 有贊數據平臺從2017年上半年開始,逐步使用 SparkSQL 替代 Hive 執行離線任務,目前 SparkSQL 每天的運行作業數量5000個,占離線...
閱讀 2037·2021-09-30 09:47
閱讀 711·2021-09-22 15:43
閱讀 1991·2019-08-30 15:52
閱讀 2442·2019-08-30 15:52
閱讀 2552·2019-08-30 15:44
閱讀 915·2019-08-30 11:10
閱讀 3376·2019-08-29 16:21
閱讀 3302·2019-08-29 12:19