摘要:已經取消了參數,都用來執行。取數據的過程事物處理已經打開。取得符合條件的隊列后程序會更新該條數據,并且更新完后即。
"connections" => [ .... "database" => [ "driver" => "database", "table" => "jobs", "queue" => "default", "expire" => 60, ], "redis" => [ "driver" => "redis", "connection" => "default", "queue" => "default", "expire" => 180, ], .... ],
Laravel5.2隊列驅動config/queue.php配置文件,“database”和“redis”有個expire參數,手冊上解釋是“隊列任務過期時間(秒)”,默認為60秒。
(注:5.2和之后的配置文件發生了變化,改為"retry_after" 參數,具體見手冊)
網上搜了一下這個配置,沒有太多說明,但是實際使用的過程中,發現對于執行時間超過expire設置時間的隊列進程,還有使用隊列進行分布式程序部署,這個參數和這種設計模式是個大坑。。。
發現這個問題是想使用分布式程序部署處理隊列,兩臺服務器部署Laravel框架artisan腳本,連接一個MYSQL數據庫,使用一張jobs隊列表。
部署的后,分別啟動兩臺服務器的腳本,發現后執行的腳本,在隊列驅動中取數據,如MYSQL的jobs表,遇到先執行的腳本隊列數據時不會跳過,而是把這條數據視為Failed,儲存一條新數據到failed_jobs表(Laravel隊列失敗時會將隊列數據儲存到failed_jobs表),造成數據重復。
之前在一臺服務器啟動3個進程執行腳本,并不會發生這種錯誤,后執行的腳本不會取得前一個進程的隊列數據,更不會判斷成Failed,多服務處理時是什么原因造成隊列驅動中的數據錯誤呢?
根據隊列執行的流程,程序執行時,隊列到隊列驅動中取任務,獲得任務的過程隊列驅動應該做事物處理,這樣第二個進程取任務會跳過正在執行的隊列數據。
查了一些資料,了解了Laravel隊列的原理,最后還得看Queue的源碼。
Laravel的Queue的源碼都在IlluminateQueue目錄下。
先分析以MYSQL為驅動的jobs表:
CREATE TABLE `jobs` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `queue` varchar(255) COLLATE utf8_unicode_ci NOT NULL, `payload` longtext COLLATE utf8_unicode_ci NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`), KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
手冊中主要介紹了隊列任務的保存,payload字段儲存的是序列化后的任務,Laravel隊列可以將數據模型序列化,執行時候隊列系統會自動從數據庫中獲取整個模型實例,具體說明見手冊。
但是其他幾個狀態和時間字段才是保證隊列事物處理的關鍵字段。
“attempts”執行次數,“reserved”執行的狀態,“reserved_at”執行開始時間,‘available_at’預訂執行時間,‘created_at’是隊列創建時間。
監聽事件的腳本有Listener.php和Worker.php兩個腳本,看源碼說明Listener可以處理指定的隊列,connection參數,但是實際上最后都是通過work來處理隊列的。Laravel5.4已經取消了queue:listen參數,都用queue:work來執行。不過我這里說的是Laravel5.2的問題,不知道是不是下面的原因使Laravel優化去掉了listen。
繼續分析隊列處理的Worker類的源碼,取隊列數據時用pop方法,這個方法會根據傳遞的驅動類型如database或redis,調用該驅動的pop方法。
$connection = $this->manager->connection($connectionName); $job = $this->getNextJob($connection, $queue); // If we"re able to pull a job off of the stack, we will process it and // then immediately return back out. If there is no job on the queue // we will "sleep" the worker for the specified number of seconds. if (! is_null($job)) { return $this->process( $this->manager->getName($connectionName), $job, $maxTries, $delay ); }
下面是DatabaseQueue.php的pop方法。
/** * Pop the next job off of the queue. * * @param string $queue * @return IlluminateContractsQueueJob|null */ public function pop($queue = null) { $queue = $this->getQueue($queue); $this->database->beginTransaction(); if ($job = $this->getNextAvailableJob($queue)) { $job = $this->markJobAsReserved($job); $this->database->commit(); return new DatabaseJob( $this->container, $this, $job, $queue ); } $this->database->commit(); }
取數據的過程事物處理已經打開。
取隊列數據的核心還是$this->getNextAvailableJob($queue)。
打開sql日志,看看隊列數據是如何查詢出來的。
/** * Get the next available job for the queue. * * @param string|null $queue * @return StdClass|null */ protected function getNextAvailableJob($queue) { $this->database->enableQueryLog(); $job = $this->database->table($this->table) ->lockForUpdate() ->where("queue", $this->getQueue($queue)) ->where(function ($query) { $this->isAvailable($query); $this->isReservedButExpired($query); }) ->orderBy("id", "asc") ->first(); var_dump($this->database->getQueryLog()); return $job ? (object) $job : null; } array(1) { [0] => array(3) { "query" => string(165) "select * from `jobs` where `queue` = ? and ((`reserved` = ? and `available_at` <= ?) or (`reserved` = ? and `reserved_at` <= ?)) order by `id` asc limit 1 for update" "bindings" => array(5) { [0] => string(7) "default" [1] => int(0) [2] => int(1493634233) [3] => int(1) [4] => int(1493634173) } "time" => double(1.55) }
從sql語句中可以看出,取隊列數據有兩個條件
reserved為0時,available_at時間小于當前時間,這個條件是待執行的隊列;reserved為1時,reserved_at執行開始時間小于計算出的時間($this->isReservedButExpired),即當前時間減去超時秒Carbon::now()->subSeconds($this->expire)->getTimestamp(),這個條件是判斷隊列任務是否過期。
整個select過程是 “for update”的,有排他鎖。
取得符合條件的隊列后
/** * Mark the given job ID as reserved. * * @param stdClass $job * @return stdClass */ protected function markJobAsReserved($job) { $job->reserved = 1; $job->attempts = $job->attempts + 1; $job->reserved_at = $this->getTime(); $this->database->table($this->table)->where("id", $job->id)->update([ "reserved" => $job->reserved, "reserved_at" => $job->reserved_at, "attempts" => $job->attempts, ]); return $job; }
程序會更新該條數據,并且更新完后即commit。
同一服務器,第二個進程取數據時候遇到悲觀鎖,需要等第一個進程取數據更新reserved和時間后執行。也就是說Laravel隊列使用database時,并發的進程并不是同時取多條數據,而是取同一條數據等待其中一個進程update數據狀態和執行時間,隊列取得數據成功后第一個操作就是更新,所以第二個進程不會取到第一進程的同樣數據,除非是隊列過期。
在DatabaseQueue.php的pop方法中,取得隊列數據后,“$this->database->commit(); ”前 sleep(10),會很明顯的看到第二隊列沒有獲取其他隊列數據,說明“for update”只是update級排他鎖,不會排斥select。
Laravel使用database隊列有時候會有阻塞現象,不知道是不是這個原因造成的。
如果執行時間過長,超過‘expire’參數設置時間,第二隊列會取得第一個隊列數據,判斷超時,這時候就會根據設置的最大執行次數tries來判斷是插入新隊列數據繼續嘗試執行,還是插入到錯誤隊列“failed_jobs”表判斷隊列執行失敗。
以上就是Laravel使用mysql執行隊列的邏輯,之前提到的兩臺服務器部署Laravel框架執行artisan腳本,一個jobs表隊列Failed的問題就是服務器時間不一致的原因,后一臺服務器執行時候將前一隊列數據判斷為超時而插入到“failed_jobs”一條新數據,已經達到最大失敗次數的情況,否則還會插入新的數據繼續嘗試。
最后是隊列執行完的處理邏輯。
如果隊列執行成功會刪除jobs的數據,這沒什么問題。如果失敗,包括超時、異常等,會根據設置的最大失敗次數判斷是否插入一條新數據,或者插入一條Failed數據到“failed_jobs”表。
出現錯誤時,handleJobException的異常處理調用DatabaseQueue.php的release方法,$job->release($delay),最終是pushToDatabase實現。
插入新數據時候,attempts是失敗次數,reserved為0,available_at為當前時間戳加上延時時間參數,這樣整個隊列處理就形成了完整的數據邏輯操作。
Laravel5.4對隊列功能進行了很大的修改,手冊中的提示
任務過期和超時任務執行時間
在配置文件 config/queue.php 中,每個連接都定義了 retry_after 項。該配置項的目的是定義任務在執行以后多少秒后釋放回隊列。如果retry_after 設定的值為 90, 任務在運行 90 秒后還未完成,那么將被釋放回隊列而不是刪除掉。毫無疑問,你需要把 retry_after 的值設定為任務執行時間的最大可能值。
Laravel5.4去掉了queue的listen命令,work也增加了超時參數。Laravel5.5出來的時候應該升級上去。
附錄:Laravel5.2測試的腳本,之前網上搜出來的都比較早,還是把job寫成命令的方式,其實5.2以后job使用非常簡單。
jobs下定義job任務,handle可以增加一些測試方案,比如我這種拋出異常,直接Failed的
class MyJob extends Job implements ShouldQueue { use InteractsWithQueue, SerializesModels; private $key; private $value; /** * Create a new job instance. * * @return void */ public function __construct($key, $value) { $this->key = $key; $this->value = $value; } /** * Execute the job. * * @return void */ public function handle() { for($i=0;$i<20;$i++){ echo "$i "; sleep(1); } echo "sss ".$this->key." ".date("Y-m-d H:i:s")." "; throw new Exception("測試 "); // Redis::hset("queue.test", $this->key, $this->value); } public function failed() { dump("failed"); } }
控制器訪問設置任務隊列,key和value之前用了測試redis插入的,可以按自己的測試方案設置job參數。
for($i = 0; $i < 5; $i ++) { echo "$i"; $job = (new MyJob($i, $i))->delay(20); $this->dispatch($job); }
我的例子設置了5個隊列,開啟多個shell并發執行artisan測試吧。
本來想將redis隊列代碼讀完,一起發出來的,最近事情太多,redis代碼也沒怎么看。
redis驅動可以參考 http://www.cnblogs.com/z12987... 這篇文章對Laravel隊列redis驅動邏輯介紹的很詳細了,redis驅動使用的list和zset結構儲存隊列,執行過程會移除轉存隊列,沒有數據庫的“for update” 操作,所以應該不是存在隊列阻塞的情況。
BUT隊列任務過期時間設置和數據庫驅動是一樣的,所以同樣
queue:listen的執行時間參數 --timeout=60,一定要設置小于隊列任務過期時間expire參數!終于寫完了。。。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/22916.html
摘要:配置項用于配置失敗隊列任務存放的數據庫及數據表。要使用隊列驅動,需要在配置文件中配置數據庫連接。如果應用使用了,那么可以使用時間或并發來控制隊列任務。你可以使用命令運行這個隊列進程。如果隊列進程意外關閉,它會自動重啟啟動隊列進程。 一、概述 在Web開發中,我們經常會遇到需要批量處理任務的場景,比如群發郵件、秒殺資格獲取等,我們將這些耗時或者高并發的操作放到隊列中異步執行可以有效緩解系...
摘要:我在前面的文章中也提到了應該怎么做自我介紹與項目介紹,詳情可以查看這篇文章備戰春招秋招系列初出茅廬的程序員該如何準備面試。因此基于事件消息對象驅動的業務架構可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息隊列MQ的...
摘要:深入淺出一直想致力于寫一篇關于廣義講解系統的文章,苦于時間有限,資源有限。事件驅動機制是通過內部單線程高效率地維護事件循環隊列來實現的,沒有多線程的資源占用和上下文的切換。 深入淺出Node.js 一直想致力于寫一篇關于廣義講解Node.js系統的文章,苦于時間有限,資源有限。這篇文章是在結合自己的學習心得以及與行業大佬共同探討下爭對于熟練掌握JS語言后的廣義Node.js.至于為什么...
閱讀 1879·2019-08-30 15:53
閱讀 3204·2019-08-30 15:44
閱讀 2813·2019-08-26 13:31
閱讀 1958·2019-08-26 12:10
閱讀 804·2019-08-26 11:01
閱讀 2133·2019-08-23 15:32
閱讀 1591·2019-08-23 13:43
閱讀 2547·2019-08-23 11:58