摘要:一旦這一切完成,方法會運行在類屬性在命令構造后設置容器解析實例,在中我們設置了將使用的緩存驅動,我們也根據命令來決定我們調用什么方法。作業只在以上起效在上也無效處理作業方法調用觸發事件觸發事件。
譯文GitHub https://github.com/yuansir/diving-laravel-zh
原文鏈接https://divinglaravel.com/queue-system/workers
現在,我們知道了Laravel如何將作業推到不同的隊列中,讓我們來深入了解workers如何運作你的作業。 首先,我將workers定義為一個在后臺運行的簡單PHP進程,目的是從存儲空間中提取作業并針對多個配置選項運行它們。
php artisan queue:work
運行此命令將指示Laravel創建應用程序的一個實例并開始執行作業,這個實例將一直存活著,啟動Laravel應用程序的操作只在運行命令時發生一次,同一個實例將被用于執行你的作業,這意味著:
避免在每個作業上啟動整個應用程序來節省服務器資源。
在應用程序中所做的任何代碼更改后必須手動重啟worker。
你也可以這樣運行:
php artisan queue:work --once
這將啟動應用程序的一個實例,處理單個作業,然后干掉腳本。
php artisan queue:listen
queue:listen 命令相當于無限循環地運行 queue:work --once 命令,這將導致以下問題:
每個循環都會啟動一個應用程序實例。
分配的worker將選擇一個工作并執行。
worker進程將被干掉。
使用 queue:listen 確保為每個作業創建一個新的應用程序實例,這意味著代碼更改以后不必手動重啟worker,同時也意味著將消耗更多的服務器資源。
queue:work 命令我們來看看 QueueConsoleWorkCommand 類的 handle() 方法,這是當你運行 php artisan queue:work 時會執行的方法:
public function handle() { if ($this->downForMaintenance() && $this->option("once")) { return $this->worker->sleep($this->option("sleep")); } $this->listenForEvents(); $connection = $this->argument("connection") ?: $this->laravel["config"]["queue.default"]; $queue = $this->getQueue($connection); $this->runWorker( $connection, $queue ); }
首先,我們檢查應用程序是否處于維護模式,并使用 --once 選項,在這種情況下,我們希望腳本正常運行,因此我們不執行任何作業,我們只需要在完全殺死腳本前讓worker在一段時間內休眠。
QueueWorker 的 sleep() 方法看起來像這樣:
public function sleep($seconds) { sleep($seconds); }為什么我們不能在 handle() 方法中返回null來終止腳本?
如前所述, queue:listen 命令在循環中運行 WorkCommand :
while (true) { // This process simply calls "php artisan queue:work --once" $this->runProcess($process, $options->memory); }
如果應用程序處于維護模式,并且 WorkCommand 立即終止,這將導致循環結束,下一個在很短的時間內啟動,最好在這種情況下導致一些延遲,而不是通過創建我們不會真正使用的大量應用程序實例。
監聽事件在 handle() 方法里面我們調用 listenForEvents() 方法:
protected function listenForEvents() { $this->laravel["events"]->listen(JobProcessing::class, function ($event) { $this->writeOutput($event->job, "starting"); }); $this->laravel["events"]->listen(JobProcessed::class, function ($event) { $this->writeOutput($event->job, "success"); }); $this->laravel["events"]->listen(JobFailed::class, function ($event) { $this->writeOutput($event->job, "failed"); $this->logFailedJob($event); }); }
在這個方法中我們會監聽幾個事件,這樣我們可以在每次作業處理中,處理完或處理失敗時向用戶打印一些信息。
記錄失敗作業一旦作業失敗 logFailedJob() 方法會被調用
$this->laravel["queue.failer"]->log( $event->connectionName, $event->job->getQueue(), $event->job->getRawBody(), $event->exception );
queue.failer 容器別名在 QueueQueueServiceProvider::registerFailedJobServices() 中注冊:
protected function registerFailedJobServices() { $this->app->singleton("queue.failer", function () { $config = $this->app["config"]["queue.failed"]; return isset($config["table"]) ? $this->databaseFailedJobProvider($config) : new NullFailedJobProvider; }); } /** * Create a new database failed job provider. * * @param array $config * @return IlluminateQueueFailedDatabaseFailedJobProvider */ protected function databaseFailedJobProvider($config) { return new DatabaseFailedJobProvider( $this->app["db"], $config["database"], $config["table"] ); }
如果配置了 queue.failed ,則將使用數據庫隊列失敗,并將有關失敗作業的信息簡單地存儲在數據庫表中的:
$this->getTable()->insertGetId(compact( "connection", "queue", "payload", "exception", "failed_at" ));運行worker
要運行worker,我們需要收集兩條信息:
worker的連接信息從作業中提取
worker找到作業的隊列
如果沒有使用 queue.default 配置定義的默認連接。您可以為 queue:work 命令提供 --connection=default 選項。
隊列也是一樣,您可以提供一個 --queue=emails 選項,或選擇連接配置中的 queue 選項。一旦這一切完成, WorkCommand::handle() 方法會運行 runWorker():
protected function runWorker($connection, $queue) { $this->worker->setCache($this->laravel["cache"]->driver()); return $this->worker->{$this->option("once") ? "runNextJob" : "daemon"}( $connection, $queue, $this->gatherWorkerOptions() ); }
在worker類屬性在命令構造后設置:
public function __construct(Worker $worker) { parent::__construct(); $this->worker = $worker; }
容器解析 QueueWorker 實例,在runWorker()中我們設置了worker將使用的緩存驅動,我們也根據--once 命令來決定我們調用什么方法。
如果使用 --once 選項,我們只需調用 runNextJob 來運行下一個可用的作業,然后腳本就會終止。 否則,我們將調用 daemon 方法來始終保持進程處理作業。
在開始工作時,我們使用 gatherWorkerOptions() 方法收集用戶給出的命令選項,我們稍后會提供這些選項,這個工具是 runNextJob 或 daemon 方法。
protected function gatherWorkerOptions() { return new WorkerOptions( $this->option("delay"), $this->option("memory"), $this->option("timeout"), $this->option("sleep"), $this->option("tries"), $this->option("force") ); }守護進程
讓我看看 Worker::daemon() 方法,這個方法的第一行調用了 Worker::daemon() 方法
protected function listenForSignals() { if ($this->supportsAsyncSignals()) { pcntl_async_signals(true); pcntl_signal(SIGTERM, function () { $this->shouldQuit = true; }); pcntl_signal(SIGUSR2, function () { $this->paused = true; }); pcntl_signal(SIGCONT, function () { $this->paused = false; }); } }
這種方法使用PHP7.1的信號處理, supportsAsyncSignals() 方法檢查我們是否在PHP7.1上,并加載 pcntl 擴展名。
之后pcntl_async_signals() 被調用來啟用信號處理,然后我們為多個信號注冊處理程序:
當腳本被指示關閉時,會引發SIGTERM。
SIGUSR2是用戶定義的信號,Laravel用來表示腳本應該暫停。
當暫停的腳本繼續進行時,會引發SIGCONT。
這些信號從Process Monitor(如 Supervisor )發送并與我們的腳本進行通信。
Worker::daemon() 方法中的第二行讀取最后一個隊列重新啟動的時間戳,當我們調用queue:restart 時該值存儲在緩存中,稍后我們將檢查是否和上次重新啟動的時間戳不符合,來指示worker在之后多次重啟。
最后,該方法啟動一個循環,在這個循環中,我們將完成其余獲取作業的worker,運行它們,并對worker進程執行多個操作。
while (true) { if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue; } $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); $this->registerTimeoutHandler($job, $options); if ($job) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); } $this->stopIfNecessary($options, $lastRestart); }確定worker是否應該處理作業
調用 daemonShouldRun() 檢查以下情況:
應用程序不處于維護模式
Worker沒有暫停
沒有事件監聽器阻止循環繼續
如果應用程序在維護模式下,worker使用--force選項仍然可以處理作業:
php artisan queue:work --force
確定worker是否應該繼續的條件之一是:
$this->events->until(new EventsLooping($connectionName, $queue)) === false)
這行觸發 QueueEventLooping 事件,并檢查是否有任何監聽器在 handle() 方法中返回false,這種情況下你可以強制您的workers暫時停止處理作業。
如果worker應該暫停,則調用 pauseWorker() 方法:
protected function pauseWorker(WorkerOptions $options, $lastRestart) { $this->sleep($options->sleep > 0 ? $options->sleep : 1); $this->stopIfNecessary($options, $lastRestart); }
sleep 方法并傳遞給控制臺命令的 --sleep 選項,這個方法調用
public function sleep($seconds) { sleep($seconds); }
腳本休眠了一段時間后,我們檢查worker是否應該在這種情況下退出并殺死腳本,稍后我們看一下stopIfNecessary 方法,以防腳本不能被殺死,我們只需調用 continue; 開始一個新的循環:
if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue; }Retrieving 要運行的作業
$job = $this->getNextJob( $this->manager->connection($connectionName), $queue );
getNextJob() 方法接受一個隊列連接的實例,我們從隊列中獲取作業
protected function getNextJob($connection, $queue) { try { foreach (explode(",", $queue) as $queue) { if (! is_null($job = $connection->pop($queue))) { return $job; } } } catch (Exception $e) { $this->exceptions->report($e); $this->stopWorkerIfLostConnection($e); } }
我們簡單地循環給定的隊列,使用選擇的隊列連接從存儲空間(數據庫,redis,sqs,...)獲取作業并返回該作業。
要從存儲中retrieve作業,我們查詢滿足以下條件的最舊作業:
推送到 queue ,我們試圖從中找到作業
沒有被其他worker reserved
可以在給定的時間內運行,有些作業在將來被推遲運行
我們也取到了很久以來被凍結的作業并重試
一旦我們找到符合這一標準的作業,我們將這個作業標記為reserved,以便其他workers獲取到,我們還會增加作業監控次數。
監控作業超時下一個作業被retrieved之后,我們調用 registerTimeoutHandler() 方法:
protected function registerTimeoutHandler($job, WorkerOptions $options) { if ($this->supportsAsyncSignals()) { pcntl_signal(SIGALRM, function () { $this->kill(1); });the $timeout = $this->timeoutForJob($job, $options); pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0); } }
再次,如果 pcntl 擴展被加載,我們將注冊一個信號處理程序干掉worker進程如果該作業超時的話,在配置了超時之后我們使用 pcntl_alarm() 來發送一個 SIGALRM 信號。
如果作業所花費的時間超過了超時值,處理程序將會終止該腳本,如果不是該作業將通過,并且下一個循環將設置一個新的報警覆蓋第一個報警,因為進程中可能存在單個報警。
作業只在PHP7.1以上起效,在window上也無效 ˉ_(ツ)_/ˉ
處理作業runJob() 方法調用 process():
public function process($connectionName, $job, WorkerOptions $options) { try { $this->raiseBeforeJobEvent($connectionName, $job); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( $connectionName, $job, (int) $options->maxTries ); $job->fire(); $this->raiseAfterJobEvent($connectionName, $job); } catch (Exception $e) { $this->handleJobException($connectionName, $job, $options, $e); } }
raiseBeforeJobEvent() 觸發 QueueEventsJobProcessing 事件, raiseAfterJobEvent() 觸發 QueueEventsJobProcessed 事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts() 檢查進程是否達到最大嘗試次數,并將該作業標記為失敗:
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) { $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; if ($maxTries === 0 || $job->attempts() <= $maxTries) { return; } $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( "A queued job has been attempted too many times. The job may have previously timed out." )); throw $e; }
否則我們在作業對象上調用 fire() 方法來運行作業。
從哪里獲取作業對象getNextJob() 方法返回一個 ContractsQueueJob 的實例,這取決于我們使用相應的Job實例的隊列驅動程序,例如如果數據庫隊列驅動則選擇 QueueJobsDatabaseJob 。
循環結束在循環結束時,我們調用 stopIfNecessary() 來檢查在下一個循環開始之前是否應該停止進程:
protected function stopIfNecessary(WorkerOptions $options, $lastRestart) { if ($this->shouldQuit) { $this->kill(); } if ($this->memoryExceeded($options->memory)) { $this->stop(12); } elseif ($this->queueShouldRestart($lastRestart)) { $this->stop(); } }
shouldQuit 屬性在兩種情況下設置,首先listenForSignals() 內部的作為 SIGTERM 信號處理程序,其次在 stopWorkerIfLostConnection() 中
protected function stopWorkerIfLostConnection($e) { if ($this->causedByLostConnection($e)) { $this->shouldQuit = true; } }
在retrieving和處理作業時,會在幾個try ... catch語句中調用此方法,以確保worker應該處于被干掉的狀態,以便我們的Process Control可能會啟動一個新的數據庫連接。
causedByLostConnection() 方法可以在 DatabaseDetectsLostConnections trait中找到。
memoryExceeded() 檢查內存使用情況是否超過當前設置的內存限制,您可以使用 --memory 選項設置限制。
轉載請注明:?轉載自Ryan是菜鳥 | LNMP技術棧筆記
如果覺得本篇文章對您十分有益,何不 打賞一下
本文鏈接地址:?剖析Laravel隊列系統--Worker
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/23262.html
摘要:原文鏈接我們推送到隊列的每個作業都存儲在按執行順序排序的某些存儲空間中,該存儲位置可以是數據庫,存儲或像這樣的第三方服務。這個數字從開始,在每次運行作業時不斷增加。 原文鏈接https://divinglaravel.com/queue-system/preparing-jobs-for-queue Every job we push to queue is stored in som...
摘要:有幾種有用的方法可以使用將作業推送到特定的隊列在給定的秒數之后推送作業延遲后將作業推送到特定的隊列推送多個作業推送特定隊列上的多個作業調用這些方法之后,所選擇的隊列驅動會將給定的信息存儲在存儲空間中,供按需獲取。 原文鏈接https://divinglaravel.com/queue-system/pushing-jobs-to-queue There are several ways...
摘要:配有內置的隊列系統,可幫助您在后臺運行任務,并通過簡單的來配置系統在不同情況下起作用。您可以在中管理隊列配置,默認情況下它有使用不同隊列驅動的幾個連接,您可以看到項目中可以有多個隊列連接,也可以使用多個隊列驅動程序。 原文鏈接https://divinglaravel.com/queue-system/before-the-dive Laravel receives a request...
摘要:基于擴展實現真正的數據庫連接池這種方案中,項目占用的連接數僅僅為。一種是連接暫時不再使用,其占用狀態解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft Github: https:...
摘要:已經取消了參數,都用來執行。取數據的過程事物處理已經打開。取得符合條件的隊列后程序會更新該條數據,并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...
閱讀 1053·2023-04-25 17:51
閱讀 2858·2021-11-23 09:51
閱讀 1483·2021-11-08 13:21
閱讀 2457·2021-09-22 15:14
閱讀 1522·2019-08-30 12:48
閱讀 1086·2019-08-29 12:44
閱讀 1146·2019-08-26 12:21
閱讀 1403·2019-08-26 10:47