摘要:大家知道,一個消息隊列處理系統主要分為兩大部分消費者和生產者。任務系統實時的對任務隊列進行,出來一個任務就一個子進程,由子進程完成具體的任務邏輯。新的設計為了解決并發的問題,我們計劃做一個更加高效強壯的隊里處理系統。
背景
由于PHP不支持多線程,但是作為一個完善的系統,有很多操作都是需要異步完成的。為了完成這些異步操作,我們做了一個基于Redis隊列任務系統。
大家知道,一個消息隊列處理系統主要分為兩大部分:消費者和生產者。
在我們的系統中,主系統作為生產者,任務系統作為消費者。
具體的工作流程如下: 1、主系統將需要需要處理的任務名稱+任務參數push到隊列中。 2、任務系統實時的對任務隊列進行pop,pop出來一個任務就fork一個子進程,由子進程完成具體的任務邏輯。
具體代碼如下:
/** * 啟動守護進程 */ public function runAction() { Tools::log_message("ERROR", "daemon/run" . " | action: restart", "daemon-"); while (true) { $this->fork_process(); } exit; } /** * 創建子進程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子進程 $pid = posix_getpid(); //echo "* Process {$pid} was created "; $this->mq_process(); exit; } else {//主進程 $pid = pcntl_wait($status, WUNTRACED); //取得子進程結束狀態 if (pcntl_wifexited($status)) { //echo " * Sub process: {$pid} exited with {$status}"; //Tools::log_message("INFO", "daemon/run succ" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid ); } else { Tools::log_message("ERROR", "daemon/run fail" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid, "daemon-"); } } } /** * 業務任務隊列處理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = "_task_" . $data["worker"]; $class_name = isset($data["class"]) ? $data["class"] : "TaskproModel"; $params = $data["params"]; $class = new $class_name(); $class->$worker($params); return TRUE; }
這是一個簡單的任務處理系統。
通過這個任務系統幫助我們實現了異步,到目前為止已經穩定運行了將近一年。
但很可惜,它是一個單進程的系統。它是一直在不斷的fork,如果有任務就處理,沒有任務就跳過。
這樣很穩定。
但問題有兩個:一是不斷地fork、pop會浪費服務器資源,二是不支持并發!
第一個問題還好,但第二個問題就很嚴重。
當主系統 同時 拋過來大量的任務時,任務的處理時間就會無限的拉長。
新的設計
為了解決并發的問題,我們計劃做一個更加高效強壯的隊里處理系統。
因為在PHP7之前不支持多線程,所以我們采用多進程。
從網上找了不少資料,大多所謂的多進程都是N個進程同時在后臺運行。
顯然這是不合適的。
我的預想是:每pop出一個任務就fork一個任務,任務執行完成后子進程結束。
遇到的問題
1、如何控制最大進程數
這個問題很簡單,那就是每fork一個子進程就自增一次。而當子進程執行完成就自減一次。
自增沒有問題,我們就在主進程中操作就完了。那么該如何自減呢?
可能你會說,當然是在子進程中啊。但這里你需要注意:當fork的時候是從主進程復制了一份資源給子進程,這就意味著你無法在子進程中操作主進程中的計數器!
所以,這里就需要了解一個知識點:信號。
具體的可以自行Google,這里直接看代碼。
// install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler"));
這就安裝了一個信號處理器。當然還缺少一點。
declare(ticks = 1);
declare是一個控制結構語句,具體的用法也請去Google。
這句代碼的意思就是每執行一條低級語句就調用一次信號處理器。
這樣,每當子進程結束的時候就會調用信號處理器,我們就可以在信號處理器中進行自減。
2、如何解決進程殘留
在多進程開發中,如果處理不當就會導致進程殘留。
為了解決進程殘留,必須得將子進程回收。
那么如何對子進程進行回收就是一個技術點了。
在pcntl的demo中,包括很多博文中都是說在主進程中回收子進程。
但我們是基于Redis的brpop的,而brpop是阻塞的。
這就導致一個問題:當執行N個任務之后,任務系統空閑的時候主進程是阻塞的,而在發生阻塞的時候子進程還在執行,所以就無法完成最后幾個子進程的進程回收。。。
這里本來一直很糾結,但當我將信號處理器搞定之后就也很簡單了。
進程回收也放到信號處理器中去。
新系統的評估
pcntl是一個進程處理的擴展,但很可惜它對多進程的支持非常乏力。
所以這里采用Swoole擴展中的Process。
具體代碼如下:
declare(ticks = 1); class JobDaemonController extends Yaf_Controller_Abstract{ use Trait_Redis; private $maxProcesses = 800; private $child; private $masterRedis; private $redis_task_wing = "task:wing"; //待處理隊列 public function init(){ // install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler")); set_time_limit(0); ini_set("default_socket_timeout", -1); //隊列處理不超時,解決redis報錯:read error on connection } private function redis_client(){ $rds = new Redis(); $rds->connect("redis.master.host",6379); return $rds; } public function process(swoole_process $worker){// 第一個處理 $GLOBALS["worker"] = $worker; swoole_event_add($worker->pipe, function($pipe) { $worker = $GLOBALS["worker"]; $recv = $worker->read(); //send data to master sleep(rand(1, 3)); echo "From Master: $recv "; $worker->exit(0); }); exit; } public function testAction(){ for ($i = 0; $i < 10000; $i++){ $data = [ "abc" => $i, "timestamp" => time().rand(100,999) ]; $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); } exit; } public function runAction(){ while (1){ // echo " now we de have $this->child child processes "; if ($this->child < $this->maxProcesses){ $rds = $this->redis_client(); $data_pop = $rds->brpop($this->redis_task_wing, 3);//無任務時,阻塞等待 if (!$data_pop){ continue; } echo " Starting new child | now we de have $this->child child processes "; $this->child++; $process = new swoole_process([$this, "process"]); $process->write(json_encode($data_pop)); $pid = $process->start(); } } } private function sig_handler($signo) { // echo "Recive: $signo "; switch ($signo) { case SIGCHLD: while($ret = swoole_process::wait(false)) { // echo "PID={$ret["pid"]} "; $this->child--; } } } }
最終,經過測試,單核1G的服務器執行1到3秒的任務可以做到800的并發。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/30967.html
摘要:下文如無特殊聲明將使用進程同時表示進程線程。收到數據后服務器程序進行處理然后使用向客戶端發送響應。現在各種高并發異步的服務器程序都是基于實現的,比如。 并發 IO 問題一直是服務器端編程中的技術難題,從最早的同步阻塞直接 Fork 進程,到 Worker 進程池/線程池,到現在的異步IO、協程。PHP 程序員因為有強大的 LAMP 框架,對這類底層方面的知識知之甚少,本文目的就是詳細介...
摘要:在版本中我們將的進程管理模塊封裝成了類,現在可以在代碼中使用的進程管理器了。提供的進程管理器來自于,經過大量生產項目驗證,穩定性和健壯性都非常高。三任務投遞進程管理器自帶了消息隊列和消息投遞的支持。 在Swoole-2.1.2版本中我們將Server的進程管理模塊封裝成了PHP類,現在可以在PHP代碼中使用Swoole的進程管理器了。 在實際項目中經常需要寫一些長期運行的腳本,如基于r...
摘要:易用穩定,本次想通過對的學習和個人解析,吸收框架的思想和設計知識,加強自己對的認知和理解。當然,筆者能力水平有限,后續的文章如有錯誤,還請指出和諒解。目錄如下后續添加文章都會記錄在此服務啟動過程以及主體設計流程源碼解析 前言 swoole是什么?官網的原話介紹是這樣的: Swoole 使用純 C 語言編寫,提供了 PHP 語言的異步多線程服務器,異步 TCP/UDP 網絡客戶端,異步 ...
摘要:基于擴展實現真正的數據庫連接池這種方案中,項目占用的連接數僅僅為。一種是連接暫時不再使用,其占用狀態解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft Github: https:...
摘要:為語言提供了強大的協程編程模式。提供的協程語法借鑒自,在此向開發組致敬協程可以與很好地互補。并發執行使用創建協程,可以讓和兩個函數變成并發執行。協程需要拿到請求的結果。 Swoole4為PHP語言提供了強大的CSP協程編程模式。底層提供了3個關鍵詞,可以方便地實現各類功能。 Swoole4提供的PHP協程語法借鑒自Golang,在此向GO開發組致敬 PHP+Swoole協程可以與...
閱讀 1696·2021-09-26 09:55
閱讀 3727·2021-09-22 15:31
閱讀 7409·2021-09-22 15:12
閱讀 2217·2021-09-22 10:02
閱讀 4679·2021-09-04 16:40
閱讀 1072·2019-08-30 15:55
閱讀 3027·2019-08-30 12:56
閱讀 1819·2019-08-30 12:44