摘要:為了避免重復執行的問題,我們需要引入一個有序集合存放正在執行的任務,命名為。最后以上講述了一個任務調度程序的逐步演變,設計方案很大程度上參考了。
原文鏈接:https://blog.breezelin.cn/scheme-redis-task-queue.html
一個網關服務器就跟快餐店一樣,總是希望客人來得快、去得也快,這樣在相同時間內才可以服務更多的客人。如果快餐店的服務員在一個顧客點餐、等餐和結賬時都全程跟陪的話,那么這個服務員大部分時間都是在空閑的等待。應該有專門的服務員負責點餐,專門的服務員負責送餐,專門的服務員負責結賬,這樣才能提高效率。同樣道理,網關服務器中也需要分工明確。舉個例子:
假設有一個申請發送重置密碼郵件的網關接口,須知道發送一封郵件可能會花費上好幾秒鐘,如果網關服務器直接在線上給用戶發送重置密碼郵件,高并發的情況下就很容易造成網絡擁擠。但實際上,網關服務器并非一定要等待郵件發送成功后才能響應用戶,完全可以先告知用戶郵件會發送的,而后再在線下把郵件發送出去(就像快餐店里點餐的服務員跟顧客說先去找位置坐,飯菜做好后會有人給他送過去)。
那么是誰來把郵件發送出去呢?
任務隊列為了網關接口能夠盡快響應用戶請求,無需即時知道結果的耗時操作可以交由任務隊列機制來處理。
任務隊列機制中包含兩種角色,一個是任務生產者,一個是任務消費者,而任務隊列是兩者之間的紐帶:
生產者往隊列里放入任務;
消費者從隊列里取出任務。
任務隊列的整體運行流程是:任務生產者把當前操作的關鍵信息(后續可以根據這些信息還原出當前操作)抽象出來,比如發送重置密碼的郵件,我們只需要當前用戶郵箱和用戶名就可以了;任務生產者把任務放進隊列,實際就是把任務的關鍵信息存儲起來,這里會用到MySQL、Redis之類數據存儲工具,常用的是Redis;而任務消費者就不斷地從數據庫中取出任務信息,逐一執行。
任務生產者的工作是任務分發,一般由線上的網關服務程序執行;任務消費者的工作是任務調度,一般由線下的程序執行,這樣即使任務耗時再多,也不阻塞網關服務。
這里主要討論的是任務調度(任務消費者)的程序設計。
簡單直接假設我們用Redis列表List存儲任務信息,列表鍵名是queues:default,任務發布就是往列表queues:default后追加數據:
那么任務調度可以這樣簡單直接的實現:
handle($task); continue; } sleep(1); } } public function handle($task) { // do something time-consuming } } $worker = new Worker; $worker->schedule();意外保險上面代碼是直接從queues:default列表中移出第一個任務(lpop),因為handle($task)函數是一個耗時的操作,過程中若是遇到什么意外導致了整個程序退出,這個任務可能還沒執行完成,可是任務信息已經完全丟失了。保險起見,對schedule()函數進行以下修改:
handle($task); Redis::lpop("queues:default"); continue; } sleep(1); } } ...即在任務完成后才將任務信息從列表中移除。
延時執行queues:default列表中的任務都是需要即時執行的,但是有些任務是需要間隔一段時間后或者在某個時間點上執行,那么可以引入一個有序集合,命名為queues:default:delayed,來存放這些任務。任務發布時需要指明執行的時間點$time:
任務調度時,如果queues:default列表已經空了,就從queues:default:delayed集合中取出到達執行時間的任務放入queues:default列表中:
handle($task); Redis::lpop("queues:default"); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:delayed", 0, time()); if($seri_arr) { Redis::rpush("queues:default", $seri_arr); continue; } sleep(1); } } ...任務超時預估任務正常執行所需的最大時間值,若是任務執行超過了這個時間,可能是過程中遇到一些意外,如果任由它繼續卡著,那么后面的任務就會無法被執行了。
首先我們給任務設定一個時限屬性timeout,然后在執行任務前先給進程本身設置一個鬧鐘信號,timeout后收到信號說明任務執行超時,需要退出當前進程(用supervisor守護進程時,進程自身退出,supervisor會自動再拉起)。
注意:pcntl_alarm($timeout)會覆蓋之前鬧鐘信號,而pcntl_alarm(0)會取消鬧鐘信號;任務超時后,當前任務放入queues:default:delayed集合中延時執行,以免再次阻塞隊列。timeoutHanle($task); $this->handle($task); Redis::lpop("queues:default"); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:delayed", 0, time()); if($seri_arr) { Redis::rpush("queues:default", $seri_arr); continue; } pcntl_alarm(0); sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () { $seri = Redis::lpop("queues:default"); Redis::zadd("queues:default:delayed", time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...并發執行上面代碼,直觀上沒什么問題,但是在多進程并發執行的時候,有些任務可能會被重復執行,是因為沒能及時將當前執行的任務從queues:default列表中移出,其他進程也可以讀取到。為了避免重復執行的問題,我們需要引入一個有序集合SortedSet存放正在執行的任務,命名為queues:default:reserved。
首先任務是從queues:default列表中直接移出,然后開始執行任務前先把任務放進queues:default:reserved集合中,任務完成了再從queues:default:reserved集合中移出。
再結合任務超時,假設一個任務執行時間不可能超過60*60秒(可以按需調整),在queues:default列表為空的時候,queues:default:reserved集合中有任務已經存放超過了60*60秒,那么有可能是某些進程在執行任務是意外退出了,所以把這些任務放到queues:default:delayed集合中稍后執行。timeoutHanle($task); $this->handle($task); Redis::zrem("queues:default:reserved", $seri); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:delayed", 0, time()); if($seri_arr) { Redis::rpush("queues:default", $seri_arr); continue; } $seri_arr = Redis::zremrangebyscore("queues:default:reserved", 0, time()-60*60); if($seri_arr) { foreach($seri_arr as $seri) { Redis::zadd("queues:default:delayed", time()+10, $seri); } } sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () use ($task) { $seri = serialize($task); Redis::zrem("queues:default:reserved", $seri); Redis::zadd("queues:default:delayed", time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...其他 失敗重試以上代碼沒有檢驗任務是否執行成功,應該有任務失敗的處理機制:比如給任務設定一個最多重試次數屬性retry_times,任務每執行一次retry_times,任務執行失敗時,若是retry_times等于0,則將任務放入queues:default:failed列表中不在執行;否則放入放到queues:default:delayed集合中稍后執行。
休眠時間以上代碼是進程忙時連續執行,閑時休眠一秒,可以按需調整優化。
事件監聽若是需要在任務執行成功或失敗時進行某些操作,可以給任務設定成功操作方法afterSucceeded()或失敗操作方法afterFailed(),在相應的時候回調。
最后以上講述了一個任務調度程序的逐步演變,設計方案很大程度上參考了Laravel Queue。
用工具,知其然,知其所以然。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/28333.html
摘要:架構消息代理,作為臨時儲存任務的中間媒介,為提供了隊列服務。生產者將任務發送到,消費者再從獲取任務。如果使用,則有可能發生突然斷電之類的問題造成突然終止后的數據丟失等后果。任務調度器,負責調度并觸發定時周期任務。 架構 showImg(https://segmentfault.com/img/bVbmDXa?w=831&h=413); Broker 消息代理,作為臨時儲存任務的中間媒...
閱讀 3819·2021-11-24 09:39
閱讀 1823·2021-11-02 14:41
閱讀 824·2019-08-30 15:53
閱讀 3487·2019-08-29 12:43
閱讀 1200·2019-08-29 12:31
閱讀 3093·2019-08-26 13:50
閱讀 803·2019-08-26 13:45
閱讀 996·2019-08-26 10:56