摘要:在種可能的狀態中,狀態是最容易理解的,可以給對應的副本發送多個消息不超過滑動窗口的限制,并適時地將窗口向前滑動。這是因為僅關心日志的部分,至于如何把日志中的內容更新到真正的狀態機中,是應用程序的任務。
作者:屈鵬
在 《TiKV 源碼解析(二)raft-rs proposal 示例情景分析》 中,我們主要介紹了 raft-rs 的基本 API 使用,其中,與應用程序進行交互的主要 API 是:
RawNode::propose 發起一次新的提交,嘗試在 Raft 日志中追加一個新項;
RawNode::ready_since 從 Raft 節點中獲取最近的更新,包括新近追加的日志、新近確認的日志,以及需要給其他節點發送的消息等;
在將一個 Ready 中的所有更新處理完畢之后,使用 RawNode::advance 在這個 Raft 節點中將這個 Ready 標記為完成狀態。
熟悉了以上 3 個 API,用戶就可以寫出基本的基于 Raft 的分布式應用的框架了,而 Raft 協議中將寫入同步到多個副本中的任務,則由 raft-rs 庫本身的內部實現來完成,無須應用程序進行額外干預。本文將對數據冗余復制的過程進行詳細展開,特別是關于 snapshot 及流量控制的機制,幫助讀者更深刻地理解 Raft 的原理。
一般 MsgAppend 及 MsgAppendResponse 的處理在 Raft leader 上,應用程序通過 RawNode::propose 發起的寫入會被處理成一條 MsgPropose 類型的消息,然后調用 Raft::append_entry 和 Raft::bcast_append 將消息中的數據追加到 Raft 日志中并廣播到其他副本上。整體流程如偽代碼所示:
fn Raft::step_leader(&mut self, mut m: Message) -> Result<()> { if m.get_msg_type() == MessageType::MsgPropose { // Propose with an empty entry list is not allowed. assert!(!m.get_entries().is_empty()); self.append_entry(&mut m.mut_entries()); self.bcast_append(); } }
這段代碼中 append_entry 的參數是一個可變引用,這是因為在 append_entry 函數中會為每一個 Entry 賦予正確的 term 和 index。term 由選舉產生,在一個 Raft 系統中,每選舉出一個新的 Leader,便會產生一個更高的 term。而 index 則是 Entry 在 Raft 日志中的下標。Entry 需要帶上 term 和 index 的原因是,在其他副本上的 Raft 日志是可能跟 Leader 不同的,例如一個舊 Leader 在相同的位置(即 Raft 日志中具有相同 index 的地方)廣播了一條過期的 Entry,那么當其他副本收到了重疊的、但是具有更高 term 的消息時,便可以用它們替換舊的消息,以便達成與最新的 Leader 一致的狀態。
在 Leader 將新的寫入追加到自己的 Raft log 中之后,便可以調用 bcast_append 將它們廣播到其他副本了。注意這個函數并沒有任何參數,那么 Leader 如何知道應該給每一個副本從哪一個位置開始廣播呢?原來在 Leader 上對每一個副本,都關聯維護了一個 Progress,該結構體定義如下:
pub struct Progress { pub matched: u64, // 該副本期望接收的下一個 Entry 的 index pub next_idx: u64, // 未 commit 的消息的滑動窗口 pub ins: Inflights, // ProgressState::Probe:Leader 每個心跳間隔中最多發送一條 MsgAppend // ProgressState::Replicate:Leader 在每個心跳間隔中可以發送多個 MsgAppend // ProgressState::Snapshot:Leader 無法再繼續發送 MsgAppend 給這個副本 pub state: ProgressState, // 是否暫停給這個副本發送 MsgAppend 了 pub paused: bool, // 一些其他字段…… }
如代碼注釋中所說的那樣,Leader 在給副本廣播新的日志時,會從對應的副本的 next_idx 開始。這就蘊含了兩個問題:
在剛開始啟動的時候,所有副本的 next_idx 應該如何設置?
在接收并處理完成 Leader 廣播的新寫入后,其他副本應該如何向 Leader 更新 next_idx?
第一個問題的答案在 Raft::reset 函數中。這個函數會在 Raft 完成選舉之后選出的 Leader 上調用,會將 Leader 的所有其他副本的 next_idx 設置為跟 Leader 相同的值。之后,Leader 就可以會按照 Raft 論文里的規定,廣播一條包含了自己的 term 的空 Entry 了。
第二個問題的答案在 Raft::handle_append_response 函數中。我們繼續考察上面的情景,Leader 的其他副本在收到 Leader 廣播的最新的日志之后,可能會采取兩種動作:
fn Raft::handle_append_entries(&mut self, m: &Message) { let mut to_send = Message::new_message_append_response(); match self.raft_log.maybe_append(...) { // 追加日志成功,將最新的 last index 上報給 Leader Some(last_index) => to_send.set_index(last_index), // 追加日志失敗,設置 reject 標志,并告訴 Leader 自己的 last index None => { to_send.set_reject(true); to_send.set_reject_hint(self.raft_log.last_index()); } } } self.send(to_send);
其他副本調用 maybe_append 失敗的原因可能是比 Leader 的日志更少,但是 Leader 在剛選舉出來的時候將所有副本的 next_idx 設置為與自己相同的值了。這個時候這些副本就會在 MsgAppendResponse 中設置拒絕的標志。在 Leader 接收到這樣的反饋之后,就可以將對應副本的 next_idx 設置為正確的值了。這個邏輯在 Raft::handle_append_response 中:
fn Raft::handle_append_response(&mut self, m: &Message, …) { if m.get_reject() { let pr: &mut Progress = self.get_progress(m.get_from()); // 將副本對應的 `next_idx` 回退到一個合適的值 pr.maybe_decr_to(m.get_index(), m.get_reject_hint()); } else { // 將副本對應的 `next_idx` 設置為 `m.get_index() + 1` pr.maybe_update(m.get_index()); } }
以上偽代碼中我們省略了一些丟棄亂序消息的代碼,避免過多的細節造成干擾。
pipeline 優化和流量控制機制上一節我們重點觀察了 MsgAppend 及 MsgAppendResponse 消息的處理流程,原理是非常簡單、清晰的。然而,這個未經任何優化的實現能夠工作的前提是在 Leader 收到某個副本的 MsgAppendResponse 之前,不再給它發送任何 MsgAppend。由于等待響應的時間取決于網絡的 TTL,這在實際應用中是非常低效的,因此我們需要引入 pipeline 優化,以及配套的流量控制機制來避免“優化”帶來的網絡壅塞。
Pipeline 在 Raft::prepare_send_entries 函數中被引入。這個函數在 Raft::send_append 中被調用,內部會直接修改對目標副本的 next_idex 值,這樣,后續的 MsgAppend 便可以在此基礎上繼續發送了。而一旦之前的 MsgAppend 被該目標副本拒絕掉了,也可以通過上一節中介紹的 maybe_decr_to 機制將 next_idx 重置為正確的值。我們來看一下這段代碼:
// 這個函數在 `Raft::prepare_send_entries` 中被調用 fn Progress::update_state(&mut self, last: u64) { match self.state { ProgressState::Replicate => { self.next_idx = last + 1; self.ins.add(last); }, ProgressState::Probe => self.pause(), _ => unreachable!(), } }
Progress 有 3 種不同的狀態,如這個結構體的定義的代碼片段所示。其中 Probe 狀態和 Snapshot 狀態會在下一節詳細介紹,現在只需要關注 Replicate 狀態。我們已經知道 Pipeline 機制是由更新 next_idx 的那一行引入的了,那么下面更新 ins 的一行的作用是什么呢?
從 Progress 的定義的代碼片段中我們知道,ins 字段的類型是 Inflights,可以想象成一個類似 TCP 的滑動窗口:所有 Leader 發出了,但是尚未被目標副本響應的消息,都被框在該副本在 Leader 上對應的 Progress 的 ins 中。這樣,由于滑動窗口的大小是有限的,Raft 系統中任意時刻的消息數量也會是有限的,這就實現了流量控制的機制。更具體地,Leader 在給某一副本發送 MsgAppend 時,會檢查其對應的滑動窗口,這個邏輯在 Raft::send_append 函數中;在收到該副本的 MsgAppendResponse 之后,會適時調用 Inflights 的 free_to 函數,使窗口向前滑動,這個邏輯在 Raft::handle_append_response 中。
ProgressState 相關優化我們已經在 Progress 結構體的定義以及上面一些代碼片段中見過了 ProgressState 這個枚舉類型。在 3 種可能的狀態中,Replicate 狀態是最容易理解的,Leader 可以給對應的副本發送多個 MsgAppend 消息(不超過滑動窗口的限制),并適時地將窗口向前滑動。然而,我們注意到,在 Leader 剛選舉出來時,Leader 上面的所有其他副本的狀態卻被設置成了 Probe。這是為什么呢?
從 Progress 結構體的字段注釋中,我們知道當某個副本處于 Probe 狀態時,Leader 只能給它發送 1 條 MsgAppend 消息。這是因為,在這個狀態下的 Progress 的 next_idx 是 Leader 猜出來的,而不是由這個副本明確的上報信息推算出來的。它有很大的概率是錯誤的,亦即 Leader 很可能會回退到某個地方重新發送;甚至有可能這個副本是不活躍的,那么 Leader 發送的整個滑動窗口的消息都可能浪費掉。因此,我們引入 Probe 狀態,當 Leader 給處于這一狀態的副本發送了 MsgAppend 時,這個 Progress 會被暫停掉(源碼片段見上一節),這樣在下一次嘗試給這個副本發送 MsgAppend 時,會在 Raft::send_append 中跳過。而當 Leader 收到了這個副本上報的正確的 last index 之后,Leader 便知道下一次應該從什么位置給這個副本發送日志了,這一過程在 Progress::maybe_update 函數中:
fn Progress::maybe_update(&mut self, n: u64) { if self.matched < n { self.matched = n; self.resume(); // 取消暫停的狀態 } if self.next_idx < n + 1 { self.next = n + 1; } }
ProgressState::Snapshot 狀態與 Progress 中的 pause 標志十分相似,一個副本對應的 Progress 一旦處于這個狀態,Leader 便不會再給這個副本發送任何 MsgAppend 了。但是仍有細微的差別:事實上在 Leader 收到 MsgHeartbeatResponse 時,也會調用 Progress::resume 來將取消對該副本的暫停,然而對于 ProgressState::Snapshot 狀態的 Progress 則沒有這個邏輯。這個狀態會在 Leader 成功發送完成 Snapshot,或者收到了對應的副本的最新的 MsgAppendResponse 之后被改變,詳細的邏輯請參考源代碼,這里就不作贅述了。
我們把篇幅留給在 Follower 上收到 Snapshot 之后的處理邏輯,主要是 Raft::restore_raft 和 RaftLog::restore 兩個函數。前者中主要包含了對 Progress 的處理,因為 Snapshot 包含了 Leader 上最新的信息,而 Leader 上的 Configuration 是可能跟 Follower 不同的。后者的主要邏輯偽代碼如下所示:
fn RaftLog::restore(&mut self, snapshot: Snapshot) { self.committed = snapshot.get_metadata().get_index(); self.unstable.restore(snapshot); }
可以看到,內部僅更新了 committed,并沒有更新 applied。這是因為 raft-rs 僅關心 Raft 日志的部分,至于如何把日志中的內容更新到真正的狀態機中,是應用程序的任務。應用程序需要從上一篇文章中介紹的 Ready 接口中把 Snapshot 拿到,然后自行將其應用到狀態機中,最后再通過 RawNode::advance 接口將 applied 更新到正確的值。
總結Raft 日志復制及相關的流量控制、Snapshot 流程就介紹到這里,代碼倉庫仍然在 https://github.com/pingcap/raft-rs,source-code 分支。下一期 raft-rs 源碼解析我們會繼續為大家帶來 configuration change 相關的內容,敬請期待!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/18008.html
摘要:作者屈鵬本文為源碼解析系列的第二篇,按照計劃首先將為大家介紹依賴的周邊庫。值得注意的是,這個中并不包括持久化,也不會將應用到應用程序自己的狀態機的接口。在下一次這個節點調用時,便可以取出這部分被確認的消息,并應用到狀態機中了。 作者:屈鵬 本文為 TiKV 源碼解析系列的第二篇,按照計劃首先將為大家介紹 TiKV 依賴的周邊庫 raft-rs 。raft-rs 是 Raft 算法的 R...
摘要:而源碼解析系列文章則是會從源碼層面給大家抽絲剝繭,讓大家知道我們內部到底是如何實現的。我們希望通過該源碼解析系列,能讓大家對有一個更深刻的理解。 作者:唐劉 TiKV 是一個支持事務的分布式 Key-Value 數據庫,有很多社區開發者基于 TiKV 來開發自己的應用,譬如 titan、tidis。尤其是在 TiKV 成為 CNCF 的 Sandbox 項目之后,吸引了越來越多開發者的...
閱讀 1672·2021-11-12 10:35
閱讀 1618·2021-08-03 14:02
閱讀 2688·2019-08-30 15:55
閱讀 2028·2019-08-30 15:54
閱讀 762·2019-08-30 14:01
閱讀 2430·2019-08-29 17:07
閱讀 2254·2019-08-26 18:37
閱讀 3034·2019-08-26 16:51