摘要:慕課網流處理平臺學習總結時間年月日星期日說明本文部分內容均來自慕課網。
慕課網《Kafka流處理平臺》學習總結
時間:2018年09月09日星期日
說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com
教學源碼:無
學習源碼:https://github.com/zccodere/s...
第一章:課程介紹 1-1 課程介紹課程介紹
Kafka概念解析
Kafka結構設計
Kafka場景應用
Kafka高級特性
第二章:概念解析 2-1 發展背景LinkedIn 開源
Databus 分布式數據同步系統
Cubert 高性能計算引擎
ParSeq Java異步處理框架
Kafka 分布式發布訂閱消息系統,流處理平臺
Kafka發展歷程
LinkedIn 開發
2011年初開源,加入Apache基金會
2012年從Apache Incubator畢業
Apache頂級開源項目
Kafka的特性
可以發布和訂閱且記錄數據的流,類似于消息隊列
數據流存儲的平臺,具備容錯能力
在數據產生時就可以進行處理
Kafka通常被用于
構建實時數據流管道
構建實時數據流處理
Kafka是什么
面向于數據流的生產、轉換、存儲、消費整體的流處理平臺
Kafka不僅僅是一個消息隊列
2-2 基本概念Producer:數據生產者
消息和數據的生產者
向Kafka的一個topic發布消息的進程或代碼或服務
Consumer:數據消費者
消息和數據的消費者
向Kafka訂閱數據(topic)并且處理其發布的消息的進程或代碼或服務
Consumer Group:消費者組
對于同一個topic,會廣播給不同的Group
一個Group中,只有一個Consumer可以消費該消息
Broker:服務節點
Kafka集群中的每個Kafka節點
Topic:主題
Kafka消息的類別
對數據進行區分、隔離
Partition:分區
Kafka中數據存儲的基本單元
一個topic數據,會被分散存儲到多個Partition
一個Partition只會存在一個Broker上
每個Partition是有序的
Replication:分區的副本
同一個Partition可能會有多個Replication
多個Replication之間數據是一樣的
Replication Leader:副本的老大
一個Partition的多個Replication上
需要一個Leader負責該Partition上與Producer和Consumer交互
Replication Manager:副本的管理者
負責管理當前Broker所有分區和副本的信息
處理KafkaController發起的一些請求
副本狀態的切換
添加、讀取消息等
2-3 概念延伸Partition:分區
每一個Topic被切分為多個Partition
消費者數目少于或等于Partition的數目
Broker Group中的每一個Broker保存Topic的一個或多個Partition
Consumer Group中的僅有一個Consumer讀取Topic的一個或多個Partition,并且是惟一的Consumer
Replication:分區的副本
當集群中有Broker掛掉的情況,系統可以主動地使Replication提供服務
系統默認設置每一個Topic的Replication系數為1,可以在創建Topic時多帶帶設置
Replication的基本單位是Topic的Partition
所有的讀和寫都從Replication Leader進行,Replication Followers只是作為備份
Replication Followers必須能夠及時復制Replication Leader的數據
增加容錯性與可擴展性
第三章:結構設計 3-1 基本結構Kafka功能結構
Kafka數據流勢
Kafka消息結構
Offset:當前消息所處于的偏移
Length:消息的長度
CRC32:校驗字段,用于校驗當前信息的完整性
Magic:很多分布式系統都會設計該字段,固定的數字,用于快速判定當前信息是否為Kafka消息
attributes:可選字段,消息的屬性
Timestamp:時間戳
Key Length:Key的長度
Key:Key
Value Length:Value的長度
Value:Value
3-2 功能特點Kafka特點:分布式
多分區
多副本
多訂閱者
基于Zookeeper調度
Kafka特點:高性能
高吞吐量
低延遲
高并發
時間復雜度為O(1)
Kafka特點:持久性與擴展性
數據可持久化
容錯性
支持在線水平擴展
消息自動平衡
第四章:場景應用 4-1 應用場景Kafka應用場景
消息隊列
行為跟蹤
元信息監控
日志收集
流處理
事件源
持久性日志(commit log)
4-2 應用案例Kafka簡單案例
部署啟動
簡單生產者
簡單消費者
學習筆記
1.下載與安裝 Zookeeper下載:https://zookeeper.apache.org/releases.html#download Kafka下載:http://kafka.apache.org/downloads 安裝:解壓、配置環境變量 2.Zookeeper啟動 解壓:tar -zxf zookeeper-3.4.12.tar.gz 目錄:cd zookeeper-3.4.12/bin 啟動:./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties 3.Kafka啟動 解壓:tar -zxf kafka_2.12-2.0.0.tgz 目錄:cd kafka_2.12-2.0.0 啟動:sudo bin/kafka-server-start.sh config/server.properties 4.使用控制臺操作生產者與消費者 創建Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic 查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181 啟動生產者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic 啟動消費者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning 生產消息:first message 生產消息:second message4-3 代碼案例
創建49-kafka-example的maven工程pom如下
49-kafka com.myimooc 1.0-SNAPSHOT 4.0.0 49-kafka-example 2.0.4.RELEASE org.springframework.boot spring-boot-parent ${spring.boot.version} pom import org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka com.alibaba fastjson 1.2.36 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
1.編寫MessageEntity
package com.myimooc.kafka.example.common; import java.util.Objects; /** *
* 標題: 消息實體
* 描述: 消息實體
* 時間: 2018/09/09
* * @author zc */ public class MessageEntity { /** * 標題 */ private String title; /** * 內容 */ private String body; @Override public String toString() { return "MessageEntity{" + "title="" + title + """ + ", body="" + body + """ + "}"; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } MessageEntity that = (MessageEntity) o; return Objects.equals(title, that.title) && Objects.equals(body, that.body); } @Override public int hashCode() { return Objects.hash(title, body); } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } }
2.編寫SimpleProducer
package com.myimooc.kafka.example.producer; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** *
* 標題: 生產者
* 描述: 生產者
* 時間: 2018/09/09
* * @author zc */ @Component public class SimpleProducer{ private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String key, Object entity) { logger.info("發送消息入參:{}", entity); ProducerRecord record = new ProducerRecord<>( topic, key, JSON.toJSONString(entity) ); long startTime = System.currentTimeMillis(); ListenableFuture > future = this.kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable ex) { logger.error("消息發送失敗:{}", ex); } @Override public void onSuccess(SendResult result) { long elapsedTime = System.currentTimeMillis() - startTime; RecordMetadata metadata = result.getRecordMetadata(); StringBuilder record = new StringBuilder(128); record.append("message(") .append("key = ").append(key).append(",") .append("message = ").append(entity).append(")") .append("send to partition(").append(metadata.partition()).append(")") .append("with offset(").append(metadata.offset()).append(")") .append("in ").append(elapsedTime).append(" ms"); logger.info("消息發送成功:{}", record.toString()); } }); } }
3.編寫SimpleConsumer
package com.myimooc.kafka.example.consumer; import com.alibaba.fastjson.JSONObject; import com.myimooc.kafka.example.common.MessageEntity; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.util.Optional; /** *
* 標題: 消費者
* 描述: 消費者
* 時間: 2018/09/09
* * @author zc */ @Component public class SimpleConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "${kafka.topic.default}") public void listen(ConsumerRecord, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { //判斷是否NULL Optional> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { //獲取消息 Object message = kafkaMessage.get(); MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class); logger.info("接收消息Topic:{}", topic); logger.info("接收消息Record:{}", record); logger.info("接收消息Message:{}", messageEntity); } } }
4.編寫Response
package com.myimooc.kafka.example.common; import java.io.Serializable; /** *
* 標題: REST請求統一響應對象
* 描述: REST請求統一響應對象
* 時間: 2018/09/09
* * @author zc */ public class Response implements Serializable { private static final long serialVersionUID = -972246069648445912L; /** * 響應編碼 */ private int code; /** * 響應消息 */ private String message; public Response() { } public Response(int code, String message) { this.code = code; this.message = message; } @Override public String toString() { return "Response{" + "code=" + code + ", message="" + message + """ + "}"; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
5.編寫ErrorCode
package com.myimooc.kafka.example.common; /** *
* 標題: 錯誤編碼
* 描述: 錯誤編碼
* 時間: 2018/09/09
* * @author zc */ public class ErrorCode { /** * 成功 */ public final static int SUCCESS = 200; /** * 失敗 */ public final static int EXCEPTION = 500; }
6.編寫ProducerController
package com.myimooc.kafka.example.controller; import com.alibaba.fastjson.JSON; import com.myimooc.kafka.example.common.ErrorCode; import com.myimooc.kafka.example.common.MessageEntity; import com.myimooc.kafka.example.common.Response; import com.myimooc.kafka.example.producer.SimpleProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.*; /** *
* 標題: 生產者Controller
* 描述: 生產者Controller
* 時間: 2018/09/09
* * @author zc */ @RestController @RequestMapping("/producer") public class ProducerController { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private SimpleProducer simpleProducer; @Value("${kafka.topic.default}") private String topic; private static final String KEY = "key"; @PostMapping("/send") public Response sendKafka(@RequestBody MessageEntity message) { try { logger.info("kafka的消息:{}", JSON.toJSONString(message)); this.simpleProducer.send(topic, KEY, message); logger.info("kafka消息發送成功!"); return new Response(ErrorCode.SUCCESS,"kafka消息發送成功"); } catch (Exception ex) { logger.error("kafka消息發送失敗:", ex); return new Response(ErrorCode.EXCEPTION,"kafka消息發送失敗"); } } }
7.編寫application.properties
##----------kafka配置 ## TOPIC kafka.topic.default=myimooc-kafka-topic # kafka地址 spring.kafka.bootstrap-servers=192.168.0.105:9092 # 生產者配置 spring.kafka.producer.retries=0 # 批量發送消息的數量 spring.kafka.producer.batch-size=4096 # 緩存容量 spring.kafka.producer.buffer-memory=40960 # 指定消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 消費者配置 spring.kafka.consumer.group-id=myimooc spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.enable-auto-commit=true # 指定消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 指定listener 容器中的線程數,用于提高并發量 spring.kafka.listener.concurrency=3
8.編寫ExampleApplication
package com.myimooc.kafka.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; /** *第五章:高級特性 5-1 消息事務
* 標題: 啟動類
* 描述: 啟動類
* 時間: 2018/09/09
* * @author zc */ @SpringBootApplication @EnableKafka public class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); } }
為什么要支持事務
滿足“讀取-處理-寫入”模式
流處理需求的不斷增強
不準確的數據處理的容忍度不斷降低
數據傳輸的事務定義
最多一次:消息不會被重復發送,最多被傳輸一次,但也有可能一次不傳輸
最少一次:消息不會被漏發送,最少被傳輸一次,但也有可能被重復傳輸
精確的一次(Exactly once):不會漏傳輸也不會重復傳輸,每個消息都被傳輸一次且僅僅被傳輸一次,這是大家所期望的
事務保證
內部重試問題:Procedure冪等處理
多分區原子寫入
避免僵尸實例
每個事務Procedure分配一個 transactionl. id,在進程重新啟動時能夠識別相同的Procedure實例5-2 零拷貝
Kafka增加了一個與transactionl.id相關的epoch,存儲每個transactionl.id內部元數據
一旦epoch被觸發,任務具有相同的transactionl.id和更舊的epoch的Producer被視為僵尸,Kafka會拒絕來自這些Producer的后續事務性寫入
零拷貝簡介
網絡傳輸持久性日志塊
Java Nio channel.transforTo()方法
Linux sendfile系統調用
文件傳輸到網絡的公共數據路徑
第一次拷貝:操作系統將數據從磁盤讀入到內核空間的頁緩存
第二次拷貝:應用程序將數據從內核空間讀入到用戶空間緩存中
第三次拷貝:應用程序將數據寫回到內核空間到socket緩存中
第四次拷貝:操作系統將數據從socket緩沖區復制到網卡緩沖區,以便將數據經網絡發出
零拷貝過程(指內核空間和用戶空間的交互拷貝次數為零)
第一次拷貝:操作系統將數據從磁盤讀入到內核空間的頁緩存
將數據的位置和長度的信息的描述符增加至內核空間(socket緩存區)
第二次拷貝:操作系統將數據從內核拷貝到網卡緩沖區,以便將數據經網絡發出
文件傳輸到網絡的公共數據路徑演變
第六章:課程總結 6-1 課程總結課程總結
Kafka基礎概念與結構
Kafka的特點
Kafka應用場景
Kafka應用案例
Kafka高級特性
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77032.html
摘要:時間年月日星期六說明本文部分內容均來自慕課網。這個時候,可以啟動多臺積分系統,來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。 時間:2017年07月22日星期六說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學源碼:無學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程安排 Java...
時間:2017年07月06日星期四說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學示例源碼:無學習學習源碼:無 第一章:微服務架構在二手交易平臺(轉轉)中的實踐 1-1 微服務架構特點 分享要點-微服務架構 特點 使用原因 演進 通信協議、服務注冊與發現 柔性可用實踐 服務治理 什么是微服務 微服務是一系列小服務的組合 微服務可以單獨運行,獨立的進程 微服務整...
摘要:時間年月日星期一說明本文部分內容均來自慕課網。多用于網絡加密。散列函數函數或消息摘要函數主要作用散列函數用來驗證數據的完整性。 時間:2017年4月10日星期一說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學示例源碼:https://github.com/zccodere/s...個人學習源碼:https://github.com/zccodere...
時間:2017年10月16日星期一說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學源碼:無學習源碼:https://github.com/zccodere/s... 第一章:課程簡介 1-1 課程介紹 本門課程的主要內容 RxJava是什么 RxAndroid是什么 RxJava常用操作符(重點、難點) 怎樣在項目中使用RxJava和RxAndroid 如何學...
閱讀 2513·2021-11-15 11:38
閱讀 1962·2021-11-05 09:37
閱讀 2286·2021-10-08 10:12
閱讀 2819·2019-08-30 15:55
閱讀 2122·2019-08-30 15:52
閱讀 1231·2019-08-29 13:24
閱讀 472·2019-08-26 18:27
閱讀 1483·2019-08-26 18:27