摘要:把因執(zhí)行超時的隊列從集合重新到當前執(zhí)行的隊列中。從要執(zhí)行的隊列中取任務可以看到在取要執(zhí)行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。
(原文鏈接:https://blog.tanteng.me/2017/...)
在 Laravel 中使用 Redis 處理隊列任務,框架提供的功能非常強大,但是最近遇到一個問題,就是發(fā)現(xiàn)一個任務被多次執(zhí)行,這是為什么呢?
先說原因:因為在 Laravel 中如果一個隊列(任務)執(zhí)行時間大于 60 秒,就會被認為執(zhí)行失敗并重新加入隊列中,這樣就會導致重復執(zhí)行同一個任務。
這個任務的邏輯就是給用戶推送內(nèi)容,需要根據(jù)隊列內(nèi)容取出用戶并遍歷,通過請求后端 HTTP 接口發(fā)送。比如有 10000 個用戶,在用戶數(shù)量多或接口處理速度沒那么快的情況下,執(zhí)行時間肯定會大于 60 秒,于是這個任務就被重新加入隊列。情況更糟糕一點,前面的任務如果都沒有在 60 秒執(zhí)行完,就都會重新加入隊列,這樣同一個任務就不止重復執(zhí)行一次了,而是多次。
下面從 Laravel 源代碼找一下罪魁禍首。
源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/** * The expiration time of a job. * * @var int|null */ protected $expire = 60;
這個 $expire 成員變量是一個固定的值,Laravel 認為一個隊列再怎么 60 秒也該執(zhí)行完了吧。取隊列方法:
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.":delayed", $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.":reserved", $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.":reserved", $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }
取隊列有幾步操作,因為隊列執(zhí)行失敗,或執(zhí)行超時等都會放入另外的集合保存起來,以便重試,過程如下:
1.把因執(zhí)行失敗的隊列從 delayed 集合重新 rpush 到當前執(zhí)行的隊列中。
2.把因執(zhí)行超時的隊列從 reserved 集合重新 rpush 到當前執(zhí)行的隊列中。
3.然后才是從隊列中取任務開始執(zhí)行,同時把隊列放入 reserved 的有序集合。
這里使用了 eval 命令執(zhí)行這個過程,用到了幾個 lua 腳本。
從要執(zhí)行的隊列中取任務:
local job = redis.call("lpop", KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved["attempts"] = reserved["attempts"] + 1 reserved = cjson.encode(reserved) redis.call("zadd", KEYS[2], ARGV[1], reserved) end return {job, reserved}
可以看到 Laravel 在取 Redis 要執(zhí)行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。
只有當這個任務完成后,再把有序集合中這個任務移除。從這個有序集合移除隊列的代碼就省略,我們看一下 Laravel 如何處理執(zhí)行時間大于 60 秒的隊列。
也就是這段 lua 腳本執(zhí)行的操作:
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return true
這里 zrangebyscore 找出分值從無限小到當前時間戳的元素,也就是 60 秒之前加入到集合的任務,然后通過 zremrangebyrank 從集合移除這些元素并 rpush 到隊列中。
看到這里應該就恍然大悟了。
如果一個隊列 60 秒沒執(zhí)行完,那么進程在取隊列的時候從 reserved 集合中把這些任務又重新 rpush 到隊列中。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/26260.html
摘要:已經(jīng)取消了參數(shù),都用來執(zhí)行。取數(shù)據(jù)的過程事物處理已經(jīng)打開。取得符合條件的隊列后程序會更新該條數(shù)據(jù),并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...
摘要:對于定時任務的基本用法,官網(wǎng)文檔已經(jīng)描述得很詳細了,這里不再多說。這種情況下如果定時任務能夠并行執(zhí)行,就不會有這樣的問題。這個時候我們希望能夠像隊列那樣,將定時任務分散到多臺服務器上。 定時任務 Scheduled Tasks 是 Laravel 提供的組件之一,稍微上點規(guī)模的項目應該都會用到,比如開發(fā)微信應用時通過定時任務去刷新access token,比如每天定時發(fā)推送提現(xiàn)用戶要記...
摘要:當查詢數(shù)據(jù)時,本地范圍允許我們創(chuàng)建自己的查詢構(gòu)造器鏈式方法。這樣便會知道這是一個本地范圍并且可以在查詢構(gòu)造器中使用。某些查詢構(gòu)造器不可用或者說可用但是方法名不同,關(guān)于這些請查閱所有集合的方法。 showImg(https://segmentfault.com/img/remote/1460000017877956?w=800&h=267); Laravel 因可編寫出干凈,可用可調(diào)試的...
摘要:高性能高精度定時服務,輕松管理千萬級定時任務。支持任務到期觸發(fā)和。支持創(chuàng)建延時任務和定時到期任務,和原生保持相同接口,輕松使用。不支持任務輸出任務鉤子及維護模式。是不指定任務名時自動生成,每個任務名必須唯一,相同任務名重復定義將會自動覆蓋。 Forsun高性能高精度定時服務,輕松管理千萬級定時任務。 定時服務項目地址:https://github.com/snower/forsun l...
閱讀 2474·2021-09-27 13:36
閱讀 2170·2019-08-29 18:47
閱讀 2136·2019-08-29 15:21
閱讀 1401·2019-08-29 11:14
閱讀 1987·2019-08-28 18:29
閱讀 1631·2019-08-28 18:04
閱讀 578·2019-08-26 13:58
閱讀 3214·2019-08-26 12:12