一 認(rèn)識(shí)Kafka
Kafka是一個(gè)開(kāi)源流處理平臺(tái),它由 Apache 軟件基金會(huì)開(kāi)發(fā)的,開(kāi)發(fā)它的目的是為了提供一個(gè)統(tǒng)一的、高吞吐、低延遲的實(shí)時(shí)數(shù)據(jù)處理平臺(tái)。它的持久化層與同類(lèi)平臺(tái)不同,本質(zhì)上是一個(gè)“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列”,這使它非常具有價(jià)值。
二 Kafka的使用
想要完成分區(qū)副本的重分配,需要在 Kafka 的根路徑下,執(zhí)行如下命令
執(zhí)行
./bin/kafka‐reassign‐partitions.sh ‐‐zookeeper localhost:2181/kafka ‐‐reassignment‐json‐file reassign‐topic.json ‐‐execute
分區(qū)副本的分布情況由eassign‐topic.json 文件指定,如
{ "version": 1, "partitions": [ { "topic": "test", "partition": 2, "replicas": [ 2, 1 ], "log_dirs": [ "any", "any" ] } }
從上我們可以看出opic=test,partition=2 的分區(qū)的兩副本分別移動(dòng)到 brokerId=2 和 brokerId=1 的節(jié)點(diǎn)的任意磁盤(pán)路徑上。
三 ZooKeeper 和 Kafka Controller
3.1 ZooKeeper
Kafka 的元數(shù)據(jù)存儲(chǔ)在 ZooKeeper 中。Apache ZooKeeper是可靠的分布式協(xié)調(diào)服務(wù)框架。它憑借著數(shù)據(jù)模型類(lèi)似于文件系統(tǒng)的樹(shù)形結(jié)構(gòu),實(shí)現(xiàn)保存一些元數(shù)據(jù)協(xié)調(diào)信息。同時(shí) ZooKeeper具有 Watch 通知功能。一旦 znode 節(jié)點(diǎn)被創(chuàng)建、刪除,子節(jié)點(diǎn)數(shù)量發(fā)生變化,或是 znode 所存的數(shù)據(jù)本身變更, ZooKeeper會(huì)及時(shí)通知客戶端,觸發(fā)對(duì)應(yīng)的處理操作。
3.2 Kafka Controller
Kafka Controller作為 Apache Kafka 的核心組件,它能夠在 Apache ZooKeeper 的幫助下管理和協(xié)調(diào)整個(gè) Kafka 集群。集群中任意一臺(tái) Broker 都能充當(dāng)控制器的角色。事實(shí)上,在運(yùn)行過(guò)程中,只能有一個(gè) Broker 成為控制器,來(lái)發(fā)送各種操作指令。
四 分區(qū)重分配流程
Kafka需要在client、broker 和 controller 的協(xié)同運(yùn)行下完成分區(qū)重分配。
流程圖如下:
流程圖分析
1、kafka-reassign-partitions 客戶端
先由客戶端發(fā)起分區(qū)重分配任務(wù),它的入口主類(lèi)為 ReassignPartitionsCommand.scala 中,接著調(diào)用 executeAssignment 方法。客戶端的 executeAssignment 方法主要完成了如下操作:
· 解析 json 文件 ,進(jìn)行json 文件校驗(yàn)
· 讀取 json 文件內(nèi)容,判斷是否繼續(xù)執(zhí)行副本重分配
· 校驗(yàn)分區(qū)副本數(shù)和副本數(shù)據(jù)路徑數(shù)是否一致,校驗(yàn) partition/replica 是否為空/重復(fù)
· 檢查待重分配的分區(qū)在集群中是否存在,檢查確認(rèn)所有目標(biāo) broker 均在線,檢查是否已存在分區(qū)副本重分配任務(wù)
· 分配任務(wù)記錄,發(fā)送 alterReplicaLogDirs 請(qǐng)求
2、controller 維護(hù)分區(qū)的元數(shù)據(jù)信息
在 controller 啟動(dòng)時(shí)會(huì)創(chuàng)建 partitionReassignmentHandler,kafkaController 主線程回調(diào) onControllerFailover 時(shí),當(dāng)/admin/reassign_partitions 發(fā)生變化時(shí),會(huì)觸發(fā)分區(qū)副本重分配操作,在 maybeTriggerPartitionReassignment 中通過(guò)調(diào)用 onPartitionReassignment 真正執(zhí)行分區(qū)副本重分配。
onPartitionReassignment 的執(zhí)行過(guò)程如下:
· 在 zk 中將 AR 更新為 RAR+OAR
· 向所有副本(RAR+OAR)中發(fā)送 LeaderAndIsr 請(qǐng)求
· 將 RAR-OAR 的副本狀態(tài)置為 NewReplica,直到所有 RAR 中的副本完成與 leader 的同步
· 將所有 RAR 的副本置為 OnlineReplica 狀態(tài),將 RAR 作為 AR
· 判斷 leader 不在 RAR 中,檢查 leader 狀態(tài),如果 leader 健康則更新 LeaderEpoch,否則重新選擇 leader
· 將 OAR-RAR 的副本置為 Offline 狀態(tài)
· 將 OAR-RAR 的副本置為 NonExistentReplica 狀態(tài),并將 zk 中的 AR 置為 RAR(/brokers/topics/${topicName}數(shù)據(jù)格式:{"version":1,"partitions":{"0":[${brokerId}]}})
· 更新 zk 中/admin/reassign_partitions 的值,同步所有 broker,更新元數(shù)據(jù)信息
3、broker 端數(shù)據(jù)跨路徑遷移
底層數(shù)據(jù)跨路徑遷移需要 broker 端完成的,broker 接收到客戶端發(fā)來(lái)的請(qǐng)求后,調(diào)用 alterReplicaLogDirs 方法
步驟如下:
· 確保目的路徑/待移動(dòng)分區(qū)在線
· 標(biāo)記需要進(jìn)行遷移的分區(qū)副本路徑
· 對(duì)于需要移動(dòng)的分區(qū)副本,創(chuàng)建 future Log
· 停止當(dāng)前 Log 的清理工作,等待 future Log 同步
· 創(chuàng)建 ReplicaAlterLogDirsThread,逐個(gè)數(shù)據(jù)構(gòu)造 Fetch 請(qǐng)求
· 通過(guò) ReplicaManager.fetchMessages 從分區(qū)副本 leader 獲取數(shù)據(jù),完成數(shù)據(jù)同步
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/127584.html
摘要:而在服務(wù)器中應(yīng)該充分利用多線程來(lái)處理執(zhí)行邏輯。能保證所在的失效,該消息仍然可以從新選舉的中獲取,不會(huì)造成消息丟失。這意味著無(wú)需等待來(lái)自的確認(rèn)而繼續(xù)發(fā)送下一批消息。 showImg(https://segmentfault.com/img/remote/1460000018373147?w=702&h=369); 1.概述 Apache Kafka最早是由LinkedIn開(kāi)源出來(lái)的分布式...
閱讀 1195·2022-09-27 09:47
閱讀 1098·2022-09-27 09:28
閱讀 1559·2022-09-27 09:16
閱讀 860·2022-09-27 08:21
閱讀 1019·2022-09-27 08:08
閱讀 1166·2022-09-18 12:33
閱讀 879·2022-09-16 08:01
閱讀 890·2022-09-15 12:27