摘要:場景說明用于處理比較耗時的請求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁觸發(fā)執(zhí)行發(fā)送,程序會出現(xiàn)超時高并發(fā)場景,當(dāng)某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側(cè)推入數(shù)據(jù)客戶端阻塞直到隊列有
場景說明:
用于處理比較耗時的請求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁觸發(fā)執(zhí)行發(fā)送,程序會出現(xiàn)超時
高并發(fā)場景,當(dāng)某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求
搶購場景,先入先出的模式
命令:rpush + blpop 或 lpush + brpop
rpush : 往列表右側(cè)推入數(shù)據(jù)簡單隊列: simple.php
blpop : 客戶端阻塞直到隊列有值輸出
$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush("goods:task", json_encode($row)); } $redis->close();
獲取20000萬個商品,并把json化后的數(shù)據(jù)推入goods:task隊列
queueBlpop.php// 出隊 while (true) { // 阻塞設(shè)置超時時間為3秒 $task = $redis->blPop(array("goods:task"), 3); if ($task) { $redis->rPush("goods:success:task", $task[1]); $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
設(shè)置blpop阻塞時間為3秒,當(dāng)有數(shù)據(jù)出隊時保存到goods:success:task表示執(zhí)行成功,當(dāng)隊列沒有數(shù)據(jù)時,程序睡眠10秒重新檢查goods:task是否有數(shù)據(jù)出隊
php simple.php php queueBlpop.php優(yōu)先級隊列
blpop 有多個鍵時,blpop會從左至右遍歷鍵,一旦一個鍵能彈出元素,客戶端立即返回。例如:
blpop key1 key2 key3 key4
從key1到key4遍歷,如果哪個key有值,則彈出這個值,若多個key同時有值時,優(yōu)先彈出排在左邊的key。
priority.php// 設(shè)置優(yōu)先級隊列 $high = "goods:high:task"; $mid = "goods:mid:task"; $low = "goods:low:task"; $stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { // cid 小于100放在低級隊列 if ($row["cid"] < 100) { $redis->rPush($low, json_encode($row)); } // cid 100到600之間放在中級隊列 elseif ($row["cid"] > 100 && $row["cid"] < 600) { $redis->rPush($mid, json_encode($row)); } // cid 大于600放在高級隊列 else { $redis->rPush($high, json_encode($row)); } } $redis->close();priorityBlop.php
// 優(yōu)先級隊列 $high = "goods:high:task"; $mid = "goods:mid:task"; $low = "goods:low:task"; // 出隊 while(true){ // 優(yōu)先級高的隊列放在左側(cè) $task = $redis->blPop(array($high, $mid, $low), 3); if ($task) { $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
優(yōu)先級高的隊列放在blpop命令左側(cè),依次排序,blpop命令會依次彈出high, mid, low隊列的值
php priority.php php priorityBlpop.php延遲隊列
可以用一個有序集合來保存延遲任務(wù),member保存任務(wù)內(nèi)容,score保存(當(dāng)前時間 + 延時時間)。用時間作為score。程序只要用有序集合的第一條任務(wù)的score和當(dāng)前時間做比較,如果當(dāng)前時間比score小,說明有序集合的所有任務(wù)還沒到執(zhí)行時間。
delay.php$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000"); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd("goods:delay:task", time() + rand(1, 300), json_encode($row)); }
將20萬條任務(wù)導(dǎo)入有序集合goods:delay:task,所有任務(wù)延遲到之后的1秒到300秒內(nèi)執(zhí)行
delayHandle.phpwhile (true) { // 因為是有序集合,只要判斷第一條記錄的延時時間,例如第一條未到執(zhí)行時間 // 相對說明集合的其他任務(wù)未到執(zhí)行時間 $rs = $redis->zRange("goods:delay:task", 0, 0, true); // 集合沒有任務(wù),睡眠時間設(shè)置為5秒 if (empty($rs)) { echo "no tasks , sleep 5 seconds" . PHP_EOL; sleep(5); continue; } $taskJson = key($rs); $delay = $rs[$taskJson]; $task = json_decode($taskJson, true); $now = time(); // 到時間執(zhí)行延時任務(wù) if ($delay <= $now) { // 對當(dāng)前任務(wù)加鎖,避免移動移動延時任務(wù)到任務(wù)隊列時被其他客戶端修改 if (!($identifier = acquireLock($task["id"]))) { continue; } // 移動延時任務(wù)到任務(wù)隊列 $redis->zRem("goods:delay:task", $taskJson); $redis->rPush("goods:task", $taskJson); echo $task["id"] . " run " . PHP_EOL; // 釋放鎖 releaseLock($task["id"], $identifier); } else { // 延時任務(wù)未到執(zhí)行時間 $sleep = $delay - $now; // 最大值設(shè)置為2秒,保證如果有新的任務(wù)(延時時間1秒)進入集合時能夠及時的被處理 // $sleep = $sleep > 2 ? 2 :$sleep; echo "wait " . $sleep . " seconds " . PHP_EOL; sleep($sleep); } }
這個文件對有序集合內(nèi)的延遲任務(wù)做處理,如果延遲任務(wù)到了執(zhí)行時間,則把延遲任務(wù)移動到任務(wù)隊列中
queueBlpop.php// 出隊 while (true) { // 阻塞設(shè)置超時時間為3秒 $task = $redis->blPop(array("goods:task"), 3); if ($task) { $redis->rPush("goods:success:task", $task[1]); $task = json_decode($task[1], true); echo $task["id"] . ":" . $task["cid"] . ":" . "handle success"; echo PHP_EOL; } else { echo "nothing" . PHP_EOL; sleep(5); } }
處理任務(wù)隊列中的任務(wù)
php delay.php php delayHanlde.php php queueBlpop.php
完整代碼:https://github.com/wuzhc/demo...
相關(guān)項目: https://github.com/wuzhc/gmq
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70394.html
摘要:如果任務(wù)沒有在規(guī)定時間內(nèi)完成,那么該有序集合的任務(wù)將會被重新放入隊列中。這兩個進程操縱了三個隊列,其中一個,負責(zé)即時任務(wù),兩個,負責(zé)延時任務(wù)與待處理任務(wù)。如果任務(wù)執(zhí)行成功,就會刪除中的任務(wù),否則會被重新放入隊列中。 在實際的項目開發(fā)中,我們經(jīng)常會遇到需要輕量級隊列的情形,例如發(fā)短信、發(fā)郵件等,這些任務(wù)不足以使用 kafka、RabbitMQ 等重量級的消息隊列,但是又的確需要異步、重試...
摘要:配置項用于配置失敗隊列任務(wù)存放的數(shù)據(jù)庫及數(shù)據(jù)表。要使用隊列驅(qū)動,需要在配置文件中配置數(shù)據(jù)庫連接。如果應(yīng)用使用了,那么可以使用時間或并發(fā)來控制隊列任務(wù)。你可以使用命令運行這個隊列進程。如果隊列進程意外關(guān)閉,它會自動重啟啟動隊列進程。 一、概述 在Web開發(fā)中,我們經(jīng)常會遇到需要批量處理任務(wù)的場景,比如群發(fā)郵件、秒殺資格獲取等,我們將這些耗時或者高并發(fā)的操作放到隊列中異步執(zhí)行可以有效緩解系...
閱讀 1881·2021-11-25 09:43
閱讀 3174·2021-11-15 11:38
閱讀 2715·2019-08-30 13:04
閱讀 491·2019-08-29 11:07
閱讀 1502·2019-08-26 18:37
閱讀 2738·2019-08-26 14:07
閱讀 591·2019-08-26 13:52
閱讀 2285·2019-08-26 12:09