摘要:相關概念協議高級消息隊列協議是一個標準開放的應用層的消息中間件協議。可以用命令與不同,不是線程安全的。手動提交執行相關邏輯提交注意點將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當就能最恰當的發揮好的作用。
本文使用的Kafka版本0.11
先思考些問題:
我想分析一下用戶行為(pageviews),以便我能設計出更好的廣告位
我想對用戶的搜索關鍵詞進行統計,分析出當前的流行趨勢。這個很有意思,在經濟學上有個長裙理論,就是說,如果長裙的銷量高了,說明經濟不景氣了,因為姑娘們沒錢買各種絲襪了。
有些數據,我覺得存數據庫浪費,直接存硬盤又怕到時候操作效率低。
這個時候,我們就可以用到分布式消息系統了。雖然上面的描述更偏向于一個日志系統,但確實kafka在實際應用中被大量的用于日志系統。
這些場景都有一個共同點:數據是由上游模塊產生,上游模塊,使用上游模塊的數據計算、統計、分析,這個時候就可以使用消息系統,尤其是分布式消息系統!
Kafka是一個分布式消息系統,由linkedin使用scala編寫. Kafka的動態擴容是通過Zookeeper來實現的。
Zookeeper是一種在分布式系統中被廣泛用來作為:分布式狀態管理、分布式協調管理、分布式配置管理、和分布式鎖服務的集群。kafka增加和減少服務器都會在Zookeeper節點上觸發相應的事件。
1、 AMQP協議(Advanced Message Queuing Protocol,高級消息隊列協議)
AMQP是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了通過網絡發送的字節流的數據格式。因此兼容性非常好,任何實現AMQP協議的程序都可以和與AMQP協議兼容的其他程序交互,可以很容易做到跨語言,跨平臺。
2、 一些基本的概念
消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序
生產者:(Producer) :向broker發布消息的應用程序
AMQP服務端(broker):用來接收生產者發送的消息并將這些消息路由給服務器中的隊列,便于fafka將生產者發送的消息,動態的添加到磁盤并給每一條消息一個偏移量,所以對于kafka一個broker就是一個應用程序的實例
主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。
分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
生產者生產(push)消息、kafka集群、消費者獲取(pull)消息這樣一種架構,kafka集群中的消息,是通過Topic(主題)來進行組織的. 生產者可以選擇自己喜歡的序列化方法對消息內容編碼。
kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
簡化圖如下:
我們看上面的圖,我們把broker的數量減少,只有一臺。現在假設我們按照上圖進行部署:
Server-1 broker其實就是kafka的server,因為producer和consumer都要去連它。Broker主要還是做存儲用。
Server-2是zookeeper的server端,在這里你可以先想象,它維持了一張表,記錄了各個節點的IP、端口等信息(以后還會講到,它里面還存了kafka的相關信息)。
Server-3、4、5他們的共同之處就是都配置了zkClient,這之間的連接都是需要zookeeper來進行分發的。
Server-1和Server-2的關系,他們可以放在一臺機器上,也可以分開放,zookeeper也可以配集群。目的是防止某一臺掛了。
kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除.日志文件將會根據broker中的配置要求,保留一定的時間之后刪除;比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費.但kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用作為"常規"的消息系統,在一定程度上,無法確保消息的發送與接收絕對可靠(比如,消息重發,消息發送丟失等)
對于consumer而言,它需要保存消費消息的offset,對于offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費.事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中)
kafka集群幾乎不需要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;因此producer和consumer的實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.
partitions的目的有多個.最根本原因是kafka基于文件存儲.通過分區,可以將日志內容分散到多個上,來避免文件尺寸達到單機磁盤的上限;可以將一個topic切分多任意多個partitions.此外越多的partitions意味著可以容納更多的consumer,有效提升并發消費的能力.
每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費(而不是該group下的所有consumer,一定要注意這點).
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡.
如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者.
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息
注意:kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性。
在分布式方面:
broker的部署是一種no central master的概念,并且每個節點都是同等的,節點的增加和減少都不需要改變任何配置。
producer和consumer通過zookeeper去發現topic,并且通過zookeeper來協調生產和消費的過程。
producer、consumer和broker均采用TCP連接,通信基于NIO實現。Producer和consumer能自動檢測broker的增加和減少。
使用場景:
常規消息系統。
kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/用戶操作等信息發送到kafka中.并實時監控,或者離線統計分析等
kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.
簡單說下整個系統運行的順序:
1.啟動zookeeper的server
2.啟動kafka的server
3.Producer如果生產了數據,會先通過zookeeper找到broker,然后將數據存放進broker
4.Consumer如果要消費數據,會先通過zookeeper找對應的broker,然后消費。
本地單擊測試環境啟動順序:
1.啟動zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties &
2.啟動kafka server: bin/kafka-server-start.sh ../config/server.properties &
3.Kafka為我們提供了一個console來做連通性測試,
先創建一個topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ,你可以運行bin/kafka-topics.sh --list --zookeeper localhost:2181來檢查是否創建成功和topic列表
運行producer(默認broker端口9092):bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 這是相當于開啟了一個producer的命令行。
4.接下來運行consumer,新啟一個terminal:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
5.執行完consumer的命令后,你可以在producer的terminal中輸入信息,馬上在consumer的terminal中就會出現你輸的信息。有點兒像一個通信客戶端。
配置項:見http://kafka.apache.org/docum...
必要配置項:
broker.id
log.dirs
zookeeper.connect
編程APIDOC:http://kafka.apache.org/0110/...
官方github例子: https://github.com/apache/kaf...
org.apache.kafka kafka-clients 0.11.0.0 org.apache.kafka kafka-streams 0.11.0.0
首先貼一下官方例子:
Producer:
public class MyKafkaProducer { public static void main(String[] args) { /** * 這個例子中,每次調用都會創建一個Producer實例,但此處只是為了演示方便,實際使用中,請將Producer作為單例使用,它是線程安全的。 * 從Kafka 0.11 開始,KafkaProducer支持兩種額外的模式:冪等(idempotent)與事務(transactional)。冪等使得之前的at least once變成exactly once傳送 * 冪等Producer的重試不再會導致重復消息。事務允許應用程序以原子方式將消息發送到多個分區(和主題!) * 開啟idempotence冪等:props.put("enable.idempotence", true);設置之后retries屬性自動被設為Integer.MAX_VALUE;;acks屬性自動設為all;;max.inflight.requests.per.connection屬性自動設為1.其余一樣。 * 開啟事務性: props.put("transactional.id", "my-transactional-id");一旦這個屬性被設置,那么冪等也會自動開啟。然后使用事務API操作即可 */ } private static void send(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("enable.idempotence", true);//開啟idempotence冪等 extract-once // props.put("acks", "all");//acks配置控制請求被認為完成的條件 // props.put("retries", 0);重試次數 // props.put("batch.size", 16384); // props.put("linger.ms", 1); // props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); } private static void sendInTx(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id");//要啟用事務,必須配置一個唯一的事務id /** * http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html * KafkaProducer類是線程安全的,可以在多線程之間共享。 */ Producer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++){ // send()是異步的,會立即返回,內部是緩存到producer的buffer中,以便于生產者可以批量提交, 你也可以傳遞一個回調send(ProducerRecord record, Callback callback) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); } producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { //無法恢復的異常,我們只能關閉producer producer.close(); } catch (KafkaException e) { // 可恢復的異常,終止事務然后重試即可。 producer.abortTransaction(); } producer.close(); } }
發送完之后,我們可以用bin目錄下的kafka-console-consumer來看發送的結果(當然現在用的topic是test)。可以用命令:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Consumer:
/** *與producer不同,Kafka consumer不是線程安全的。 */ public class MyKafkaConsumer { /** * 通過配置enable.auto.commit,auto.commit.interval.ms來定期自動提交消費的偏移量 */ private void recieveByAutoCommitOffset(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // consumer.wakeup(); } /** * 手動提交消費的偏移量,這樣用戶可以控制記錄何時被視為已消費,從而提交其偏移量。 當消息的消耗與一些處理邏輯相結合時,這是有用的,因為在完成處理之前不應將消息視為已消費。 */ private void recieveByManualCommitOffset(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");//手動提交offset props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List > buffer = new ArrayList<>(); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer); 執行相關邏輯 consumer.commitSync();//提交offset buffer.clear(); } } } }
Streams:
public class MyKafkaStreams { public void test(){ Mapprops = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream("my-input-topic").mapValues(value -> value.toString()+"!!!").to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); } }
注意點:
將producer寫成單例模式,有助于減少zookeeper端占用的資源。Producer自身是線程安全的類,只要封裝得當就能最恰當的發揮好producer的作用。(ZkClient去連接zookeeper的server時候都會創建sendThread和eventThread兩個線程,其中sendThread主要用于client與server端之間的網絡連接,真正的處理線程由eventThread來執行。Zookeeper是一個分布式的協調框架,而分布式應用中經常會出現動態的增加或刪除節點的操作,所以為了實時了解分布式整個節點的數量和基本信息,就有必要維護一個長連接的線程與服務端保持連接。另外zookeeper連接時占用的時間也比較長,如果每次生產數據時都連接發起一次連接勢必造成了大量時間的耗費。)
kafka是將消息按照topic的形式存儲,一個topic會按照partition存在同一個文件夾下,目錄在config/server.properties中指定:
# The directory under which to store log files log.dir=/tmp/kafka-logs
在消息系統中都會有這樣一個問題存在,數據消費狀態這個信息到底存哪里。是存在consumer端,還是存在broker端。對于這樣的爭論,一般會出現三種情況:
At most once :消息一旦發出就立馬標記已消費,不會再有第二發生即使失敗了,缺點是容易丟失消息。
At least once :消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功.
Exactly once :每個消息僅發生一次,而且一次就能確保到達。這是理想狀態。(kafka0.11支持冪等之后,在開啟冪等的情況下,就是這種模式)
at most once: 消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理.那么此后"未處理"的消息將不能被fetch到,這就是"atmost once".
at least once: 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態.
logback-kafka集成例子https://github.com/xbynet/log...
參考:http://kafka.apache.org/docum...
http://kafka.apache.org/intro...
https://my.oschina.net/ielts0...
http://blog.csdn.net/my_bai/a...
http://www.infoq.com/cn/artic...
http://www.cnblogs.com/likehu...
http://www.cnblogs.com/likehu...
https://www.iteblog.com/archi...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67313.html
摘要:作者胡夕人人貸計算平臺部總監,將在這篇專欄中一步一步的教你填平這些坑,全面提升你的實戰能力搭配掘金小冊圖解之核心原理學習效果更佳哦送學習筆記 showImg(https://segmentfault.com/img/bVbsg9O?w=258&h=258);關注有課學微信公眾號,回復暗號 kafka 獲取購買《Kafka核心技術與實戰》極客時間專欄地址,購買成功后提交購買截圖即可獲得返...
摘要:可靠性一旦數據更新成功,將一直保持,直到新的更新。這是一種主動的分布式數據結構,能夠在外部情況發生變化時候主動修改數據項狀態的數據機構。如果監視節點狀態發生變化,則跳轉到第步,繼續進行后續的操作,直到退出鎖競爭。 題外話:從字面上來看,ZooKeeper表示動物園管理員,而Hadoop生態系統中,許多項目的Logo都采用了動物,比如Hadoop采用了大象的形象,所以可以ZooKeepe...
閱讀 900·2021-10-27 14:19
閱讀 1119·2021-10-15 09:42
閱讀 1542·2021-09-14 18:02
閱讀 749·2019-08-30 13:09
閱讀 2996·2019-08-29 15:08
閱讀 2097·2019-08-28 18:05
閱讀 962·2019-08-26 10:25
閱讀 2794·2019-08-23 16:28