摘要:為了解決以上問題,我們需要使用的生產者確認模式。在這樣的機制下,即使有一個消費者崩潰也不會丟失任何消息。即使處理一條消息會花費很長的時間。一些問題這個庫提供了心跳檢測的功能選項,但是沒有做自動重連的。參考文章深入學習四的模式
數據的持久化
對于非常健壯穩定的后臺系統,我們必須得考慮到各種宕機的情況:物理宕機,應用自身出錯崩潰等,而這個時候我們的應用需要做到重啟后數據依舊不丟失,這個問題就是數據持久化,也就是說數據持久化到了磁盤。
在RabbitMQ中,如果要保證消息發送到broker,我們首先需要做到三點
持久化的exchange(交換器):聲明時開啟durable選項
持久化的queue(隊列):聲明時開啟durable選項
持久化的message:delivery_mode設置為2(php,python之類的庫,2可以換成更友好的常量),在node的amqp.node庫中是設置persistent為true
需要注意的一點是,持久化會造成性能損耗(寫磁盤操作),但為了保證生產環境的數據一致性,我們必須這么做。
發送消息的confirm機制其實光光做到以上三點,數據依舊有丟失的可能,因為在客戶端成功調用api存入消息之后,RabbitMQ還需要一段時間(很短,但不可忽略)才能落盤,RabbitMQ并不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤上,而在這段時間內RabbitMQ broker發生crash, 消息保存到cache但是還沒來得及落盤,那么這些消息將會丟失。
為了解決以上問題,我們需要使用RabbitMQ的生產者確認模式。
為了開啟確認模式,需要生產者將channel設置成confirm模式,一旦channel進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號。
簡單confirm示例</>復制代碼
confirm模式最大的好處在于他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息 (來自參考1)
示例代碼使用NodeJS實現,RabbitMQ服務可以使用上一篇RabbitMQ二三事的docker-compose.yml快速啟動
</>復制代碼
const QUEUE_NAME = "test_queue"
const config = require("./config")
const amqp = require("amqplib")
async function getMQConnection() {
return await amqp.connect({
protocol: "amqp",
hostname: config.MQ.host,
port: config.MQ.port,
username: config.MQ.user,
password: config.MQ.pass,
locale: "en_US",
frameMax: 0,
heartbeat: 5, // 心跳
vhost: config.MQ.vhost,
})
}
async function run(rmqConn, msgArr) {
try {
const channel = await rmqConn.createConfirmChannel() // 開啟confirm
const exchangeName = `${QUEUE_NAME}_exchange`
await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false }) // 不存在exchange就新建exchange
await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) // 不存在queue就新建
await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) // 綁定交換器
// queue name當routing key
msgArr.forEach(str => {
channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true })
})
await channel.waitForConfirms()
console.log("發送批量數據成功")
await channel.close()
} catch(err) {
// do something with err
console.log("發送批量數據失敗:" + err.message)
}
}
async function testSendBatchMsg() {
const conn = await getMQConnection()
await run(conn, [
"cat",
"dog",
"pig",
"mouse",
"mouse",
"penguin"
])
await conn.close()
}
testSendBatchMsg()
說明
assertExchange和assertQueue是保證交換器和隊列一定存在,這里的exchange是簡單的direct交換器
ConfirmChannel#publish方法不返回promise
現在我們需要考慮我們的消費者了,消費者也會遇到程序出錯或者物理宕機問題,RabbitMQ官方也給出了一套解決方案,和confirm機制類似,就是ack機制(Message acknowledgment).
在ack機制中,消費者在自己處理完業務邏輯后,需要發送一個ack消息,然后broker才認為這條消息被正確消費,然后從內存和磁盤中移除掉它,只要沒收到消費者的acknowledgment,broker就會一直保存著這條消息.如果一個消費者崩潰(斷開了連接)卻沒有發送ack,broker會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。在這樣的機制下,即使有一個消費者崩潰也不會丟失任何消息。
</>復制代碼
const QUEUE_NAME = "test_queue"
const config = require("./config")
const amqp = require("amqplib")
async function getMQConnection() {
return await amqp.connect({
protocol: "amqp",
hostname: config.MQ.host,
port: config.MQ.port,
username: config.MQ.user,
password: config.MQ.pass,
locale: "en_US",
frameMax: 0,
heartbeat: 5, // 心跳
vhost: config.MQ.vhost,
})
}
async function sleep(ms) {
return new Promise(resolve =>
setTimeout(resolve, ms))
}
async function start() {
const mqConn = await getMQConnection()
console.log("connecting RabbitMQ successfully!")
const channel = await mqConn.createChannel()
const exchangeName = `${QUEUE_NAME}_exchange`
await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false })
await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false})
await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME)
channel.consume(QUEUE_NAME, async function(msg) {
console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString())
console.log("consuming message...")
try {
await sleep(500) // 模擬消費消息
console.log("consuming ends")
channel.ack(msg) // 消費成功,發送ack
} catch(e) {
console.log("consuming failed: " + e.message)
channel.nack(msg) // 消費失敗,發送nack
}
}, {noAck: false}) // ack
}
start()
注意
自動ack是默認打開的,也就是說消息發送到消費者的時候就被自動ack了,而很多情況下,我們想要手動ack,所以我們需要顯式設置autoAsk=false關閉這種機制(在示例中是noAck: false)
ack沒有任何超時限制;只有當消費者斷開時,broker才會重新投遞。即使處理一條消息會花費很長的時間。
一些問題amqp.node這個庫提供了心跳檢測的功能(heartbeat選項),但是沒有做自動重連的。
對于heartbeat的值,RabbitMQ官網有說明
</>復制代碼
Several years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.
所以心跳不宜設置的太低(因為短暫的網絡擁塞或者流控制),太低容易導致誤報,根據經驗5s-20s是比較合理的。
參考文章:
深入學習RabbitMQ(四):channel的confirm模式
when-publishes-are-confirmed
Channel-oriented API reference
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/31527.html
摘要:前端之前端之前言前言昨天學習了標記式語言,也就是無邏輯語言。今天學習,被稱之為網頁的化妝師。為前端頁面的樣式,由選擇器作用域與樣式塊組成。年初,組織負責的工作組開始討論第一版中沒有涉及到的問題。其討論結果組成了年月出版的規范第二版。前端之 CSS 前言 昨天學習了標記式語言,也就是無邏輯語言。了解了網頁的骨架是什么構成的,了解了常用標簽,兩個指令以及轉義字符;其中標簽可以分為兩大類: 一類...
摘要:初始狀態對應二叉樹結構將頂點與最后一個結點調換即將頂點與最后一個結點交換,然后將索引為止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首發于一世流云專欄:https://segmentfault.com/blog... 一、PriorityBlockingQueue簡介 PriorityBlockin...
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網提供的安裝方式本人安裝成功的方式第三步查看是否已經安裝好了,能查到說明已經安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
閱讀 1605·2021-09-30 09:47
閱讀 3610·2021-09-22 15:05
閱讀 2844·2021-08-30 09:44
閱讀 3627·2019-08-30 15:55
閱讀 1377·2019-08-30 13:08
閱讀 1332·2019-08-29 16:40
閱讀 557·2019-08-29 12:45
閱讀 1394·2019-08-29 11:25