国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

使用多線程增加kafka消費(fèi)能力

adie / 807人閱讀

摘要:通過增加分區(qū)數(shù)量,能夠通過部署多個(gè)消費(fèi)者增加并行消費(fèi)能力。然后使用了飽和策略,使得多線程處理不過來的時(shí)候,能夠阻塞在的消費(fèi)線程上。多線程是為了增加效率,等是為了增加可靠性。

前提:本例適合那些沒有順序要求的消息主題。

kafka通過一系列優(yōu)化,寫入和讀取速度能夠達(dá)到數(shù)萬條/秒。通過增加分區(qū)數(shù)量,能夠通過部署多個(gè)消費(fèi)者增加并行消費(fèi)能力。但還是有很多情況下,某些業(yè)務(wù)的執(zhí)行速度實(shí)在是太慢,這個(gè)時(shí)候我們就要用到多線程去消費(fèi),提高應(yīng)用機(jī)器的利用率,而不是一味的給kafka增加壓力。

使用Spring創(chuàng)建一個(gè)kafka消費(fèi)者是非常簡(jiǎn)單的。我們選擇的方式是繼承kafka的ShutdownableThread,然后實(shí)現(xiàn)它的doWork方法即可。

參考:https://github.com/apache/kaf...

多線程消費(fèi)某個(gè)分區(qū)的數(shù)據(jù)

即然是使用多線程,我們就需要新建一個(gè)線程池。

我們創(chuàng)建了一個(gè)最大容量為20的線程池,其中有兩個(gè)參數(shù)需要注意一下。(參考《JAVA多線程使用場(chǎng)景和注意事項(xiàng)簡(jiǎn)版》)。

我們使用了了零容量的SynchronousQueue,一進(jìn)一出,避免隊(duì)列里緩沖數(shù)據(jù),這樣在系統(tǒng)異常關(guān)閉時(shí),就能排除因?yàn)樽枞?duì)列丟消息的可能。
然后使用了CallerRunsPolicy飽和策略,使得多線程處理不過來的時(shí)候,能夠阻塞在kafka的消費(fèi)線程上。

然后,我們將真正處理業(yè)務(wù)的邏輯放在任務(wù)中多線程執(zhí)行,每次執(zhí)行完畢,我們都手工的commit一次ack,表明這條消息我已經(jīng)處理了。由于是線程池認(rèn)領(lǐng)了這些任務(wù),順序性是無法保證的,可能有些任務(wù)沒有執(zhí)行完畢,后面的任務(wù)就已經(jīng)把它的offset給提交了。o.O

不過這暫時(shí)不重要,首先讓它并行化運(yùn)行就好。

可惜的是,當(dāng)我們運(yùn)行程序,直接拋出了異常,無法進(jìn)行下去。
程序直接說了:

KafkaConsumer is not safe for multi-threaded access

顯然,kafka的消費(fèi)端不是線程安全的,它拒絕你這么調(diào)用它的api。kafka的初衷是好的,想要避免一些并發(fā)環(huán)境的問題,但我確實(shí)需要使用多線程處理。

kafka消費(fèi)者通過比較調(diào)用者的線程id來判斷是否是由外部線程發(fā)起請(qǐng)求。

    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();

}


得,只能將commitSync函數(shù)放在線程外面了,先提交ack、再執(zhí)行任務(wù)。

加入管道

我們獲取的消息,可能在真正被執(zhí)行之前,會(huì)進(jìn)行一些過濾,比如一些空值或者特定條件的判斷。雖然可以直接放在消費(fèi)者線程里運(yùn)行,但顯的特別的亂,可以加入一個(gè)生產(chǎn)者消費(fèi)者模型(你可以認(rèn)為這是畫蛇添足)。這里采用的是阻塞隊(duì)列依然是SynchronousQueue,它充當(dāng)了管道的功能。

我們把任務(wù)放入管道后,立馬commit。如果線程池已經(jīng)滿了,將一直阻塞在消費(fèi)者線程里,直到有空缺。然后,我們多帶帶啟動(dòng)了一個(gè)線程,用來接收這些數(shù)據(jù),然后提交到這部分的代碼看起來大概這樣。

應(yīng)用能夠啟動(dòng)了,消費(fèi)速度賊快。

參數(shù)配置

kafka的參數(shù)非常的多,我們比較關(guān)心的有以下幾個(gè)參數(shù)。

max.poll.records

調(diào)用一次poll,返回的最大條數(shù)。這個(gè)值設(shè)置的大,那么處理的就慢,很容易超出max.poll.interval.ms的值(默認(rèn)5分鐘),造成消費(fèi)者的離線。在耗時(shí)非常大的消費(fèi)中,是需要特別注意的。

enable.auto.commit

是否開啟自動(dòng)提交(offset)如果開啟,consumer已經(jīng)消費(fèi)的offset信息將會(huì)間歇性的提交到kafka中(持久保存)

當(dāng)開啟offset自動(dòng)提交時(shí),提交請(qǐng)求的時(shí)間頻率由參數(shù)`
auto.commit.interval.ms`控制。

fetch.max.wait.ms

如果broker端反饋的數(shù)據(jù)量不足時(shí)(fetch.min.bytes),fetch請(qǐng)求等待的最長(zhǎng)時(shí)間。如果數(shù)據(jù)量滿足需要,則立即返回。

session.timeout.ms

consumer會(huì)話超時(shí)時(shí)長(zhǎng),如果在此時(shí)間內(nèi),server尚未接收到consumer任何請(qǐng)求(包括心跳檢測(cè)),那么server將會(huì)判定此consumer離線。此值越大,server等待consumer失效、rebalance時(shí)間就越長(zhǎng)。

heartbeat.interval.ms

