摘要:同時它也支持多線程的方式,每個線程消費指定分區進行消費。我們可以在一個消費組中創建多個消費實例來達到高可用高容錯的特性,不會出現單線程以及獨立消費者掛掉之后數據不能消費的情況。
前言
之前寫過一篇《從源碼分析如何優雅的使用 Kafka 生產者》 ,有生產者自然也就有消費者。
建議對 Kakfa 還比較陌生的朋友可以先看看。
就我的使用經驗來說,大部分情況都是處于數據下游的消費者角色。也用 Kafka 消費過日均過億的消息(不得不佩服 Kakfa 的設計),本文將借助我使用 Kakfa 消費數據的經驗來聊聊如何高效的消費數據。
單線程消費以之前生產者中的代碼為例,事先準備好了一個 Topic:data-push,3個分區。
先往里邊發送 100 條消息,沒有自定義路由策略,所以消息會均勻的發往三個分區。
先來談談最簡單的單線程消費,如下圖所示:
由于數據散列在三個不同分區,所以單個線程需要遍歷三個分區將數據拉取下來。
單線程消費的示例代碼:
這段代碼大家在官網也可以找到:將數據取出放到一個內存緩沖中最后寫入數據庫的過程。
先不討論其中的 offset 的提交方式。
通過消費日志可以看出:
取出的 100 條數據確實是分別遍歷了三個分區。
單線程消費雖然簡單,但存在以下幾個問題:
效率低下。如果分區數幾十上百個,單線程無法高效的取出數據。
可用性很低。一旦消費線程阻塞,甚至是進程掛掉,那么整個消費程序都將出現問題。
多線程消費既然單線程有諸多問題,那是否可以用多線程來提高效率呢?
在多線程之前不得不將消費模式分為兩種進行探討:消費組、獨立消費者。
這兩種消費模式對應的處理方式有著很大的不同,所以很有必要多帶帶來講。
獨立消費者模式先從獨立消費者模式談起,這種模式相對于消費組來說用的相對小眾一些。
看一個簡單示例即可知道它的用法:
值得注意的是:獨立消費者可以不設置 group.id 屬性。
也是發送100條消息,消費結果如下:
通過 API 可以看出:我們可以手動指定需要消費哪些分區。
比如 data-push Topic 有三個分區,我可以手動只消費其中的 1 2 分區,第三個可以視情況來消費。
同時它也支持多線程的方式,每個線程消費指定分區進行消費。
為了直觀,只發送了 10 條數據。
根據消費結果可以看出:
c1 線程只取 0 分區;c2 只取 1 分區;c3 只取 2 分區的數據。
甚至我們可以將消費者多進程部署,這樣的消費方式如下:
假設 Topic:data-push 的分區數為 4 個,那我們就可以按照圖中的方式創建兩個進程。
每個進程內有兩個線程,每個線程再去消費對應的分區。
這樣當我們性能不夠新增 Topic 的分區數時,消費者這邊只需要這樣水平擴展即可,非常的靈活。
這種自定義分區消費的方式在某些場景下還是適用的,比如生產者每次都將某一類的數據只發往一個分區。這樣我們就可以只針對這一個分區消費。
但這種方式有一個問題:可用性不高,當其中一個進程掛掉之后;該進程負責的分區數據沒法轉移給其他進程處理。
消費組模式消費組模式應當是使用最多的一種消費方式。
我們可以創建 N 個消費者實例(new KafkaConsumer()),當這些實例都用同一個 group.id 來創建時,他們就屬于同一個消費組。
在同一個消費組中的消費實例可以收到消息,但一個分區的消息只會發往一個消費實例。
還是借助官方的示例圖來更好的理解它。
某個 Topic 有四個分區 p0 p1 p2 p3,同時創建了兩個消費組 groupA,groupB。
A 消費組中有兩個消費實例 C1、C2。
B 消費組中有四個消費實例 C3、C4、C5、C6。
這樣消息是如何劃分到每個消費實例的呢?
通過圖中可以得知:
A 組中的 C1 消費了 P0 和 P3 分區;C2 消費 P1、P2 分區。
B 組有四個實例,所以每個實例消費一個分區;也就是消費實例和分區是一一對應的。
需要注意的是:
這里的消費實例簡單的可以理解為 new KafkaConsumer,它和進程沒有關系。
比如說某個 Topic 有三個分區,但是我啟動了兩個進程來消費它。
其中每個進程有兩個消費實例,那其實就相當于有四個實例了。
這時可能就會問 4 個實例怎么消費 3 個分區呢?
消費組自平衡這個 Kafka 已經幫我做好了,它會來做消費組里的 Rebalance。
比如上面的情況,3 個分區卻有 4 個消費實例;最終肯定只有三個實例能取到消息。但至于是哪三個呢,這點 Kakfa 會自動幫我們分配好。
看個例子,還在之前的 data-push 這個 Topic,其中有三個分區。
當其中一個進程(其中有三個線程,每個線程對應一個消費實例)時,消費結果如下:
里邊的 20 條數據都被這個進程的三個實例消費掉。
這時我新啟動了一個進程,程序和上面那個一模一樣;這樣就相當于有兩個進程,同時就是 6 個實例。
我再發送 10 條消息會發現:
進程1 只取到了分區 1 里的兩條數據(之前是所有數據都是進程1里的線程獲取的)。
同時進程2則消費了剩下的 8 條消息,分別是分區 0、2 的數據(總的還是只有三個實例取到了數據,只是分別在不同的進程里)。
當我關掉進程2,再發送10條數據時會發現所有數據又被進程1里的三個線程消費了。
通過這些測試相信大家已經可以看到消費組的優勢了。
我們可以在一個消費組中創建多個消費實例來達到高可用、高容錯的特性,不會出現單線程以及獨立消費者掛掉之后數據不能消費的情況。同時基于多線程的方式也極大的提高了消費效率。
而當新增消費實例或者是消費實例掛掉時 Kakfa 會為我們重新分配消費實例與分區的關系就被稱為消費組 Rebalance。
發生這個的前提條件一般有以下幾個:
消費組中新增消費實例。
消費組中消費實例 down 掉。
訂閱的 Topic 分區數發生變化。
如果是正則訂閱 Topic 時,匹配的 Topic 數發生變化也會導致 Rebalance。
所以推薦使用這樣的方式消費數據,同時擴展性也非常好。當性能不足新增分區時只需要啟動新的消費實例加入到消費組中即可。
總結本次只分享了幾個不同消費數據的方式,并沒有著重研究消費參數、源碼;這些內容感興趣的話可以在下次分享。
文中提到的部分源碼可以在這里查閱:
https://github.com/crossoverJie/JCSprout
歡迎關注公眾號一起交流:
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72274.html
摘要:中有一個微軟團隊的分享。微軟有一套服務化的數據管道,作為云產品售賣。結尾微軟用主要目的還是為了更容易使用流計算等開源軟件,從安全性使用上而言,在收集端消費端監控等仍有非常多的點需要提高。 Kafka Summit 2016中有一個微軟MS/Bing團隊的分享。看了數據給大家分析下。微軟有一套服務化的數據管道EventHub,作為云產品售賣。但在Bing、Ads、Office等場景上仍在使用K...
閱讀 1572·2021-10-25 09:44
閱讀 2937·2021-09-04 16:48
閱讀 1564·2019-08-30 15:44
閱讀 2509·2019-08-30 15:44
閱讀 1738·2019-08-30 15:44
閱讀 2825·2019-08-30 14:14
閱讀 2977·2019-08-30 13:00
閱讀 2152·2019-08-30 11:09