摘要:任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。我們將任務封裝成一個消息發送給隊列,后臺的任務進程會得到這個任務并執行它,而且可以配置多個任務進程,進一步加大吞吐率。為了確保消息不丟失,支持消息確認。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面
前言在第一篇中我們描述了如何最簡單的RabbitMQ操作,如何發送、接受消息。在今天這篇文章中我們將描述如何創建一個任務隊列,來將高耗時的任務分發到多個消費者,從而提高處理效率。
任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。反之我們會把這個操作交給隊列,讓它延后再做。我們將任務封裝成一個消息發送給隊列,后臺的任務進程會得到這個任務并執行它,而且可以配置多個任務進程,進一步加大吞吐率。
特別是對于網絡請求,一次短短的HTTP請求是要求迅速響應的,不可能讓它一直停頓在高耗時操作上。
準備工作在第一章中我們發送了“Hello World!”。現在來完成更復雜一點的,因為這里并沒有真正的高耗時操作,比如縮放圖像或輸出一個pdf。因此我們只是用Thread.sleep()來假裝我們很繁忙,而且會用"."來表示需要停頓的秒數,比如一個叫Hello...的任務將停頓3秒鐘。
我們簡單的更改下Send.java,稱之為 NewTask.java.
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """);
然后是工具類
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
當然,我們的Recv.java也需要進行一些改造,它需要對每一個"."停頓1秒,Work.java如下
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ".") Thread.sleep(1000); } }
編譯上面這些代碼
javac -cp $CP NewTask.java Worker.java輪詢調度
任務隊列的一個最大優點是可以并行工作,能夠非常容易的水平擴張。
首先,讓我們同時運行兩個工作線程,他們能夠同時從隊列獲取消息。我們也需要同時開啟3個console:1個生產者,2個消費者
消費者C1
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
消費者C2
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
讓我們運行生產者
# shell 3 java -cp $CP NewTask # => First message. java -cp $CP NewTask # => Second message.. java -cp $CP NewTask # => Third message... java -cp $CP NewTask # => Fourth message.... java -cp $CP NewTask # => Fifth message.....
讓我們看看消費者們
消費者C1
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "First message." # => [x] Received "Third message..." # => [x] Received "Fifth message....."
消費者C2
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "Second message.." # => [x] Received "Fourth message..
RabbitMQ默認有序的將會發送消息給下一個消費者,所以每一個消費者都會得到相同數量的消息,這種方式就叫做輪詢調度(round-robin),你可以嘗試下更多的消費者
消息確認一個任務可能非常耗時,如果消費者在做一個高耗時任務時掛掉了,我們將會丟失所有發送到這個消費者上的消息。這是非常不可取的,所以我們希望能夠明確的知道消息是否消費成功,如果一個消費掛了,我們能夠知道,并且將消息發送給下一個消費者。
為了確保消息不丟失,RabbitMQ支持消息確認。收到消息后消費者會給RabbitMQ服務器發送一個ack(我已經收到消息了),RabbitMQ就會在服務上刪除這個消息了。
如果一個消費者掛了(連接關閉,channel關閉,或者是TCP連接丟失)而沒有發送ack,RabbitMQ就會知道消息并沒有消費成功,于是乎消息會被放到消息隊列重新消費。如果此時還有其它消費者的話,消息會發送給其它消費者來消費,確保消息不會丟失
消息并沒有超時時間這個概念,消息只會在消費者掛掉了時候重發,即使是一個非常非常耗時的的消費者也不會發生重發
手動消息確認(Manual message acknowledgments)默認是打開的,雖然我們之前關閉了它:autoAck=true。讓我們先將它設置為false
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
這樣一來,即使你使用CTRL+C強制殺死了一個消費者,消費者所丟失的消息也將會被重發,會被另一個消費者所接受并消費。
忘記應答很容易犯忘記應答的錯誤,但會導致非常嚴重的后果。Messages會被重發,RabbitMQ會消耗越來越多的內存因為unacked的消息無法釋放(甚至更嚴重,RabbitMQ內部維護了一個最大打開線程數,如果太多的消息沒有應答,RabbitMQ甚至會整個崩潰掉)
你可以用Rabbitmqctl查看未被應答的消息數
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged消息持久化
我們現在知道了可以通過應答來保證消息不丟失,但萬一RabbitMQ掛了呢?還是可能會導致消息丟失。因此我們可以通過持久化的機制,包括將隊列以及隊列中的消息持久化的方式,來保證即便RabbitMQ掛了,當它重啟的時候,隊列以及消息也能夠恢復
首先做隊列的持久化,聲明隊列為durable
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
但很可惜的是,這種聲明方式并不適用與上面的方法,因為我們已經將“Hello”定義為一個非持久化的隊列了,是不能再將他改為持久化的,如果這樣做,將會直接返回一個error信息。所以,我們需要重新再定義一個隊列
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
在保證隊列的持久化后需要保證消息的持久化-將消息設置為PERSISTENT_TEXT_PLAIN
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());公平分發
但這樣還是存在問題:假設有如下的情形,一個消費者非常耗時,而一個消費者非常快,由于消息都是公平的發送,所以它們都是接收到相同數量的消息,會導致一個消費者非常忙碌,而另外一個消費者非常空閑,而RabbitMQ無法得知這一點。
為了解決這個缺陷我們引入了basicQos方法以及prefetchCount =1的設置。這會告訴RabbitMQ一次只給消費者一個消息:如果這個消息未確認,將不會發送新的消息,從而它會將消息發送給其它并不那么忙的消費者
int prefetchCount = 1; channel.basicQos(prefetchCount);留意queue size
如果所有的消費者都非常忙,隊列可能會很快被填滿,所以你需要留意這一點,要么增加更多的消費者,或者采取其它的策略。
整合NewTask.java
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } //... }
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ".") { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用消息確認和prefetchCount你就能設置一個持久化隊列了,同時,使用durable和persist,,即使RabbitMQ掛掉了,重啟后也能夠重發消息
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/68099.html
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https...
摘要:作為消息隊列的一個典型實踐,完全實現了標準,與的快快快不同,它追求的穩定可靠。同一個隊列不僅可以綁定多個生產者,而且能夠發送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的...
摘要:可以參考源碼,項目支持網站,最新文章或實現會更新在上面前言在訂閱發布中我們建立了一個簡單的日志系統,從而將消息廣播給一些消費者。因此,發送到路由鍵的消息會發送給隊列,發送到路由鍵或者的消息會發送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決...
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性...
摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考...
閱讀 3118·2021-11-23 09:51
閱讀 1983·2021-09-09 09:32
閱讀 1094·2019-08-30 15:53
閱讀 2965·2019-08-30 11:19
閱讀 2475·2019-08-29 14:15
閱讀 1443·2019-08-29 13:52
閱讀 560·2019-08-29 12:46
閱讀 2827·2019-08-26 12:18