consumer協(xié)調(diào)器與kafka集群之間,心跳檢測(cè)的時(shí)間間隔。kafka集群通過心跳判斷consumer會(huì)話的活性,以判斷consumer是否在線,如果離線則會(huì)把此consumer注冊(cè)的partition分配(assign)給相同group的其他consumer。此值必須小于“session.timeout.ms”,即會(huì)話過期時(shí)間應(yīng)該比心跳檢測(cè)間隔要大,通常為session.timeout.ms的三分之一,否則心跳檢測(cè)就失去意義。

在本例中,我們的參數(shù)簡(jiǎn)單的設(shè)置如下,主要調(diào)整了每次獲取的條數(shù)和檢測(cè)時(shí)間。其他的都是默認(rèn)。

消息保證

仔細(xì)的同學(xué)可能會(huì)看到,我們的代碼依然不是完全安全的。這是由于我們提前提交了ack導(dǎo)致的。程序正常運(yùn)行下,這無傷大雅。但在應(yīng)用異常關(guān)閉的時(shí)候,那些正在執(zhí)行中的消息,很可能會(huì)丟失,對(duì)于一致性要求非常高的應(yīng)用,我們要從兩個(gè)手段上進(jìn)行保證。

使用關(guān)閉鉤子

第一種就是考慮kill -15的情況。這種方式比較簡(jiǎn)單,只要覆蓋ShutdownableThread的shutdown方法即可,應(yīng)用將有機(jī)會(huì)執(zhí)行線程池中的任務(wù),確保消費(fèi)完畢再關(guān)閉應(yīng)用。

@Override
    public void shutdown() {
        super.shutdown();
        executor.shutdown();
}
使用日志處理

應(yīng)用oom,或者直接kill -9了,事情就變得麻煩起來。

維護(hù)一個(gè)多帶帶的日志文件(或者本地db),在commit之前寫入一條日志,然后在真正執(zhí)行完畢之后寫入一條對(duì)應(yīng)的日志。當(dāng)系統(tǒng)啟動(dòng)時(shí),讀取這些日志文件,獲取沒有執(zhí)行成功的任務(wù),重新執(zhí)行。

想要效率,還想要可靠,是得下點(diǎn)苦力氣的。

借助redis處理

這種方式與日志方式類似,但由于redis的效率很高(可達(dá)數(shù)萬),而且方便,是優(yōu)于日志方式的。

可以使用Hash結(jié)構(gòu),提交任務(wù)的同時(shí)寫入Redis,任務(wù)執(zhí)行完畢刪掉這個(gè)值,那么剩下的就是出現(xiàn)問題的消息。


在系統(tǒng)啟動(dòng)時(shí),首先檢測(cè)一下redis中是否有異常數(shù)據(jù)。如果有,首先處理這些數(shù)據(jù),然后正常消費(fèi)。

End

多線程是為了增加效率,redis等是為了增加可靠性。業(yè)務(wù)代碼是非常好編寫的,搞懂了邏輯就搞定了大部分;業(yè)務(wù)代碼有時(shí)候又是困難的,你要編寫大量輔助功能增加它的效率、照顧它的邊界。

以程序員的角度來說,最有競(jìng)爭(zhēng)力的代碼都是為了照顧小概率發(fā)生的邊界異常。

kafka在吞吐量和可靠性方面,有各種的權(quán)衡,很多都是魚和熊掌的關(guān)系。不必糾結(jié)于它本身,我們可以借助外部的工具,獲取更大的收益。在這種情況下,redis當(dāng)機(jī)與應(yīng)用同時(shí)當(dāng)機(jī)的概率還是比較小的。5個(gè)9的消息保證是可以做到的,剩下的那點(diǎn)不完美問題消息,你為什么不從日志里找呢?

擴(kuò)展閱讀:

1、JAVA多線程使用場(chǎng)景和注意事項(xiàng)簡(jiǎn)版

2、Kafka基礎(chǔ)知識(shí)索引

3、360度測(cè)試:KAFKA會(huì)丟數(shù)據(jù)么?其高可用是否滿足需求?

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/73894.html

相關(guān)文章

  • 開源一個(gè)kafka增強(qiáng):okmq-1.0.0

    摘要:只有兩個(gè)基礎(chǔ)組件同時(shí)死亡,才會(huì)受到嚴(yán)重影響。的意外死亡,造成生產(chǎn)端發(fā)送失敗。后臺(tái)會(huì)有一個(gè)線程進(jìn)行這些失敗消息的遍歷和重新投遞。二阻塞業(yè)務(wù)正常進(jìn)行。死亡,或者單獨(dú)死亡,消息最終都會(huì)被發(fā)出,僅當(dāng)與同時(shí)死亡,消息才會(huì)發(fā)送失敗,并記錄在日志文件里。 本工具的核心思想就是:賭。只有兩個(gè)基礎(chǔ)組件同時(shí)死亡,才會(huì)受到嚴(yán)重影響。哦,斷電除外。 mq是個(gè)好東西,我們都在用。這也決定了mq應(yīng)該是高高高可用的...

    PAMPANG 評(píng)論0 收藏0
  • Kafka學(xué)習(xí)筆記之掃盲

    摘要:相關(guān)概念協(xié)議高級(jí)消息隊(duì)列協(xié)議是一個(gè)標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件協(xié)議??梢杂妹钆c不同,不是線程安全的。手動(dòng)提交執(zhí)行相關(guān)邏輯提交注意點(diǎn)將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好的作用。 本文使用的Kafka版本0.11 先思考些問題: 我想分析一下用戶行為(pageviews),以便我能設(shè)計(jì)出更好的廣告位 我想對(duì)用戶的搜索關(guān)鍵詞進(jìn)行統(tǒng)計(jì),...

    GT 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<