摘要:而在服務器中應該充分利用多線程來處理執行邏輯。能保證所在的失效,該消息仍然可以從新選舉的中獲取,不會造成消息丟失。這意味著無需等待來自的確認而繼續發送下一批消息。
1.概述
Apache Kafka最早是由LinkedIn開源出來的分布式消息系統,現在是Apache旗下的一個子項目,并且已經成為開源領域應用最廣泛的消息系統之一。Kafka社區非常活躍,從0.9版本開始,Kafka的標語已經從“一個高吞吐量,分布式的消息系統”改為"一個分布式流平臺"。
Kafka和傳統的消息系統不同在于:
kafka是一個分布式系統,易于向外擴展。
它同時為發布和訂閱提供高吞吐量
它支持多訂閱者,當失敗時能自動平衡消費者
消息的持久化
kafka和其他消息隊列的對比:
2.入門實例 2.1生產者producer
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class UserKafkaProducer extends Thread { private final KafkaProducer2.2 消費者producer; private final String topic; private final Properties props = new Properties(); public UserKafkaProducer(String topic) { props.put("metadata.broker.list", "localhost:9092"); props.put("bootstrap.servers", "master2:6667"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer (props); this.topic = topic; } @Override public void run() { int messageNo = 1; while (true) { String messageStr = new String("Message_" + messageNo); System.out.println("Send:" + messageStr); //返回的是Future ,異步發送 producer.send(new ProducerRecord (topic, messageStr)); messageNo++; try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Properties props = new Properties(); /* 定義kakfa 服務的地址,不需要將所有broker指定上 */ props.put("bootstrap.servers", "localhost:9092"); /* 制定consumer group */ props.put("group.id", "test"); /* 是否自動確認offset */ props.put("enable.auto.commit", "true"); /* 自動確認offset的時間間隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); /* key的序列化類 */ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* value的序列化類 */ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* 定義consumer */ KafkaConsumer3.Kafka架構原理consumer = new KafkaConsumer<>(props); /* 消費者訂閱的topic, 可同時訂閱多個 */ consumer.subscribe(Arrays.asList("foo", "bar")); /* 讀取數據,讀取超時時間為100ms */ while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }
對于kafka的架構原理我們先提出幾個問題?
1.Kafka的topic和分區內部是如何存儲的,有什么特點?
2.與傳統的消息系統相比,Kafka的消費模型有什么優點?
3.Kafka如何實現分布式的數據存儲與數據讀取?
3.1Kafka架構圖 3.2kafka名詞解釋在一套kafka架構中有多個Producer,多個Broker,多個Consumer,每個Producer可以對應多個Topic,每個Consumer只能對應一個ConsumerGroup。
整個Kafka架構對應一個ZK集群,通過ZK管理集群配置,選舉Leader,以及在consumer group發生變化時進行rebalance。
3.3Topic和Partition在Kafka中的每一條消息都有一個topic。一般來說在我們應用中產生不同類型的數據,都可以設置不同的主題。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新消息。
kafka為每個主題維護了分布式的分區(partition)日志文件,每個partition在kafka存儲層面是append log。任何發布到此partition的消息都會被追加到log文件的尾部,在分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,也就是我們的offset,offset是一個long型的數字,我們通過這個offset可以確定一條在該partition下的唯一消息。在partition下面是保證了有序性,但是在topic下面沒有保證有序性。
在上圖中在我們的生產者會決定發送到哪個Partition。
1.如果沒有Key值則進行輪詢發送。
2.如果有Key值,對Key值進行Hash,然后對分區數量取余,保證了同一個Key值的會被路由到同一個分區,如果想隊列的強順序一致性,可以讓所有的消息都設置為同一個Key。
3.4消費模型消息由生產者發送到kafka集群后,會被消費者消費。一般來說我們的消費模型有兩種:推送模型(psuh)和拉取模型(pull)
基于推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者后,標記這條消息為已經被消費,但是這種方式無法很好地保證消費的處理語義。比如當我們把已經把消息發送給消費者之后,由于消費進程掛掉或者由于網絡原因沒有收到這條消息,如果我們在消費代理將其標記為已消費,這個消息就永久丟失了。如果我們利用生產者收到消息后回復這種方法,消息代理需要記錄消費狀態,這種不可取。如果采用push,消息消費的速率就完全由消費代理控制,一旦消費者發生阻塞,就會出現問題。
Kafka采取拉取模型(poll),由自己控制消費速度,以及消費的進度,消費者可以按照任意的偏移量進行消費。比如消費者可以消費已經消費過的消息進行重新處理,或者消費最近的消息等等。
3.5網絡模型 3.5.1 KafkaClient --單線程Selector單線程模式適用于并發鏈接數小,邏輯簡單,數據量小。
在kafka中,consumer和producer都是使用的上面的單線程模式。這種模式不適合kafka的服務端,在服務端中請求處理過程比較復雜,會造成線程阻塞,一旦出現后續請求就會無法處理,會造成大量請求超時,引起雪崩。而在服務器中應該充分利用多線程來處理執行邏輯。
3.5.2 Kafka--server -- 多線程Selector在kafka服務端采用的是多線程的Selector模型,Acceptor運行在一個多帶帶的線程中,對于讀取操作的線程池中的線程都會在selector注冊read事件,負責服務端讀取請求的邏輯。成功讀取后,將請求放入message queue共享隊列中。然后在寫線程池中,取出這個請求,對其進行邏輯處理,即使某個請求線程阻塞了,還有后續的縣城從消息隊列中獲取請求并進行處理,在寫線程中處理完邏輯處理,由于注冊了OP_WIRTE事件,所以還需要對其發送響應。
3.6高可靠分布式存儲模型在Kafka中保證高可靠模型的依靠的是副本機制,有了副本機制之后,就算機器宕機也不會發生數據丟失。
3.6.1高性能的日志存儲kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節點上。同時在kafka的機器上,每個Partition其實都會對應一個日志目錄,在目錄下面會對應多個日志分段(LogSegment)。LogSegment文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為segment索引文件和數據文件。這兩個文件的命令規則為:partition全局的第一個segment從0開始,后續每個segment文件名為上一個segment文件最后一條消息的offset值,數值大小為64位,20位數字字符長度,沒有數字用0填充,如下,假設有1000條消息,每個LogSegment大小為100,下面展現了900-1000的索引和Log:
由于kafka消息數據太大,如果全部建立索引,即占了空間又增加了耗時,所以kafka選擇了稀疏索引的方式,這樣的話索引可以直接進入內存,加快偏查詢速度。
簡單介紹一下如何讀取數據,如果我們要讀取第911條數據首先第一步,找到他是屬于哪一段的,根據二分法查找到他屬于的文件,找到0000900.index和00000900.log之后,然后去index中去查找 (911-900) =11這個索引或者小于11最近的索引,在這里通過二分法我們找到了索引是[10,1367]然后我們通過這條索引的物理位置1367,開始往后找,直到找到911條數據。
上面講的是如果要找某個offset的流程,但是我們大多數時候并不需要查找某個offset,只需要按照順序讀即可,而在順序讀中,操作系統會對內存和磁盤之間添加page cahe,也就是我們平常見到的預讀操作,所以我們的順序讀操作時速度很快。但是kafka有個問題,如果分區過多,那么日志分段也會很多,寫的時候由于是批量寫,其實就會變成隨機寫了,隨機I/O這個時候對性能影響很大。所以一般來說Kafka不能有太多的partition。針對這一點,RocketMQ把所有的日志都寫在一個文件里面,就能變成順序寫,通過一定優化,讀也能接近于順序讀。
可以思考一下:1.為什么需要分區,也就是說主題只有一個分區,難道不行嗎?2.日志為什么需要分段
1.分區是為了水平擴展3.6.2副本機制
2.日志如果在同一個文件太大會影響性能。如果日志無限增長,查詢速度會減慢
Kafka的副本機制是多個服務端節點對其他節點的主題分區的日志進行復制。當集群中的某個節點出現故障,訪問故障節點的請求會被轉移到其他正常節點(這一過程通常叫Reblance),kafka每個主題的每個分區都有一個主副本以及0個或者多個副本,副本保持和主副本的數據同步,當主副本出故障時就會被替代。
在Kafka中并不是所有的副本都能被拿來替代主副本,所以在kafka的leader節點中維護著一個ISR(In sync Replicas)集合,翻譯過來也叫正在同步中集合,在這個集合中的需要滿足兩個條件:
節點必須和ZK保持連接
在同步的過程中這個副本不能落后主副本太多
另外還有個AR(Assigned Replicas)用來標識副本的全集,OSR用來表示由于落后被剔除的副本集合,所以公式如下:ISR = leader + 沒有落后太多的副本; AR = OSR+ ISR;
這里先要說下兩個名詞:HW(高水位)是consumer能夠看到的此partition的位置,LEO是每個partition的log最后一條Message的位置。HW能保證leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取,不會造成消息丟失。
當producer向leader發送數據時,可以通過request.required.acks參數來設置數據可靠性的級別:
1(默認):這意味著producer在ISR中的leader已成功收到的數據并得到確認后發送下一條message。如果leader宕機了,則會丟失數據。
0:這意味著producer無需等待來自broker的確認而繼續發送下一批消息。這種情況下數據傳輸效率最高,但是數據可靠性確是最低的。
-1:producer需要等待ISR中的所有follower都確認接收到數據后才算一次發送完成,可靠性最高。但是這樣也不能保證數據不丟失,比如當ISR中只有leader時(其他節點都和zk斷開連接,或者都沒追上),這樣就變成了acks=1的情況。
4.高可用模型及冪等?在分布式系統中一般有三種處理語義:
at-least-once:
至少一次,有可能會有多次。如果producer收到來自ack的確認,則表示該消息已經寫入到Kafka了,此時剛好是一次,也就是我們后面的exactly-once。但是如果producer超時或收到錯誤,并且request.required.acks配置的不是-1,則會重試發送消息,客戶端會認為該消息未寫入Kafka。如果broker在發送Ack之前失敗,但在消息成功寫入Kafka之后,這一次重試將會導致我們的消息會被寫入兩次,所以消息就不止一次地傳遞給最終consumer,如果consumer處理邏輯沒有保證冪等的話就會得到不正確的結果。
在這種語義中會出現亂序,也就是當第一次ack失敗準備重試的時候,但是第二消息已經發送過去了,這個時候會出現單分區中亂序的現象,我們需要設置Prouducer的參數max.in.flight.requests.per.connection,flight.requests是Producer端用來保存發送請求且沒有響應的隊列,保證Producer端未響應的請求個數為1。
at-most-once:
如果在ack超時或返回錯誤時producer不重試,也就是我們講request.required.acks=-1,則該消息可能最終沒有寫入kafka,所以consumer不會接收消息。
exactly-once:
剛好一次,即使producer重試發送消息,消息也會保證最多一次地傳遞給consumer。該語義是最理想的,也是最難實現的。在0.10之前并不能保證exactly-once,需要使用consumer自帶的冪等性保證。0.11.0使用事務保證了
4.1 如何實現exactly-once要實現exactly-once在Kafka 0.11.0中有兩個官方策略:
4.1.1單Producer單Topic每個producer在初始化的時候都會被分配一個唯一的PID,對于每個唯一的PID,Producer向指定的Topic中某個特定的Partition發送的消息都會攜帶一個從0單調遞增的sequence number。
在我們的Broker端也會維護一個維度為
如果消息序號比Broker維護的序號大一以上,說明中間有數據尚未寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
如果消息序號小于等于Broker維護的序號,說明該消息已被保存,即為重復消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
如果消息序號剛好大一,就證明是合法的
上面所說的解決了兩個問題:
1.當Prouducer發送了一條消息之后失敗,broker并沒有保存,但是第二條消息卻發送成功,造成了數據的亂序。
2.當Producer發送了一條消息之后,broker保存成功,ack回傳失敗,producer再次投遞重復的消息。
上面所說的都是在同一個PID下面,意味著必須保證在單個Producer中的同一個seesion內,如果Producer掛了,被分配了新的PID,這樣就無法保證了,所以Kafka中又有事務機制去保證。
4.1.2事務在kafka中事務的作用是
實現exactly-once語義
保證操作的原子性,要么全部成功,要么全部失敗。
有狀態的操作的恢復
事務可以保證就算跨多個
關于消息隊列或者Kafka的一些常見的面試題,通過上面的文章可以提煉出以下幾個比較經典的問題:
為什么使用消息隊列?消息隊列的作用是什么?
Kafka的topic和分區內部是如何存儲的,有什么特點?
與傳統的消息系統相比,Kafka的消費模型有什么優點?
Kafka如何實現分布式的數據存儲與數據讀取?
kafka為什么比rocketmq支持的單機partion要少?
為什么需要分區,也就是說主題只有一個分區,難道不行嗎?
日志為什么需要分段?
kafka是依靠什么機制保持高可靠,高可用?
消息隊列如何保證消息冪等?
讓你自己設計個消息隊列,你會怎么設計,會考慮哪些方面?
大部分問題都可以從上面總結后找到答案,如果還不會的話就關注我的公眾號,讓我為你解答吧。
最后這篇文章被我收錄于JGrowing-中間件篇,一個全面,優秀,由社區一起共建的Java學習路線,如果您想參與開源項目的維護,可以一起共建,github地址為:https://github.com/javagrowin...
麻煩給個小星星喲。
打個廣告,如果你覺得這篇文章對你有文章,可以關注我的技術公眾號,你的關注和轉發是對我最大的支持,O(∩_∩)O
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/73523.html
摘要:而每個文件系統又可以設置不同的調度算法,另外,還有虛擬內存缺頁中斷帶來的性能毛刺良心的提供了調優的腳本,這點做的不錯跑題了。測試環境核線程內存磁盤讀寫左右虛擬內存未關閉,大小測試注意點為了防止緩存的影響,每次都生成一個新的文件進行讀取。 前言 Java 在 JDK 1.4 引入了 ByteBuffer 等 NIO 相關的類,使得 Java 程序員可以拋棄基于 Stream ,從而使用基...
摘要:由于配置流是從關系型數據庫中讀取,速度較慢,導致實時數據流流入數據的時候,配置信息還未發送,這樣會導致有些實時數據讀取不到配置信息。從數據庫中解析出來,再去統計近兩周占比。 Flink 學習 https://github.com/zhisheng17/flink-learning 麻煩路過的各位親給這個項目點個 star,太不易了,寫了這么多,算是對我堅持下來的一種鼓勵吧! showI...
摘要:通過消息中間件來通信的話,系統組件間的耦合度就大大降低。所以,消息中間件的最主要的作用是解耦。消息中間件的核心是消息隊列。是阿里開源的消息中間件,它是純開發,具有高吞吐量高可用性適合大規模分布式系統應用的特點。云計算服務商除了提供云主機、云存儲、云數據庫這些最常用的服務外,通常也會提供一些軟件服務,消息中間件就是比較常用的一種基礎軟件。消息中間件對于分布式系統來說,是一個非常重要的組成部分,...
摘要:以下為大家整理了阿里巴巴史上最全的面試題,涉及大量面試知識點和相關試題。的內存結構,和比例。多線程多線程的幾種實現方式,什么是線程安全。點擊這里有一套答案版的多線程試題。線上系統突然變得異常緩慢,你如何查找問題。 以下為大家整理了阿里巴巴史上最全的 Java 面試題,涉及大量 Java 面試知識點和相關試題。 JAVA基礎 JAVA中的幾種基本數據類型是什么,各自占用多少字節。 S...
閱讀 3326·2021-11-23 09:51
閱讀 2454·2021-11-09 09:46
閱讀 1487·2019-08-30 15:54
閱讀 3134·2019-08-30 14:22
閱讀 2917·2019-08-29 12:40
閱讀 1639·2019-08-26 10:33
閱讀 1786·2019-08-23 17:09
閱讀 1563·2019-08-23 16:11