摘要:所以消息可以重復的放入不同的隊列中。而是對于消息來說的,在其發送消息到交換器時,需指定。與發布訂閱模式的相同點是可以將消息重復發送。它需要處理低延遲的傳遞,用于支持傳統的消息傳遞系統用例。
理解概念的一個方法
之前說過學習一個新的東西,最核心的就是掌握概念。而如何掌握概念呢?我的其中一個方法就是對比,把相似且模糊不清的兩個概念進行對比,這樣就理解更快。
RabbitMQ模式RabbitMQ有以下模式:
1.工作隊列(Worke Queues)
發消息和收消息都是直接通過隊列。在耗時比較多的任務,我們把任務放入隊列里,然后每個工作者去獲取任務然后處理。所以這個工作隊列,也稱為任務隊列(Task Queues)。這樣就將耗資源的任務從產生任務的應用上解耦出來。
這個模式最主要的特征是:每個任務只會分發到一個工作者中。
2.發布/訂閱(Publish/Subscribe)
這個發布/訂閱和觀察者模式很像,但不是同一個東西。具體可看看發布/訂閱和觀察者區別。
在這里,RabbitMQ引入了交換器(Exchange)的概念,生產者不直接與隊列交互,而是通過交換器去與隊列進行交互(或者叫綁定)。也就說生產者只和交換器交互。引入交換器這概念后,這消息中間件可以玩的花樣就多了。發布/訂閱(Publish/Subscribe)就是其中的一個。這里使用到的就是fanout的交換器。
這個模式最主要的特征是:類似于廣播(broadcast),同個消息可以發送到不同的隊列中去,而且這fanout交換器也不關系隊列有哪些,只要隊列和fanout交換器有綁定就發送,這樣就可以將消息重復發送到不同的隊列上。
與工作隊列模式的區別是:發布/訂閱的概念叫消息,而不是任務。所以消息可以重復的放入不同的隊列中。
3.路由(Routing)
路由模式也是引入交換器概念后,消息中間件玩的一個花樣。這里用到的交換器叫direct。
在這模式里,得新增兩個概念,分別是binding key和routing key, binding key是對于隊列來說的,在其與direct交換器綁定時指定binding key。而routing key是對于消息來說的,在其發送消息到direct交換器時,需指定routing key。這樣routing key能夠和binding key匹配得上的(就是值相等),direct交換器就會將消息發送到對應binding key的隊列上。
這個模式最主要的特征是:控制消息的精度更高,可以指定哪些消息發送到哪些隊列里。
與發布/訂閱模式的區別是:區別是發布/訂閱是廣播,將消息發送到任何綁定交換器的隊列上,所以沒能力選擇消息,而路由是需binding key和routing key匹配上,消息才能發送到對應binding key的隊列上,從而有能力去選擇消息。
與發布/訂閱模式的相同點是:可以將消息重復發送。
注:隊列可以綁定多個routing key
4.主題(Topics)
當然,主題模式也是引入交換器概念后,消息中間件玩的一個花樣。這里用到的交換器叫topic。
這里用到的也是binding key和routing key,但不一樣的是,routing_key不能指定明確的key。而是這個key需要帶有點“.”,如 "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。而在這模式下,binding key的指定可以更廣泛些,其結構是這樣的".orange." 、 "..rabbit" 和"lazy.#"。其中*(星號)是可以代表一個單詞,#(井號)是可以代表零個或多個單詞。也跟路由類似的,只要這樣routing key能夠和binding key匹配得上的(這里可以不用值相等,模式匹配上即可),topic交換器就會將消息發送到對應binding key的隊列上。
如Q1隊列的binding key是".orange.",而 Q2是"..rabbit"和"lazy.#"。如果消息的routing key是 "quick.orange.rabbit" 則此消息會被發送到Q1和Q2隊列上。routing key是"quick.orange.fox"的消息只會發送到Q1隊列上。routing key是"lazy.pink.rabbit" 的消息只會發送到Q2隊列一次,routing key是 "quick.brown.fox" 的消息沒有匹配任何的binding key則此消息丟棄。
注:隊列可以綁定多個routing key
5.遠程過程調用RPC(Remote Procedure Call)
RPC可以遠程調用函數,等待服務器返回結果。
RPC的一個備注:RPC雖然用得很廣泛,然而它也有不足之處,就是開發人員無法清晰的知道自己調用的這個函數到底是本地函數還是很慢的RPC。這種困惑很容易導致出一個不可預測的系統和增加沒必要的復雜性導致難以定位問題。如果不用簡單的程序,誤用RPC還可能寫出很維護的意大利面條式的代碼。。
對于這個問題,有三個建議保證函數是很容易被辨別出是本地函數還是遠程函數。
文檔化,清晰地記錄組件間的依賴。
處理網絡帶來的異常,如超時等。
當出現用RPC是否必要時,如果可以的話,你最好用異步管道(asynchronous pipeline)的形式,而不是使用阻塞形式的RPC。
。
RabbitMQ可以用于構建RPC系統。一個客戶端和一個可擴展的RPC服務器。不過此功能不太常用,所以就不留篇幅來講解。大概原理就是可以新增消息的屬性,從而將請求和響應的消息給匹配上。
觀察者模式和發布/訂閱模式的區別觀察者模式
觀察者模式的定義:對象間的一種一對多的組合關系,以便一個對象的狀態發生變化時,所有依賴于它的對象都得到通知。
舉個例子
假設你正在找一份軟件工程師的工作,對“香蕉公司”很感興趣。所以你聯系了他們的HR,給了他你的聯系電話。他保證如果有任何職位空缺都會通知你。這里還有幾個候選人也你一樣很感興趣。所以職位空缺大家都會知道,如果你回應了他們的通知,他們就會聯系你面試。
該模式必須包含兩個角色:觀察者和觀察對象,香蕉公司就是被觀察者Subject,你就是Observers(還有和你一樣的候選人),當被觀察者狀態發送變化(比如職位空缺)就會通知(notify)觀察者,前提是Observers注冊到Subject里,也就是香蕉公司的HR得有你的電話號碼。
發布/訂閱模式
在觀察者模式中的Subject就像一個發布者(Publisher),而觀察者(Observer)完全可以看作一個訂閱者(Subscriber)。subject通知觀察者時,就像一個發布者通知他的訂閱者。這也就是為什么很多書和文章使用“發布-訂閱”概念來解釋觀察者設計模式。但是這里還有另外一個流行的模式叫做發布-訂閱設計模式。它的概念和觀察者模式非常類似。最大的區別是:
在發布-訂閱模式,消息的發送方,叫做發布者(publishers),消息不會直接發送給特定的接收者(訂閱者)。
意思就是發布者和訂閱者不知道對方的存在。需要一個第三方組件,叫做消息中間件,它將訂閱者和發布者串聯起來,它過濾和分配所有輸入的消息。換句話說,發布/訂閱模式用來處理不同系統組件的信息交流,即使這些組件不知道對方的存在。
我們設計kafka,是希望它能成為統一的平臺來處理大公司可能擁有的所有實時數據流。要做到這一點,我們必須考慮相當廣的用例(use case)。
它需要擁有高吞吐量來支持大容量事件流,如實時日志聚合(real-time log aggregation)。
它需要優雅地處理大量的數據備份,用于支持離線系統的周期性數據負載。
它需要處理低延遲的傳遞,用于支持傳統的消息傳遞系統用例。
我們想它是分區、分布式、實時處理信息流,以創建新的信息流和傳輸信息流。這些動機造就了kafka的分區和消費者模型。
最后有可能數據流被輸入到其他數據系統中,而這些系統需要對外提供服務,所以kafka需要有能力保證容錯性,哪怕存在有機器宕機。
為了支持上述這些,我們設計了一些獨特元素,更類似于數據庫日志,而不是傳統的消息傳遞系統。
我們將在下面部分中概述設計中的一些元素。
持久化(Persistence) 別害怕文件系統kafka重度依賴文件系統,用文件系統來存儲和緩存消息。人們都由這感覺“硬盤很慢”,以致于大家懷疑一個持久化架構是否能具有競爭力的性能。實際上硬盤它很快也很慢,這取決于我們怎么去使用它。一個合理的硬盤架構通常可以和網絡一樣快。(看來作者的網速都很快)。
硬盤性能的關鍵是,磁盤驅動器的吞吐量與過去十年的硬盤搜索的延遲有所不同。因此在6×7200rpm SATA RAID-5陣列的JBOD配置上的線性寫的性能大約為600MB/秒,但隨機寫入的性能僅為100k/秒,即超過6000倍的差別。這些線性讀寫是所有使用模式中最可預測的,并且由操作系統進行了大量優化。現代操作系統都提供了預讀取(read-ahead)和后寫(write-behind)操作的技術,這些支持多次讀取到一個大塊中和合并小的邏輯寫形成一個大的物理寫。這問題更深入的討論可以在這找到 ACM Queue article,他們確實發現順序硬盤讀寫在某些情況下比隨機內存訪問還快。
為了彌補這些性能差異,現代操作系統越來越著重使用主存來做磁盤緩存。現代操作系統很樂意將空余內存轉移到磁盤緩存中,但這需要承受在內存被回收時帶來的一點點的性能損失。所有硬盤讀寫都通過這統一的緩存(磁盤緩存)。如果沒有直接IO,這特性并沒有那么容易被拋棄。因此即使一個進場維護自己數據緩存時,這些數據將會在OS的頁緩存里復制兩份,兩次高效地存儲所有東西。
此外,我們是在JVM基礎上建立的,任何一位有花時間去研究Java內存的使用,都會知道以下兩件事情:
1.對象的內存開銷非常高,通常會使要存儲的數據的大小增大一倍(甚至更多)。
2.隨著堆內存的增加,Java垃圾收集會變得越來越繁瑣和緩慢。
也正是使用文件系統和依賴頁緩存(pagecache)帶來的結果優于維護一個內存中的緩存(in-memory cache)或是其他結構,通過對所有空閑內存進行自動訪問,我們至少可以將可用緩存加倍,并且還可以繼續加倍,通過存儲緊湊的字節結構而不是單個對象。這樣做的話可以在32GB的機器上使用28-30GB緩存,而不用擔心GC問題。而且,即使服務重啟,這些數據也保持熱度,對比起來,進程內存中的緩存在重啟后需要重建(對于10GB的緩存可能需要10分鐘),否則它需要從一個完全冷的緩存開始(這可能意味更糟糕的初始化性能)。這也極大地簡化了代碼,因為在緩存和文件系統之間保持一致性的所有邏輯現在都在操作系統中,這比一次性在進程內嘗試更有效、更正確。如果您的磁盤使用傾向于線性讀取,那么預讀取將有效地預操作這些緩存。
這表明了一個非常簡單的設計:在我們耗盡空間的時候,與其保持盡可能多的內存并將其全部清空到文件系統,不如反過來,數據都是被立即寫入到文件系統上的持久日志中,而不必刷新到磁盤。實際上,這僅僅意味著它被轉移到內核的頁緩存中。
以頁緩存為核心的設計,在這里文章里有被描述,此文章是Varnish的設計。
在消息傳遞系統里的持久化數據結構通常是一個消費者隊列關聯著一棵BTree或者其他通用的隨機訪問數據結構來維護消息的元數據。BTree是一個萬能的數據結構,可以在消息傳遞系統中支持各種事務和非事務性的語義。但它帶來相當高的成本:BTree操作是O(log N)。通常O(log N)本質上被認為是等于常量時間,但對于硬盤操作則并不是這樣。磁盤尋軌達到10ms,并且每個磁盤一次只能執行一次尋軌,所以并行性是有限的。因此,即使是少量的磁盤尋軌也會導致很高的開銷。由于存儲系統將非常快的緩存操作與非常慢的物理磁盤操作混合在一起,因此當在緩存固定時,數據增加時,樹結構的性能通常是超線性的。數據加倍則會使速度慢兩倍以上。
直觀上,一個持久的隊列可以建立在簡單的讀取和追加的形式,這通常也是日志解決方案使用的。這結構有這樣的好處,所有操作都是O(1),并且讀操作不會阻塞寫和讀的操作。這是具有明顯的優勢,是因為性能完全與數據量大小解耦了,一個服務現在可以充分利用那些大量的,且便宜,低轉速的SATA驅動器。雖然硬盤的尋軌性能差,但它們的大型讀和寫的性能還是可以接受的,而且還是三分之一的價格就有三倍的容量。
在沒有任何性能懲罰的情況下訪問幾乎無限的磁盤空間意味著我們可以提供一些在消息傳遞系統中不常見的特性。例如,在kafka中,我們可以在相對較長的時間內保留消息(比如一個星期),而不是每次消費完就刪除消息。這將給消費者帶來很大的靈活性。
我們在效率方面付出大量的努力。我們最初用例中的一個是處理網站活動數據,這可以是非常大量的數據:每個頁面的訪問都會產生許多寫操作。此外,我們假設每條消息至少被一個消費者讀取(通常是很多消費者),因此我們努力讓消費盡可能的便宜。
我們還發現,經歷過構建和運行多個類似的系統,有效的多租戶業務的關鍵是效率。
我們在前面章節討論過硬盤的效率。一旦消除了糟糕的磁盤訪問模式,在這種類型的系統中有兩個常見的低效原因:太多小的I/O操作和過度的字節復制。
這小IO問題發生在客戶端和服務器之間,和服務器自身的持久化操作中。
為了避免這種情況,我們的協議是圍繞一個“消息集(message set)”抽象構建的,該抽象可以自然地將消息分組在一起。這允許網絡請求將消息分組,并分攤網絡往返的開銷,而不是一次發送一條消息。服務器依次將大量的消息追加到其日志中,而消費者一次獲取大量的線性塊。
這個簡單的優化產生數量級的加速。批處理導致了更大的網絡數據包、更大的順序磁盤操作、連續的內存塊等等,所有這些都使得Kafka可以將隨機消息寫入的流變成 線性的寫 流給消費者。
另一個低效率的是字節復制。在低消息率下,這不是一個問題,但在負載下的影響是顯著的。為了避免這種情況,我們采用了一種標準化的二進制消息格式,由生產者、代理和消費者共享(因此數據塊可以在不進行修改的情況下傳輸)。
broker維護的消息日志本身就是一個文件目錄,每個文件都由一個以生產者和消費者使用的相同格式寫入磁盤的消息集的序列填充。保持這種通用格式可以優化最重要的操作:持久日志塊的網絡傳輸。現代unix操作系統為將數據從頁緩存傳輸到套接字提供了高度優化的代碼路徑;在Linux中,這是通過sendfile的系統調用完成的。
要了解sendfile的作用,首先最重要先理解將數據從文件傳輸到套接字的公共數據路徑:
1.操作系統從磁盤讀取數據到內核空間的頁緩存。
2.應用程序將數據從內核空間讀取到用戶空間緩沖區中。
3.應用程序將數據返回到內核空間,并將其寫入套接字緩沖區。
4.操作系統將數據從套接字緩沖區復制到通過網絡發送的NIC緩沖區。
有4次復制,兩次系統內核調用,這樣的效率當然就低下。使用sendfile,通過允許操作系統直接將數據從頁緩存發送到網絡,避免了重復復制。因此在這個優化的路徑中,只需要最后的復制,一次從磁盤復制到NIC緩沖區即可。——零拷貝(zero-copy)
我們期望一個常見的用例是在一個主題上有多個使用者。使用上述的零拷貝優化,數據被完全復制到頁緩存中,并在每次讀取時重復使用,而不是存儲在內存中并在每次讀取時將其復制到用戶空間。這就允許以接近網絡連接的極限的速率來讀取消息。
頁緩存和sendfile的組合意味著,在一個Kafka集群上,在有消費者的機子上,您將看到磁盤上沒有任何讀取活動,因為它們將完全從緩存中提供數據。
更多Java支持的sendfile和零拷貝,請點擊這里。
在某性情況下,事實上真正的瓶頸不是CPU也不是硬盤,而是網絡帶寬。對于需要在廣域網上的數據中心之間發送消息的數據管道來說,尤其如此。當然,用戶自己可以壓縮消息而不需要kafka的支持。但這可能導致非常差的壓縮比,特別是當消息的冗余字段很多(如JSON里的字段名和網站日志里的user agent或公共字符串)。高效的壓縮需要多個消息壓縮在一起,而不是每個消息獨立壓縮。
Kafka用高效的批處理格式支持這一點。可以將一批消息聚合到一起壓縮,并以這種形式發送到服務器。這批消息將以壓縮的形式寫入,并且將在日志中保持壓縮,并且只會被使用者解壓。
Kafka支持GZIP、Snappy和LZ4壓縮協議。關于壓縮的更多細節可以在這里找到。
生產者直接發送數據到broker,不需要任何的中間路由層,而接受的broker是該分區的leader。為了幫助生產者實現這一點,所有Kafka節點都可以回答關于哪些是可用服務器的元數據的請求,以及在任何給定的時間內,某個主題的分區的leader是否允許生產者適當地發送它的請求。
由客戶端控制它想往哪個分區生產消息。這可以隨機地進行,實現一種隨機的負載平衡,或者可以通過一些語義分區函數來實現。我們提供了語義分區的接口,允許用戶指定一個分區的key,并使用這個key來做hash到一個分區(如果需要的話,也是可以復寫這分區功能的)。例如,我們選擇user的id作為可用,則所以該用戶的信息都會發送到同樣的分區。這反過來又會讓消費者對他們的消費產生局部性的假設。這種明確設計的分區,允許消費者自己本地的處理。
批處理是效率的主要驅動因素之一,為了能夠批處理,kafka的生產者會嘗試在內存中積累數據,然后在一起在一個請求中以大批量的形式發送出去。批處理這個可以設置按固定的消息數量或按特定的延遲(64k或10ms)。這允許累積更多字節的發送出去,這樣只是在服務器上做少量的大IO操作。這種緩沖是可配置的,這樣提供了一種機制來以額外的延遲來提高吞吐量。
具體的配置)和生產者的api可以在這文檔中找到。
kafka消費者的工作方式是,向其想消費的分區的leader發送“fetch”請求。在每個請求中消費者指定日志的偏移量,然后接受回一大塊從偏移量開始的日志。因此,消費者對position有重要的控制權,如果需要,可以重置position來重新消費數據。
Push和pull我們首先考慮的一個問題是,消費者應該是從broker拉取消息,還是應該是broker把消息推送給消費者。在這方面,kafka遵循了一種更傳統的設計,大多數消息傳遞系統也會用的,那就是數據是從生產者push到broker,消費者是從broker拉取數據。一些日志集中系統,如Scribe和Apache Flume,遵循一個非常不同的,基于推送的路徑,將數據被推到下游。這兩種方法都由利弊,在基于推送的系統,由于是broker得控制數據傳輸的速率,不同消費者可能要不同的速率。然而消費者一般的目的都是讓消費者自己能夠以最大的速度進行消費,但在基于push的系統,當消費速率低于生產效率時,消費者就不知道該怎么辦好了(本質上就是一種拒絕服務攻擊(DOS))。一個基于pull的系統就擁有很好的熟悉,消費者可以簡單的調控速率。
基于pull的系統的另一個優點是,它可以對發送給消費者的數據進行聚合的批處理。基于推送的系統必須選擇立即發送請求或積累更多數據,然后在不知道下游用戶是否能夠立即處理它的情況下發送它。如果對低延遲進行調優,這將導致僅在傳輸結束時發送一條消息,最終將被緩沖,這是浪費。基于pull的設計解決了這個問題,因為用戶總是在日志的當前位置(或者是一些可配置的最大大小)之后提取所有可用的消息。因此,我們可以在不引入不必要的延遲的情況下獲得最佳的批處理。
基于pull的系統的缺點是,如果broker沒數據,則消費者可能會不停的輪訓。為了避免這一點,我們在pull請求上提供了參數,允許消費者在“長輪訓”中阻塞,直到數據達到(并且可以選擇等待,直到一定數量的自己可以,確保傳輸的大小)。
你可能詳細其他可能的設計,如只有pull,點到點。生產者會將本地的日志寫到本地日志中,而broker則會從這些日志中拉取數據。通常還會提出類似的“存儲轉發(store-and-forward)”生產者。這很有趣,但是我們覺得不太適合我們的目標用例:它有成千上萬的生產者。我們在大規模上運行持久數據系統的經驗使我們覺得,在許多應用程序中涉及到數千個磁盤,實際上并不會使事情變得更可靠,而且操作起來也會是一場噩夢。在實踐中,我們發現,我們可以在不需要生產者持久化的情況下,以大規模的SLAs來運行管道。
消費者的Position(Consumer Position)令人驚訝的是,跟蹤所使用的內容是消息傳遞系統的關鍵性能點之一。
很多消息傳遞系統在broker中保存了關于什么消息是被消費了的元數據。也就是說,當消息傳遞給消費者時,broker要么立即記錄信息到本地,要么就是等待消費者的確認。這是一個相當直觀的選擇,而且對于一臺機器服務器來說,很清楚地知道這些消息的狀態。由于許多消息傳遞系統中用于存儲的數據結構都很糟糕,因此這(記錄消息狀態)也是一個實用的選擇——因為broker知道什么是已經被消費的,所以可以立即刪除它,保持數據的大小。
讓broker和消費者就已經消費的東西達成一致,這可不是小問題。如果一條消息發送到網絡上,broker就把它置為已消費,但消費者可能處理這條消息失敗了(或許是消費者掛了,也或許是請求超時等),這條消息就會丟失了。為了解決這個問題,很多消息傳遞系統增加了確認機制。當消息被發送時,是被標志為已發送,而不是已消費;這是broker等待消費者發來特定的確認信息,則將消息置為已消費。這個策略雖然解決了消息丟失的問題,但卻帶來了新的問題。第一,如果消費者在發送確認信息之前,在處理完消息之后,消費者掛了,則會導致此消息會被處理兩次。第二個問題是關于性能,broker必須保存每個消息的不同狀態(首先先鎖住消息以致于不會讓它發送第二次,其次標志位已消費從而可以刪除它)。還有些棘手的問題要處理。如消息被發送出去,但其確認信息一直沒返回。
kafka處理則不一樣。我們的主題被分為一個有序分區的集合,且每個分區在任何給定的時間內只會被訂閱它的消費者組中的一個消費者給使用。這意味著每個分區中的消費者的position僅僅是一個整數,這是下一次消費時,消息的偏移量。這使狀態(記錄是否被消費)非常小,每個分區只有一個數字。這個狀態可以被定期檢查。這樣確認一條消息是否被消費的成本就很低。
這樣還附加了一個好處。消費者可以重置其最先的position從而重新消費數據。這雖然違反了隊列的公共契約,但它卻變成關鍵功能給許多消費者。例如,如果消費者代碼有一個bug,并且在一些消息被消費后才被發現,那么當bug被修復后,消費者就可以重新使用這些消息。
離線數據加載(Offline Data Load)可擴展持久化允許只有周期性地使用批量數據的消費者的可能性,比如定期將批量數據加載到離線系統(如Hadoop或關系數據倉庫)。
消息傳遞語義(Message Delivery Semantics)現在我們已經了解了些生產者和消費者是怎么工作的,接下來我們說下kafka提供給生產者和消費者的語義保證。很明顯這里提供了以下幾種消息傳遞保證機制:
至多一次(At most once),這樣消息可能會丟失,但永遠不會重新傳遞。
至少一次(At least once),這樣消息不可能會丟失,但可能會重新傳遞。
有且僅有一次(Exactly once),這是大家想要的,每個消息會被傳遞一次,而且也僅僅只有一次。
值得注意的是,這可以歸結為兩個問題:發布消息的持久化保證,以及在消費消息時的保證。
很多系統聲稱提供“有且僅有一次”的傳遞語義,但閱讀這些細節時,會發現其中大部分都是誤導(他們不理解消費者或生產者可能掛掉的情況,那些有多個消費者處理的情況,或者是那些被寫入磁盤的數據可能丟失的情況)。
kafka的語義很直接。在發布消息時,我們將消息“提交”到log中。一旦發布的消息被提交,只要有一個broker復制這個消息被寫入活動分區,它就不會丟失。提交的消息的定義、活動分區以及我們試圖處理的失敗的類型的描述將在下一節(副本)中詳細描述。現在我們假設在完美的情況下,現在讓我們假設一個完美的、無損的broker,和嘗試理解對生產者和消費者的保證。如果一個生產者試圖發布消息并經歷一個網絡錯誤,那么就不能確定該錯誤發生在消息提交之前還是之后。這類似于插入到一個數據庫表的自動生成的主鍵的語義。
在0.11.0.0版本之前,如果一個生產者沒有收到一個消息已經提交的響應,那么它幾乎沒有選擇,只能重新發送消息。這提供了“至少一次”的傳遞語義,因為如果原始請求實際上成功了,那么在重新發送期間,消息可能再次被寫入到日志中。從0.11.0.0開始,Kafka生產者也支持一個冪傳遞的選項,該選項保證重新發送不會導致日志中有這重復的消息。為了實現這一目標,broker為每個生產者分配一個ID,并使用由生產者發送消息時一起把序列號發送到broker,這樣broker就可以根據序列和id來處理重復的消息。同樣,從0.11.0.0開始,生產者支持使用類似于事務的語義向多個主題分區發送消息:即所有消息都已成功寫入或都失敗寫入。這種情況的主要應用場景是在Kafka主題之間進行“有且僅有一次”的處理(如下所述)。
并非所有的用例都需要這樣強的保證。對于延遲敏感的使用,我們允許生產者指定它需要的持久化級別。如果生產者指定要等待消息被提交要在10ms完成。則生產者可以指定它異步地執行發送,或者等待直到leader(但不一定是follower)得到消息。
現在我們描述下消費者視角下的語義。所有的副本都有相同的日志和相同的偏移量。消費者控制它在這個日志中的position。如果消費者從未崩潰,它可以將這個position存儲在內存中,但是如果消費者崩潰了,我們希望這個主題的分區來接替這個position的處理,那么新的進程將需要選擇一個合適的position來開始處理。
消費者讀取消息時,有幾個處理消息和更新其位置的選項。
第二種是它先讀取消息,然后將position保存到日志中,最后是處理消息。在這種情況下,在保存其position之后,在保存處理消息產生的輸出之前,消費者進程可能會崩潰。在這種情況下,接手處理的過程將從保存的position開始,即使在此position之前的一些消息未被處理。這是對應著“至多一次”的語義,失敗的消息可能不被處理。
第二種是它先讀取消息,然后處理消息,最后保存position到日志中。在這種情況下,在處理消息后,消費者進程可能會崩潰,但是在它保存它的position之前崩潰的。在這種情況下,當新進程接手了它接收到的最初幾條消息時,或許這幾條消息就已經被處理過了。在消費者崩潰的情況下,這相當于“至少一次”的語義。在許多情況下,消息有主鍵,因此更新是冪等的(接收相同的消息兩次,只是用另一個副本重寫了一個記錄)。
那“有且僅有一次”的語義怎樣(或者是說你到底想要什么)?從kafka主題中獲取消息處理后發布到其他主題(如一個Kafka Streams應用),我們可以利用上面提到的版本0.11.0.0里的新事務生產者的功能。消費者的position被當做一個消息存儲在一個主題,因此我們可以在與接收處理數據的輸出主題相同的事務中寫入kafka的偏移量。 如果事務被中止,消費者的position將恢復到原來的值,而輸出主題的生成數據將不會被其他消費者看到,這取決于他們的“隔離級別”。在默認的“read_uncommitted”隔離級別中,所有消息對消費者都是可見的,即使它們是被中止的事務的一部分,但是在“read_committed”中,使用者只會從提交的事務中返回消息(以及任何不屬于事務的消息)。
當寫入外部系統時,限制是在需要協調消費者的position和實際存儲的輸出。實現這一目標的經典方法是在存儲消費者position和存儲消費者輸出之間引入兩階段提交。但這可以更簡單地處理,并且通常通過讓消費者將其偏移量存儲在與輸出相同的位置。這樣做比較好,因為消費者可能想要寫入的輸出系統都不支持兩階段提交。作為一個例子,考慮一個Kafka Connect連接器,它在HDFS中填充數據,以及它讀取的數據的偏移量,從而保證數據和偏移量都得到了更新,或者兩者都不更新。對于需要這些更強語義的其他許多數據系統,我們遵循類似的模式是為了那些需要強一致性語義的系統,還為了這些消息沒有主鍵來允許刪除重復數據。
因此kafka為了kafka Streams,高效地支持“有且僅有一次”的傳遞,并且在Kafka主題之間傳輸和處理數據時,通常可以使用事務生產者/消費者提供“有且僅有一次”的傳遞。對于其他目標系統的“有且僅有一次”的傳遞一般需要協調,但kafka提供了偏移量,它可以實現這要求(參見Kafka Connect)。否則,缺省情況下Kafka保證“至少一次”傳遞,并且允許用戶禁止生產者的重試或消費者在處理數據之前提交position,從而實現“至多一次”的專遞。
副本(Replication)Kafka通過一個可配置的服務器數量對每個主題的分區進行復制日志(你您可以按主題設置此副本因子(replication factor))。這允許在集群中的服務器發生故障時自動恢復,因此當在出現故障時仍然可以使用消息。
其他消息傳遞系統提供了副本相關的特性,但,我們認為,這似乎是一種策略而已,并沒有大量的使用,而且還有個很大的缺點:slave是未被用上的,吞吐量受到嚴重的影響,恢復還需要繁瑣的人工配置,等等。kafka默認是使用了副本功能,實際上那些副本因子設置為1的主題,我們也會當做是使用副本功能的主題。
副本的最小單元是主題的分區。在沒有失敗的情況下,kafka的每個分區都是有一個leader,其follower可以為零個或多個。包括leader在內的副本數量就是副本因子。所有讀和寫都是通過leader分區。通常情況,分區的數據量是多個broker,leader的數量時平均分配當每個broker。follower的日志和leader的日志是完全相同的——它們都具有相同的偏移量和相同順序的消息(當然,在任何給定的時刻,在日志的末尾可能會有一些還未同步到的消息)。
follower也跟kafka的普通消費者一樣從leader消費消息。follower從leader拉消息時,有個很好的特性,那就時可以讓follower很容易地批量把日志應用到其(follower)日志中。
跟很多 分布式系統處理自動恢復 一樣,對于節點是否“存活(alive)”需要有一個明確的定義。對于kafka,節點存活有以下兩個條件:
1.節點必須維護它與ZooKeeper的session(通過ZooKeeper的心跳機制)
2.如果是slave,就必須復制leader,而且不能落后太遠。
滿足上述兩個條件的節點,我們更愿意叫“已同步(in sync)”而不是模糊不清的“存活”或“失敗”。leader保持跟蹤這些“已同步”的節點。如果follower掛了,或者卡住了,或者落后太遠了,leader會講起從已同步的副本名單中移除。是有e replica.lag.time.max.ms這配置去控制卡住多長時間和落后多少副本數量。
在分布式系統術語中,我們只嘗試處理一個“失敗/恢復”模型,即節點突然停止工作,然后恢復(可能不知道它們已經死亡)。kafka沒有處理所謂的“拜占庭式”的失敗,即節點產生任意或惡意的響應(可能是由于某些錯誤)。
現在,我們可以更精確地定義一個消息的提交,當所有副本都同步到分區,分區并且應用到其日志中時,就會被認為是提交的。只有提交的消息才會分發給消費者。這就意味著消費者不用擔心當leader崩潰時,消息會丟失。另一方面,生產者可以選擇等待消息提交或不提交,這取決與它們對延遲和持久化之間的權衡。生產者可以使用acks這配置來控制這權衡。注意,這“最小數據量(minimun number)”同步副本的數量設置,是指當消息都同步到所有副本后,kafka再去檢查時,檢查的最小數量。如果生產者對確認要求不太嚴格,則消息一發布就可以被使用了,即使同步副本數量還沒達到最小值。(這最小值可以低到只有一個,那就是leader)。
kafka保證消息不會丟,只要任何時候至少有一個已同步的副本存在。
kafka可以在節點故障的情況下可用。但存在網絡分區時,就可能無法使用了。
分區就是一個副本日志。副本日志是分布式數據系統的最基本的原語(primitvie)之一,而且有很多種實現方式。其他系統可以使用副本日志作為一種原語,用于在狀態機形式的分布式系統。
對于一系列值的順序達成一致的過程(通常編號為0、1、2、…),副本日志就是將其模型化。有很多方法可以實現這一點,但最簡單和最快的是leader來選擇序值。只有leader還存活,所喲follower都只需要復制值即可,順序由leader決定。
當然,如果leader不掛,那我們沒必要要follower。當leader崩潰時,我從follower中選擇出新的leader。但follower自己可能落后或崩潰,所以我們必須保證我們選擇的是最新的follower。日志復制算法必須這最基本的保證時,如果我們告訴客戶端消息已經提交了,而此時leader掛了,我們選擇的新leader也必須包含剛剛那個已經提交了的消息。這就產生了一個權衡:如果leader等待過多的follower確認消息,This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
如果你指定確認的數量和日志(與leader對比過的)的數量,這樣就保證有重疊性,那么這就叫法定人數(Quorums)。
這種權衡最常見的方法是,在提交決策和leader選舉中使用大多數投票。這不是kafka做的,但讓我們去探索它,了解它的利弊。假設我們有2f+1個副本。如果f+1節點收到消息,沒有超過f個節點失敗,則leader就保證所有消息都被提交,我們選擇新leader時也一樣。這是因為我們在任意節點上選擇f+1個節點,這f+1里必須至少有一個節點包含所有已提交消息的副本。副本最完整的結點將會被選中為新leader。這里還有很多算法細節需要處理(如明確定義日志的完整性,leader崩潰時怎么保證一致性,修改集群中的服務器),這些我們先暫時忽略。
多數投票方法有個非常好的特性:延遲僅僅取決于多臺最快的服務器。也就是說,如果副本因子時3,那么延遲由最快的一個slave決定,而不是最慢的slave(leader一個、最快的slave一個,這就達到法定人數了)。
這個家族有很多算法,包括ZooKeeper的Zab,Raft和Viewstamped副本算法。我們知道的,更接近kafka的用的算法的學術出版是來自微軟的PacificA。
多數投票的不足之處就是,它不需要很多失敗的節點,就可以讓你選擇不到leader。為了容忍一個節點失敗,則需要3個節點,容忍2個,則需要5個節點。在我們經驗里,以為只要剛剛好夠冗余的副本,就能容忍一個節點的失敗,但這是不實際的,在5倍硬盤空間(5個硬盤,每個硬盤占1/5吞吐)情況下,每次都要寫5次,這對于大量數據的問題時不切實際的。這也是為什么法定人數算法比較常用在集群的配置文件如ZooKeeper,而很少用在原數據存儲上。例如在HDFS的namenode的高可用是建立在多數人投票,但這成本很高的算法不會用在它的數據存儲上。
Kafka使用了一個稍微不太一樣的方法去選擇法定人數。kafka動態的維護一個ISR(in-sync replicas)集合,集合里面的節點都是已同步。只有這集合里面的人才適合選舉為leader。只有所有ISR都收到寫入分區,則這分區的寫入就會被認為已提交。這ISR保存在ZooKeeper。對于kafka的使用模型來說,這是一個重要的因素,那里有許多分區,并且確保leader的平衡很重要。ISR模型和f+1副本,一個kafka主題可以容忍f個失敗(總共就f+1個節點)。
我們想處理更多的用例,所以這個權衡我們覺得是合理的。在實際情況,對于容忍f個節點失敗,多數投票和ISR方法都是需要通用數量的副本確認(比如,容忍1個節點失敗,多數投票方法則需要3個副本和1個確認,ISR方法需要2個副本和1個確認)。確認提交而不需要由最慢的節點來確認這是多數投票方法的好處。但我們覺得這是可以通過由客戶端選擇是否阻塞消息提交,以及控制副本因子(降低)而增加吞吐量和磁盤空間來優化這個問題(這問題就是與多數投票對比)。
另一個重要的設計是,kafka不要求崩潰節點在所有數據完整的情況下恢復。在這個空間中,副本算法依賴于“穩定存儲”的存在并不少見,這種“穩定存儲”在任何故障恢復場景中都不能丟失,要保證一致性。這有兩個主要問題。首先,硬盤故障是我們在持久化數據系統的實際操作中最常見的問題,問題發生后,通常也不會完整地保留數據。其次,即使這不是一個問題,我們也不希望在每次寫入時都需要使用fsync,因為這樣會減少兩到三個數量級的性能。我們允許一個副本重新加入ISR的協議,這協議確保在重新加入之前,它必須完全重新同步,即使它在崩潰中丟失了未刷新的數據。
注意,Kafka對數據不丟失的保證是基于至少一個保持同步的副本。如果一個分區的副本都丟失了,則無法保證數據不丟失。
然而在實際情況下的系統當所有副本掛之后必須做一些合理的事情。如果很不辛遇到這種情況,意識到后面會發生什么這是很重要。可能會出現以下兩種情況:
1.等待ISR里的所有節點恢復,并選擇出新的leader(希望這leader還保存著所有的數據)。
2.選擇第一個副本(不需要是ISR里面的)恢復,作為leader。
以下是可用性和持久化的權衡。一、如果我們等待所有ISR副本恢復,則我們會等很長的時間。。二、如果副本的數據都丟了,則永遠無法恢復。最后一個就是,如果一個沒有同步的副本恢復,我們允許它為leader,則認為它的日志是最新的,哪怕它沒有包含所有已提交的消息。在0.11.0.0版本里默認的選擇第一個權衡,用等待來換取數據的一致性。這個是可以配置的,如果啟動時間比一致性重要,則修改這個 unclean.leader.election.enable。
這個困惑不僅僅kafka有。它存在與任何基于法定人數算法的場景。例如,在多數投票的場景,如果你是去大多數服務器,在剩余的服務器,你就必須在兩者選其中一個,不是失去100%的數據就是丟失數據的一致性。
生產者生成消息時,可以選擇0個,1個或者全部副本確認。注意這里的“全部副本確認”不能保證所有被分配副本的結點都能收到消息。默認的,當acks=all時,只要所有當前所有ISR都收到消息,則可以確認消息。例如,一個主題被設置為兩個副本和一個失敗(只有剩下一個ISR),然后所有acks=all的寫入都會是成功的。如果剩余的副本也失敗,這樣消息就會被丟失。盡管這確保了分區的最大可用性,但是這種行為可能不適合某些喜歡持久化而不是可用性的用戶。因此,我們提供了兩種頂級的配置,可用于更傾向于消息持久化而不是可用性:
1.關閉不清晰的leader選舉——如果所有副本變得不可用,直到最近的leader變得可用,所有分區才可以變得可以用。這有效地避免了消息丟失的風險。請參閱上一節不清晰的Leader選舉。
2.指定最小的ISR數量——只有高過這最小數量,消息才會被確認,這是為了避免在寫入一個副本時,而且副本掛了,導致消息丟失的風險。這個設置僅僅在生產者使用acks=all生效或保證消息在這數量以上的ISR確認。這個設置提供了一致性和高可用的權衡。ISR最小數量設置高一點,這樣更好的保證一致性。然而這樣會減少可用性,因為在ISR沒滿足這數量時,分區是不可用的。
上訴討論副本,也僅僅是一份日志,也就是主題的一個分區。然而kafka是管理成千上萬的分區。我們試圖以循環(round-robin)方式在集群中平衡分區,以避免在大數據量的主題的所有分區都在少量節點上。同樣地,我們試圖平衡leader,使每個節點都是其一定份額分區的leader。
對ledaer選舉過程進行優化也很重要,因為這是服務不可用的窗口期。一個簡單的leader選舉會在一個節點失敗后,在該節點內所有分區,每個分區都會舉行一次選舉。相反,我們選擇一個broker作為“controller”。這controller檢測broker層次的失敗,負責修改受故障影響的分區的leader。其結果是,我們能夠將許多需要的leadr變更批量處理,這使得選舉過程在大量的分區上變得更加便宜和快速。如果controller失敗了,其中一個存活的節點會變成新的controller。
日志壓縮保證kafka在每個分區,對于每個key,至少保存其最近的一條消息。這解決了那些需要當應用或系統崩潰后,重啟時需重新加載數據的場景。
到目前位置,我們只討論了簡單的數據保存方法,那就是當舊日志數據超過一定時間或達到一定大小的時候會被刪除。這個適用于每條相對獨立的消息,如臨時事件。然而,還有一類很重要的數據,那就是根據key修改數據,一種可變的數據(例如在數據庫表數據的變更那樣)。
我們討論一個具體的例子。一個主題包含了用戶emial信息,每次用戶更新他們的email信息,我們都會發送消息到topic,是根據他們的userid做主鍵。以下是我們發送的消息,userid是123,每條信息都對應著一次的email信息修改(省略號是省略其他userid的消息)。
123 => bill@microsoft.com . . . 123 => bill@gatesfoundation.org . . . 123 => bill@gmail.com
日志壓縮給了我們更細顆粒度保留數據機制,這樣我們就可以保證只保留每一個key最后的一次變更(如123 => bill@gmail.com)。這樣我們保證了日志里都包含了所有key的最后一個值的快照。這就意味著下游的消費者可以重建狀態而不需要保存所有的更變日志。
讓我們一些日志壓縮有用的場景,然后我們在看看是怎么被使用上。
1.數據庫變更訂閱(Database chagne subscription)。我們很常見到一份數據集會存在多種數據系統里,而且這系統里有一個類似數據庫那樣的(如RDBMS或新潮的key-value系統)。舉個例子,你有一個數據庫、一個緩存、一個搜索集群和一個Hadoop集群。這樣每次數據庫的修改,都得映射到那緩存、那搜索集群和最后在Hadoop里。在這個場景里,你只是需要實時最新更新的日志。但如果需要重新加載進緩存或恢復宕機的搜索節點,就可能需要完整的數據集。
2.事件源(Event sourcing)。這是一種應用設計風格,它將查詢和應用設計結合在一起,并使用日志作為程序的主要存儲。
3.高可用日志(Journaling for high-availability)。一個本地計算的進程可以通過變更日志來做到容錯,這樣另一個進程就能重新加載這些變更繼續處理。一個具體的例子就是流式查詢系統,如計數、匯總和其他“分組”操作。實時流式處理框架Samza就是使用這功能達到目的的。
在上述場景中,主要處理實時的變更,偶爾需要重新加載或重新處理時,能做的就只有重新加載所有數據。日志壓縮提供了這兩個功能,處理實時數據變更,和重新加載數據。這種使用日志的風格,詳情可參看點擊。
這思路很簡單。如果我們保存無窮無盡的日志,保存上述場景中每個變更日志,而且還是一開始就獲取每個系統的狀態。使用這個完整的日志,我們就可以恢復到任何一個時間點的狀態。但這種完整日志的假設時不切實際的,因為對于那些每一行記錄都在變更多次的系統,即使數據很小,日志也會無限的增長下去。那我們就簡單的丟棄舊日志,雖然可以限制空間的增長,但也無法重建狀態——因為舊日志被丟棄,可能一部分記錄的狀態無法重建。
相對于粗粒度的基于時間的數據保留策略,日志壓縮的策略是一種更細顆粒度,基于每一條記錄保存。這個想法是,有選擇性的刪除那些有多個變更記錄的同樣的key。這樣的日志就保證每個key都至少有一個最新的狀態。
數據保留策略可以為每個主題設置,所以一個集群里有些主題的保存策略可以設置為大小和時間來保存數據,有主題也可以通過壓縮保留。
這個功能的靈感是來自于LinkedIn里最古老且最成功的基礎架構——一個被稱為Databus的數據庫變更日志緩存系統。
跟大多數日志結構存儲系統不一樣的時,Kafka是為了訂閱而設計的,組織數據的形式也是為了更快的線性讀取和寫入。跟Databus不一樣之處是,kafka作為真實源(source-of-truth)存儲,即使上游數據源不具備可重用性的情況下,它還是挺有用的。
不管是傳統的RDBMS還是分布式的NoSQL存儲在數據庫中的數據總是會更新的,相同key的新記錄更新數據的方式簡單來說有兩種:
1.直接更新(找到數據庫中的已有位置以最新的值替換舊的值)。
2.追加記錄(保留舊的值,查詢時再合并,或者也有一個后臺線程會定期合并)。
采用追加記錄的做法可以在節點崩潰時用于恢復數據,還有一個好處是寫性能很高,因為是線性寫。
以下是各個數據系統的更新數據方式:
數據系統 | 更新數據追加到哪里 | 數據文件 | 是否需要壓縮 |
---|---|---|---|
ZooKeeper | log | snapshot | 不要,因為數據量不大 |
Redis | aof | rdb | 不需要,因為是內存數據庫 |
Cassandra | commit log | data.db | 需要,數據存在本地文件 |
HBase | commit log | HFile | 需要,數據存在HDFS |
Kafka | commit log | commit log | 需要,數據存在分區中的Segment里 |
這里有個更高層次的圖,展示kafka日志的邏輯存儲結構,框框的每個數字都是一條消息的偏移量(offset):
日志的頭部(Log Head)就是傳統的kafka日志。日志的尾部(Log Tail)則是被壓縮過的日志。Log Head是很密集的,偏移量時連續的,保留了所有的消息。值得注意的是在Log Tail的消息雖然被壓縮,但依然保留它一開始被寫入時的偏移量,這個偏移量是永遠不會被改變。而且這壓縮日志里的偏移量,在日志里依然時有效的。所以,時無法區分下一個更高的偏移量是什么,比如說,上面的例子,36、 37、 38都是屬于同一個位置。
以上說的都是數據更新時的日志壓縮,當然日志壓縮也支持刪除。當發送某個Key的最新版本的消息的內容為null,這個Key將被刪除(某種程度上也算是更新,如上面的例子就是把email信息置為null)。這個消息也稱刪除標志(delete marker),這個刪除標志會把之前跟這key相同的消息刪掉。但這刪除標志比較特殊,特殊之處是它是過一段時間才被刪除,從而騰出磁盤空間。而數據刪除的時間點會被標志為“刪除保留點(delte retention point)”,也就是如上圖所示,這個圖展示也很特別,你看看兩個是point而不是pointer,也不是指向某個消息,而是消息與消息之間。說明它是個時間點,而不是指向某個消息的指針pointer。
壓縮時通過后臺定期復制日志段(log segment)完成的。清除時并不會阻塞讀操作,而且還可以配置不超過一定的IO,從而避免影響消費者和生產者。壓縮日志段的過程如下:
日志壓縮提供了什么保證?(What guarantees does log compaction provide?)日志壓縮保證:
1.任何消費者只要是讀取日志的頭部的,都可以看到所有消息,頭部的消息不會被刪除。這些消息都是有連續的偏移量。Topic的min.compaction.lag.ms參數可用于保證在指定時間內該消息的存在,而不會被壓縮。這提供了消息呆在頭部(未被壓縮)的時間的底線。
2.依然保持則消息的有序性。壓縮永遠不會重新給消息排序,而僅僅是刪除其部分而已。
3.消息的偏移量永遠不會改變。它永遠標志著消息所在的位置。
4.任何從日志最開始的地方開始處理都會至少看到每個key的最終狀態。另外,只要消費者在delete.retention.ms(默認是24小時)這時間內達到日志的頭部,則將會看到所有刪除記錄的刪除標志。也就是說:由于刪除標志的移除和讀取是同時發生,所以如果錯過delete.retention.ms這時間,消費者會錯過刪除標志。
日志壓縮通過日志清除器(log cleaner)執行,后臺線程池復制日志段,移除那些存在于Log Head中的記錄。每個壓縮線程工作如下:
1.選擇Log Head中相對比Log Tail的比例高的日志。
2.創建Log Head中每個Key對應的最后一個偏移量的日志摘要。
3.從頭到尾的開始復制,在復制過程中刪除相同key的日志。新的、干凈的日志段將立刻被交換(swap)到日志里,所以只需一個額外的日志段大小的硬盤空間就可以(不需要全部日志的空間)。
4.Log Head的日志摘要實際上是一個空間緊湊的哈希表。每個實體只需要24個字節空間。所以8G的cleaner空間,可以處理大概366G的Log Head(假設每個消息大小為1k)。
Kafka是默認啟用日志清除器,是個線程池。如果要開啟指定主題的清理功能,你可以在日志里添加以下屬性:
log.cleanup.policy=compact
這個可以在創建主題時指定或修改主題時指定。
日志清除器可以設置多少消息在Log Head而不被刪除。這個啟用是通過設置壓縮時間段:
log.cleaner.min.compaction.lag.ms
如果不設置,則默認是除了最后一個segment之外,其余日志段都會被壓縮,即最后一個日志段不會被壓縮。任何已激活的日志段都不會被壓縮,就算消息的時間已經超過了上面配置的時間,這里的激活,是指有在消費。
配額(Quotas)Kafka集群有能力強制性地要求控制broker中客戶端使用的資源。以下是兩類客戶的quotas:
1.網絡帶寬quotas,具體到字節(從0.9版本開始)。
2.請求速率quotas,具體到CPU的利用率(網絡和IO的比值)。
生產者和消費者有可能生成/消費大量的數據或請求速率非常高,以致于占滿了broker的資源,導致網絡飽和broker拒絕給其他客戶端服務。使用quotas就能避免這個問題,在多租戶集群上尤為重要,因為一部分低質量的客戶可能會降低高質量客戶的用戶體驗。實際上,可以對API進行這樣的限制。
客戶組(Client groups)Kafka客戶標識是用戶主體(user principal),用于代表用戶在這安全的集群上的權限。在無鑒權的時候,broker通過可配置的PrincipalBuilder來提供用戶主體,用來分組。由客戶端應用選擇client-id作為客戶的邏輯分組。元組(user,client-id)則定義了一個安全邏輯組,共享user principal和chient-id。
quotas可以被應用到元組(user,client-id),user或client-id組。對于一個連接,匹配上的quota將會應用到此連接上。例如(user="test-user",client-id="test-client")擁有生產者quota是10MB/s,這個10MB的帶寬將會被user是“test-user”并且client-id是"test-client"的生產者進行共用。
quota可以按(user,client-id)配置,也可以按user組配置,也可以按client-id組配置。默認quota可以被任何級別的quota給覆蓋。這個機制類似于每個Topic可以覆蓋自己的。ZooKeeper的/config/users的quota可以覆蓋user和(user,client-id)的quota。/config/clients下的則可以覆蓋client-id的quota。這些ZooKeeper的覆蓋會即可在所以broker中生效,這樣我們就不需要修改配置時重啟服務器。詳情請點擊。
quota配置的優先級如下:
1./config/users//clients/ 2./config/users/ /clients/ 3./config/users/ 4./config/users/ /clients/ 5./config/users/ /clients/ 6./config/users/ 7./config/clients/ 8./config/clients/
broker的(quota.producer.default, quota.consumer.default)屬性來給每個client-id組設置默認的網絡帶寬。但后面的版本會刪除這些屬性。
client-id組的默認quota可以在ZooKeeper中配置。
網絡帶寬quota,具體到字節,而且是有組里的客戶一起共享。默認的,每個獨立的客戶組都有一個固定的網絡帶寬的quota。這quota配置在每個broker。
請求速率配額(Request Rate Quotas)請求速率quota,具體到時間的百分比,時間是在quota窗口里每個broker的處理請求的IO線程和網絡線程。 n%的quota代表一個線程的n%,所以quota總數是((num.io.threads+num.network.threads)×100)%。每個客戶組在一個quota窗口中最多使用n%的IO線程和網絡線程。由于分配給IO和網絡的線程數是根據broker主機的cpu個數,則每個請求速率quota代表著CPU的百分比。
實施(Enforcement)默認情況下,每個唯一的客戶組都會有一個集群配置好的固定的quota。這個quota是定義在每個broker上。我們決定由每個broer定義這些quota,而不是由集群為每個client統一設置一個quota的原因,是因為為了方便共享quota的設置。
如果Broker檢測到超過quota了,會怎么處理?在我們的解決方案中,我們是選擇降低速率,而不是直接返回錯誤。broker會去計算處理這問題的延遲時間,這段時間則不會立刻響應客戶端。這種超過quota的處理,對于客戶端來說是透明的。客戶端不需要做額外的操作。實際上,客戶端額外的動作,如果操作不好,還會加劇超過quota的問題。
字節率和線程利用率都會在多個小窗口中監測(一秒鐘有30個窗口),以便快速準確的糾正quota違規行為。
客戶端字節率在多個小窗口(例如每個1秒的30個窗口)上進行測量,以便快速檢測和糾正配額違規。 通常,大的測量窗口(例如,每30秒10個窗口)會導致大量的流量,然后是長時間的延遲,這對用戶體驗方面并不好。
參考和翻譯:
RabbitMQ官方 https://www.rabbitmq.com
Kafka官方 http://kafka.apache.org/docum...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/69265.html
閱讀 2756·2023-04-25 14:15
閱讀 2704·2021-11-04 16:11
閱讀 3395·2021-10-14 09:42
閱讀 442·2019-08-30 15:52
閱讀 2826·2019-08-30 14:03
閱讀 3546·2019-08-30 13:00
閱讀 2112·2019-08-26 11:40
閱讀 3308·2019-08-26 10:25