摘要:作者屈鵬本篇源碼解析將為大家介紹的另一周邊組件。這個函數會往完成隊列中注冊若干個,相當于用往一個中注冊一些事件的關注。在函數返回之后,服務端的初始化及啟動過程便結束了。
作者:屈鵬
本篇 TiKV 源碼解析將為大家介紹 TiKV 的另一周邊組件—— grpc-rs。grpc-rs 是 PingCAP 實現的一個 gRPC 的 Rust 綁定,其 Server/Client 端的代碼框架都基于 Future,事件驅動的 EventLoop 被隱藏在了庫的內部,所以非常易于使用。本文將以一個簡單的 gRPC 服務作為例子,展示 grpc-rs 會生成的服務端代碼框架和需要服務的實現者填寫的內容,然后會深入介紹服務器在啟動時如何將后臺的事件循環與這個框架掛鉤,并在后臺線程中運行實現者的代碼。
基本的代碼生成及服務端 APIgRPC 使用 protobuf 定義一個服務,之后調用相關的代碼生成工具就可以生成服務端、客戶端的代碼框架了,這個過程可以參考我們的 官方文檔。客戶端可以直接調用這些生成的代碼,向服務端發送請求并接收響應,而服務端則需要服務的實現者自己來定制對請求的處理邏輯,生成響應并發回給客戶端。舉一個例子:
#[derive(Clone)] struct MyHelloService {} impl Hello for MyHelloService { // trait 中的函數簽名由 grpc-rs 生成,內部實現需要用戶自己填寫 fn hello(&mut self, ctx: RpcContext, req: HelloRequest, sink: UnarySink) { let mut resp = HelloResponse::new(); resp.set_to(req.get_from()); ctx.spawn( sink.success(resp) .map(|_| println!("send hello response back success")) .map_err(|e| println!("send hello response back fail: {}", e)) ); } }
我們定義了一個名為 Hello 的服務,里面只有一個名為 hello 的 RPC。grpc-rs 會為服務生成一個 trait,里面的方法就是這個服務包含的所有 RPC。在這個例子中唯一的 RPC 中,我們從 HelloRequest 中拿到客戶端的名字,然后再將這個名字放到 HelloResponse 中發回去,非常簡單,只是展示一下函數簽名中各個參數的用法。
然后,我們需要考慮的是如何把這個服務運行起來,監聽一個端口,真正能夠響應客戶端的請求呢?下面的代碼片段展示了如何運行這個服務:
fn main() { // 創建一個 Environment,里面包含一個 Completion Queue let env = Arc::new(EnvBuilder::new().cq_count(4).build()); let channel_args = ChannelBuilder::new(env.clone()).build_args(); let my_service = MyHelloWorldService::new(); let mut server = ServerBuilder::new(env.clone()) // 使用 MyHelloWorldService 作為服務端的實現,注冊到 gRPC server 中 .register_service(create_hello(my_service)) .bind("0.0.0.0", 44444) .channel_args(channel_args) .build() .unwrap(); server.start(); thread::park(); }
以上代碼展示了 grpc-rs 的足夠簡潔的 API 接口,各行代碼的意義如其注釋所示。
Server 的創建和啟動下面我們來看一下這個 gRPC server 是如何接收客戶端的請求,并路由到我們實現的服務端代碼中進行后續的處理的。
第一步我們初始化一個 Environment,并設置 Completion Queue(完成隊列)的個數為 4 個。完成隊列是 gRPC 的一個核心概念,grpc-rs 為每一個完成隊列創建一個線程,并在線程中運行一個事件循環,類似于 Linux 網絡編程中不斷地調用 epoll_wait 來獲取事件,進行處理:
// event loop fn poll_queue(cq: Arc) { let id = thread::current().id(); let cq = CompletionQueue::new(cq, id); loop { let e = cq.next(); match e.event_type { EventType::QueueShutdown => break, EventType::QueueTimeout => continue, EventType::OpComplete => {} } let tag: Box = unsafe { Box::from_raw(e.tag as _) }; tag.resolve(&cq, e.success != 0); } }
事件被封裝在 Tag 中。我們暫時忽略對事件的具體處理邏輯,目前我們只需要知道,當這個 Environment 被創建好之后,這些后臺線程便開始運行了。那么剩下的任務就是監聽一個端口,將網絡上的事件路由到這幾個事件循環中。這個過程在 Server 的 start 方法中:
/// Start the server. pub fn start(&mut self) { unsafe { grpc_sys::grpc_server_start(self.core.server); for cq in self.env.completion_queues() { let registry = self .handlers .iter() .map(|(k, v)| (k.to_owned(), v.box_clone())) .collect(); let rc = RequestCallContext { server: self.core.clone(), registry: Arc::new(UnsafeCell::new(registry)), }; for _ in 0..self.core.slots_per_cq { request_call(rc.clone(), cq); } } } }
首先調用 grpc_server_start 來啟動這個 Server,然后對每一個完成隊列,復制一份 handler 字典。這個字典的 key 是一個字符串,而 value 是一個函數指針,指向對這個類型的請求的處理函數——其實就是前面所述的服務的具體實現邏輯。key 的構造方式其實就是 /
接著我們創建一個 RequestCallContext,然后對每個完成隊列調用幾次 request_call。這個函數會往完成隊列中注冊若干個 Call,相當于用 epoll_ctl 往一個 epoll fd 中注冊一些事件的關注。Call 是 gRPC 在進行遠程過程調用時的基本單元,每一個 RPC 在建立的時候都會從完成隊列里取出一個 Call 對象,后者會在這個 RPC 結束時被回收。因此,在 start 函數中每一個完成隊列上注冊的 Call 個數決定了這個完成隊列上可以并發地處理多少個 RPC,在 grpc-rs 中默認的值是 1024 個。
小結以上代碼基本都在 grpc-rs 倉庫中的 src/server.rs 文件中。在 start 函數返回之后,服務端的初始化及啟動過程便結束了。現在,可以快速地用幾句話回顧一下:首先創建一個 Environment,內部會為每一個完成隊列啟動一個線程;接著創建 Server 對象,綁定端口,并將一個或多個服務注冊到這個 Server 上;最后調用 Server 的 start 方法,將服務的具體實現關聯到若干個 Call 上,并塞進所有的完成隊列中。在這之后,網絡上新來的 RPC 請求便可以在后臺的事件循環中被取出,并根據具體實現的字典分別執行了。最后,不要忘記 start 是一個非阻塞的方法,調用它的主線程在之后可以繼續執行別的邏輯或者掛起。
本篇源碼解析就到這里,下篇關于 grpc-rs 的文章我們會進一步介紹一個 Call 或者 RPC 的生命周期,以及每一階段在 Server 端的完成隊列中對應哪一種事件、會被如何處理,這一部分是 grpc-rs 的核心代碼,敬請期待!
原文鏈接:https://www.pingcap.com/blog-cn/tikv-source-code-reading-7/
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/18027.html
摘要:要說明的實現,需要先介紹的運行方式。封裝與實現細節通過上文的分析可以明顯看到,的通知機制其實和的通知機制非常類似。是用來處理上文提到的返回結果的。儲存的是和收到的消息。小結最后簡要總結一下的封裝和實現過程。 作者: 李建俊 上一篇《gRPC Server 的初始化和啟動流程》為大家介紹了 gRPC Server 的初始化和啟動流程,本篇將帶大家深入到 grpc-rs 這個庫里,查看 R...
摘要:而源碼解析系列文章則是會從源碼層面給大家抽絲剝繭,讓大家知道我們內部到底是如何實現的。我們希望通過該源碼解析系列,能讓大家對有一個更深刻的理解。 作者:唐劉 TiKV 是一個支持事務的分布式 Key-Value 數據庫,有很多社區開發者基于 TiKV 來開發自己的應用,譬如 titan、tidis。尤其是在 TiKV 成為 CNCF 的 Sandbox 項目之后,吸引了越來越多開發者的...
閱讀 2884·2021-10-14 09:50
閱讀 1229·2021-10-08 10:21
閱讀 3663·2021-10-08 10:16
閱讀 3069·2021-09-27 14:02
閱讀 3145·2021-09-23 11:21
閱讀 2134·2021-09-07 10:17
閱讀 416·2019-08-30 14:00
閱讀 2119·2019-08-29 17:26