摘要:值得一提的是目前的服務即服務,暫沒有其他的服務功能,所以基本上相關的配置指代的就是。會將請求傳遞給各個中間件,最終最終傳遞給處理。源碼剖析系列目錄
作者:bromine
鏈接:https://www.jianshu.com/p/411...
來源:簡書
著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。
Swoft Github: https://github.com/swoft-clou...
Swoft提供了一個自建RPC(遠程方法調用)實現,讓你可以方便的調用其他Swoft上的服務。
RPC服務端的初始化RPC有兩種啟動方式Http伴隨啟動和RPC多帶帶啟動。值得一提的是目前swoole的tcp服務即RPC服務,暫沒有其他的tcp服務功能,所以基本上tcp相關的配置指代的就是RPC。
Http伴隨啟動Swoft 的 RPC 服務在Http服務啟動時候伴隨啟動
//SwoftHttpServerHttpHttpServer.php /** * Http Server */ class HttpServer extends AbstractServer /** * Start Server * * @throws SwoftExceptionRuntimeException */ public function start() { //code ... //根據.env配置文件Server區段的TCPABLE字段決定是否啟動RPC服務 if ((int)$this->serverSetting["tcpable"] === 1) { $this->registerRpcEvent(); } //code .... } }Swoole監聽
初始化流程即根據相關注解注冊一個swoole監聽
//SwoftHttpServerHttpHttpServer.php /** * Register rpc event, swoft/rpc-server required * * @throws SwoftExceptionRuntimeException */ protected function registerRpcEvent() { //含有@SwooleListener且type為SwooleEvent::TYPE_PORT的Bean,即RpcEventListener $swooleListeners = SwooleListenerCollector::getCollector(); if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) { throw new RuntimeException("Please use swoft/rpc-server, run "composer require swoft/rpc-server""); } //添加swoole RPC相關的tcp監聽端口,使用的是.env文件中的TCP區段配置 $this->listen = $this->server->listen($this->tcpSetting["host"], $this->tcpSetting["port"], $this->tcpSetting["type"]); $tcpSetting = $this->getListenTcpSetting(); $this->listen->set($tcpSetting); //根據RpcEventListener的相關注解添加監聽處理句柄 $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0]; $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents); }
由于是初版,根據@SwooleListener獲取RPC監聽Bean的相關處理暫時還有點生硬。
目前swoft中type為SwooleEvent::TYPE_PORT的@SwooleListener只有RpcEventListener一個,如果添加了同類Bean容易出問題,穩定版出的時候應該會有相關優化。
入口從SwoftHttpServerCommandServerCommand換成SwoftRpcServerCommandRpcCommand,流程和Http大同小異,就是swoole的設定監聽,僅僅是少了HTTP相關的監聽接口和事件而已,此處不再贅述。
RPC請求處理RPC服務器和HTTP服務器的區別僅僅在于與客戶端交互報文格式和報文所在的網絡層(Swoft的RPC基于TCP層次),運行原理基本相通,都是路由,中間件,RPC Service(對應Http的Controller),你完全可以以Http服務的思路去理解他。
Swoole的RPC-TCP監聽設置好后,RPC服務端就可以開始接受請求了。RpcEventListener的負責的工作僅僅是把收到的數據轉發給SwoftRpcServerServiceDispatcher分發。Dispatcher會將請求傳遞給各個Middleware中間件,最終最終傳遞給HandlerAdapterMiddleware處理。
PackerMiddlewarePackerMiddleware 是RPC中比較重要的一個中間件,負責將TCP請求中數據流解包和數據流封包。
getAttribute(self::ATTRIBUTE_DATA); $data = $packer->unpack($data); // 觸發一個RpcServerEvent::BEFORE_RECEIVE事件,默認只有一個用于添加請求上下文信息的BeforeReceiveListener // 利用中間件觸發流程關鍵事件的做法耦合有點高,猜測以后會調整 App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data); //替換解包后的解包到Request中,提供給后續中間件和Handler使用 $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data); /* @var SwoftRpcServerRpcResponse $response */ $response = $handler->handle($request); //為Response封包返回給RPC客戶端 $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE); $serviceResult = $packer->pack($serviceResult); return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult); } }RouterMiddleware
RouterMiddleware負責根據RPC請求的method,version,interface 獲取處理的RPC服務類,充當了路由的作用
getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $method = $data["method"]??""; $version = $data["version"]??""; $interface = $data["interface"]??""; /* @var SwoftRpcServerRouterHandlerMapping $serviceRouter */ $serviceRouter = App::getBean("serviceRouter"); //路由匹配,即向SwoftRpcServerRouterHandlerMapping->$routes獲取RPC服務信息 $serviceHandler = $serviceRouter->getHandler($interface, $version, $method); // deliver service data $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler); return $handler->handle($request); } }
Swoft啟動階段會掃描并初始化注解信息(參考注解章節),注解初始化完畢后會觸發一個AppEvent::APPLICATION_LOADER事件,此時會將來自@Service的所有RPC的路由信息注冊到SwoftRpcServerRouterHandlerMapping->$routes中,用于serviceRouter Bean的路由匹配。
HandlerAdapterMiddlewareHandlerAdapterMiddleware最終轉發請求給HandlerAdapter處理,HandlerAdapter會使用剛剛RouterMiddleware匹配到的服務類信息轉發請求并封裝Response最終返回給ServiceDispatcher,ServiceDispatcher會返回TCP流給客戶端然后結束本次請求。
getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $params = $data["params"] ?? []; //路由解析出來的,處理該請求的服務Bean和方法 list($serviceClass, $method) = $handler; $service = App::getBean($serviceClass); // execute handler with params $response = PhpHelper::call([$service, $method], $params); $response = ResponseHelper::formatData($response); // 構造Response返回客戶端 if (! $response instanceof Response) { $response = (new Response())->withAttribute(self::ATTRIBUTE, $response); } return $response; } }RPC客戶端的實現
在Bean的屬性中聲明@Reference,Swoft即會根據@var聲明的類型注入相應的RPC客戶端實例。
/** * @Reference(name="useraaaaaa") * * @var DemoInterface */ private $demoService;
依賴注入的實現會專門另外用一篇文章多帶帶解釋,這里先看看RPC客戶端的相關代碼。
遠程代理namespace SwoftRpcClientService; /** * The proxy of service */ class ServiceProxy { /** * @param string $className * @param string $interfaceClass */ public static function loadProxyClass(string $className, string $interfaceClass) { $reflectionClass = new ReflectionClass($interfaceClass); $reflectionMethods = $reflectionClass->getMethods(ReflectionMethod::IS_PUBLIC); $template = "class $className extends SwoftRpcClientService implements {$interfaceClass} {"; //SwoftRpcClientService::class // the template of methods $template .= self::getMethodsTemplate($reflectionMethods); $template .= "}"; eval($template); } //code ... }
和AOP一樣,原理一樣是使用了動態代理,更具體的說法是動態遠程代理。
RPC動態客戶端類實現了客戶端聲明的Interface類型(如DemoInterface)并繼承了SwoftRpcClientService類。
動態類的實現很簡單,對于接口顯式聲明的方法,實際上都是調用SwoftRpcClientService->call()方法。
interface DemoInterface { /** * @param array $ids * @return array */ public function getUsers(array $ids); }
class 動態生成RPC客戶端類 extends SwoftRpcClientService implements AppLibDemoInterface { public function getUsers ( array $ids ) { $params = func_get_args(); return $this->call("getUsers", $params); } //code ... }
對于自動生成的defer方法,則是通過魔術方法__call(),調用SwoftRpcClientService->deferCall()
/** * @param string $name * @param array $arguments * * @return ResultInterface * @throws RpcClientException */ function __call(string $name, array $arguments) { $method = $name; $prefix = self::DEFER_PREFIX;//"defer" if (strpos($name, $prefix) !== 0) { throw new RpcClientException(sprintf("the method of %s is not exist! ", $name)); } if ($name == $prefix) { $method = array_shift($arguments); } elseif (strpos($name, $prefix) === 0) { $method = lcfirst(ltrim($name, $prefix)); } return $this->deferCall($method, $arguments); }
我們這里只看具有代表性的call()方法,deferCall()大致相同。
RPC客戶端動態類的本質是將客戶端的參數和接口信息根據Swoft自己的格式傳遞給RPC服務端,然后將服務器返回的數據解包取出返回值返回給RPC的調用者,對外偽裝成一個普通的對象,屏蔽遠程調用操作。
// SwoftRpcClientService.php /** * Do call service * * @param string $func * @param array $params * * @throws Throwable * @return mixed */ public function call(string $func, array $params) { $profileKey = $this->interface . "->" . $func; //根據@reference的fallback屬性獲取降級處理句柄,在RPC服務調用失敗的時候可以會使用fallback句柄代替 $fallback = $this->getFallbackHandler($func); try { $connectPool = $this->getPool(); $circuitBreaker = $this->getBreaker(); /* @var $client AbstractServiceConnection */ $client = $connectPool->getConnection(); //數據封包,和RPC服務端一致 $packer = service_packer(); $type = $this->getPackerName(); $data = $packer->formatData($this->interface, $this->version, $func, $params); $packData = $packer->pack($data, $type); //通過熔斷器調用接口 $result = $circuitBreaker->call([$client, "send"], [$packData], $fallback); if ($result === null || $result === false) { return null; } //和defercall不一致這里直接收包,解包 App::profileStart($profileKey); $result = $client->recv(); App::profileEnd($profileKey); $connectPool->release($client); App::debug(sprintf("%s call %s success, data=%", $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE))); $result = $packer->unpack($result); $data = $packer->checkData($result); } catch (Throwable $throwable) { if (empty($fallback)) { throw $throwable; } //RPC調用失敗則調用降級句柄,代替實際RPC服務直接返回 $data = PhpHelper::call($fallback, $params); } return $data; }熔斷器
熔斷器的swoft-RPC的另一重要概念,RPC的所有請求都通過熔斷器發送。
熔斷器使用狀態模式實現,熔斷器有開啟,半開,關閉 3種狀態,不同狀態下熔斷器會持有不同的狀態實例,狀態根據RPC調用情況切換,熔斷器根據持有狀態實例的不同,行為也有所不同。
circuitBreaker->serviceName . "服務,連接建立失敗(null)"); } if ($class instanceof Client && $class->isConnected() == false) { throw new Exception($this->circuitBreaker->serviceName . "服務,當前連接已斷開"); } //調用swoole協程客戶端的send()方法發送數據 $data = $class->$method(...$params); } catch (Exception $e) { //遞增失敗計數 if ($this->circuitBreaker->isClose()) { $this->circuitBreaker->incFailCount(); } App::error($this->circuitBreaker->serviceName . "服務,當前[關閉狀態],服務端調用失敗,開始服務降級容錯處理,error=" . $e->getMessage()); //RPC調用失敗則使用降級接口 $data = $this->circuitBreaker->fallback($fallback); } //失敗次數過線則切換狀態 $failCount = $this->circuitBreaker->getFailCounter(); $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount(); if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) { App::trace($this->circuitBreaker->serviceName . "服務,當前[關閉狀態],服務失敗次數達到上限,開始切換為開啟狀態,failCount=" . $failCount); $this->circuitBreaker->switchToOpenState(); } App::trace($this->circuitBreaker->serviceName . "服務,當前[關閉狀態],failCount=" . $this->circuitBreaker->getFailCounter()); return $data; } }熔斷器開啟狀態策略
circuitBreaker->fallback(); App::trace($this->getServiceName() . "服務,當前[開啟狀態],執行服務fallback服務降級容錯處理"); $nowTime = time(); if ($this->circuitBreaker->isOpen() && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime() ) { $delayTime = $this->circuitBreaker->getDelaySwitchTimer(); // swoole定時器不是嚴格的,3s容錯時間 ,定時切換狀態的半開 $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3; App::getTimer()->addAfterTimer("openState", $delayTime, [$this, "delayCallback"]); $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime); App::trace($this->getServiceName() . "服務,當前[開啟狀態],創建延遲觸發器,一段時間后狀態切換為半開狀態"); } return $data; } }熔斷器半開狀態策略
半開熔斷器是熔斷器關閉狀態和熔斷器開啟狀態的過度狀態,半開熔斷器的所有RPC調用都是加鎖的,連續成功或者連續失敗到閾值后會切換到關閉狀態或者開啟狀態,代碼類似,此處不再累述,有興趣的讀者可以自行研究。
Swoft源碼剖析系列目錄:https://segmentfault.com/a/11...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/28694.html
摘要:和服務關系最密切的進程是中的進程組,絕大部分業務處理都在該進程中進行。隨后觸發一個事件各組件通過該事件進行配置文件加載路由注冊。事件每個請求到來時僅僅會觸發事件。服務器生命周期和服務基本一致,詳情參考源碼剖析功能實現 作者:bromine鏈接:https://www.jianshu.com/p/4c0...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。S...
摘要:作者鏈接來源簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。同時順手整理個人對源碼的相關理解,希望能夠稍微填補學習領域的空白。系列文章只會節選關鍵代碼輔以思路講解,請自行配合源碼閱讀。 作者:bromine鏈接:https://www.jianshu.com/p/2f6...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft...
摘要:作者鏈接來源簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。前言為應用提供一個完整的容器作為依賴管理方案,是功能,模塊等功能的實現基礎。的依賴注入管理方案基于服務定位器。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/a23...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swof...
摘要:作為定時任務的執行者,通過每喚醒自身一次,然后把執行表遍歷一次,挑選當下需要執行的任務,通過投遞出去并更新該任務執行表中的狀態。 作者:bromine鏈接:https://www.jianshu.com/p/b44...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft Github: https://github.com/swoft-clou.....
摘要:基于擴展實現真正的數據庫連接池這種方案中,項目占用的連接數僅僅為。一種是連接暫時不再使用,其占用狀態解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft Github: https:...
閱讀 2955·2023-04-25 22:16
閱讀 2120·2021-10-11 11:11
閱讀 3255·2019-08-29 13:26
閱讀 601·2019-08-29 12:32
閱讀 3420·2019-08-26 11:49
閱讀 2998·2019-08-26 10:30
閱讀 1950·2019-08-23 17:59
閱讀 1514·2019-08-23 17:57