国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

【譯】RabbitMQ系列(二)-Work模式

lcodecorex / 3146人閱讀

摘要:每個消費者會得到平均數(shù)量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發(fā)送,會認為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的。當消費者中斷退出,會重新分派。

Work模式

原文地址

在第一章中,我們寫了通過一個queue來發(fā)送和接收message的簡單程序。在這一章中,我們會創(chuàng)建一個workqueue,來將執(zhí)行時間敏感的任務分發(fā)到多個worker中。

work模式主要的意圖是要避免等待完成一個耗時的任務。取而代之地,我們延遲任務的執(zhí)行,將任務封裝成消息,將之發(fā)送到queue。一個運行著的worker進程會彈出這個任務并執(zhí)行它。當運行多個worker進程時,任務會在它們之間分派。

這種模式在web應用中特別有用,因為在一個較短的HTTP請求窗口中不會去執(zhí)行一個復雜的任務。

準備工作

在上一章中,我們發(fā)送了一個”Hello World!"的message。現(xiàn)在我們將發(fā)送一個代表了復雜任務的字符串。這不是一個實際的任務,比如像調整圖片大小或是重新渲染pdf文檔,我們通Thead.sleep() 來模擬一個耗時的任務。message中的小圓點表示其復雜度,圓點越多則任務的執(zhí)行越耗時。比如“Hello..."的message將耗時3秒。

我們簡單的修改上一章的Send.java代碼,允許在命令行發(fā)送任意message。新的類叫做NewTask.java

String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

同樣的,我們修改上一章中的Recv.java,讓它在處理message的時候根據(jù)小圓點進行睡眠。新的類叫Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  System.out.println(" [x] Received "" + message + """);
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == ".") Thread.sleep(1000);
    }
}

像在第一章一樣編譯這兩個類

javac -cp $CP NewTask.java Worker.java
Round-robin分派

使用Task模式的一個明顯的優(yōu)勢是讓并行執(zhí)行任務變得簡單。我們只需要啟動更多的worker就可以消減堆積的message,系統(tǒng)水平擴展簡單。

首先,我們在同一時間啟動兩個worker。他們都會從queue獲得message,來看一下具體細節(jié)。

打開了三個終端,兩個是跑worker的。

java -cp $CP Worker
java -cp $CP Worker

第三個終端里來發(fā)布新的任務message。

java -cp $CP NewTask First message.
java -cp $CP NewTask Second message..
java -cp $CP NewTask Third message...
java -cp $CP NewTask Fourth message....
java -cp $CP NewTask Fifth message.....

讓我們看看worker的處理message的情況.第一個worker收到了第1,3,5message,第二個worker收到了第2,4個message。

默認情況下,RabbitMQ會順序的將message發(fā)給下一個消費者。每個消費者會得到平均數(shù)量的message。這種方式稱之為round-robin(輪詢).

Message 確認

執(zhí)行任務需要一定的時間。你可能會好奇如果一個worker開始執(zhí)行任務,但是中途異常退出,會是什么結果。在我們現(xiàn)在的代碼中,一旦RabbitMQ將消息發(fā)送出去了,它會立即將該message刪除。這樣的話,就可能丟失message。

在實際場景中,我們不想丟失任何一個task。如果一個worker異常中斷了,我們希望這個task能分派給另一個worker。

為了確保不會丟失message,RabbitMQ采用message確認機制。RabbitMQ只有收到該message的Ack之后,才會刪除該消息。

如果worker中斷退出了( channel關閉了,connection關閉了,或是TCP連接丟失了)而沒有發(fā)送Ack,RabbitMQ會認為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的worker。這樣就不用message丟失,即使是在worker經常異常中斷退出的場景下。

不會有任何message會timeout。當消費者中斷退出,RabbitMQ會重新分派message。即使消息的執(zhí)行會花費很長的時間。

默認情況下,message是需要人工確認的。在上面的例子中,我們通過autoAck=true來關閉了人工確認。像下面這樣,我們將該標志設置為false,worker就需要在完成了任務之后,發(fā)送確認。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received "" + message + """);
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

上面的代碼保證即使當worker還在處理一條消息,而強制它退出,也不會丟失message。然后不久,所有未被確認的消息都會被重新分派。

發(fā)送確認必須和接收相同的channel。使用不同的channel進行確認會導致channel-level protocol 異常。

忘記確認消息是一個比較常見的錯誤,但是其后果是很嚴重的。當client退出時,message會被重新分派,但是RabbitMQ會占用越來越多的內存,因它無法釋放那些未被確認的message。
可以通過rabbitmqctl來打印messages_unacknowledged:
##linux
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 
##windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message 持久化

我們學習了在消費者出現(xiàn)問題的時候不丟失message。但是如果RabbitMQ服務器宕機了,我們還是會丟失message。

當RabbitMQ宕機時,默認情況下,它會”忘記“所有的queue和message。為了確保message不丟失,我們需要確認兩件事情:我們要使得queue和message都是持久的。

首先,我們要確保RabbitMQ不會丟失我們設置好的queue。所以,我們要把它聲明成持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

雖然代碼沒有任何問題,但是光這樣是無效的。因為我們之前已經定義過名字為hello的queue。RabbitMQ不允許你使用不同的參數(shù)去重新定義一個已經存在的queue,而且這還不會反悔任何錯誤信息。但是我們還是有別的方法,讓我們使用一個別的名字,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

聲明queue的改變要在生產者和消費者的代碼里都進行修改。

接著我們要設置message的持久性,我們通過設置MessageProperties為PERSISTENT_TEXT_PLAIN:

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
將message標記成持久的不能100%保證message不會丟失,雖然這告訴RabbitMQ將message保存到磁盤,然而在RabbitMQ從接到message到保存之間,仍然有一小段時間。同時RabbitMQ不會給每一條message執(zhí)行fsync(2) -- 可能只是保存到了cache而沒有寫到磁盤上去。所以持久的保證也不是非常強,然后對我們簡單的task queue來說則足夠了。如果需要一個非常強的保證,則可以使用發(fā)布確認的方式。
Fair 分派

你可能已經注意到分派的工作沒有如我們所期望的來執(zhí)行。比如在有2個worker的情況系,所有偶數(shù)的message耗時很長,而所有奇數(shù)的message則耗時很短,這樣其中一個worker則一直被分派到偶數(shù)的message,而另一個則一直是奇數(shù)的message。RabbitMQ對此并不知曉,進而繼續(xù)這樣分派著message。

這樣的原因是RabbitMQ是在message入queue的時候確定分派的。它不關心消費者ack的情況。

我們可以通過basicQos方法和prefetchCount(1)來解決這個問題。這個設置是讓RabbitMQ給worker一次一個message。或者這么說,直到worker處理完之前的message并發(fā)送ack,才給worker下一個message。否則,Rabbitmq會將message發(fā)送給其它不忙的worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都處于忙碌狀態(tài),queue可能會被裝滿。必須監(jiān)控queue深度,可能要開啟更多的worker,或者采取其他的措施。
開始執(zhí)行

NewTask.java的最終版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent "" + message + """);
    }
  }

}

Worker.java的最終版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received "" + message + """);
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == ".") {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

使用message ack和prefetchCount,來設定work queue。持久化選項則在RabbitMQ重啟后能讓任務得以恢復。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/73901.html

相關文章

  • RabbitMQ系列(三) - 發(fā)布/訂閱模式

    摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實我們是用到了默認的,用空字符串來標識。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關系的建立我們稱之為從現(xiàn)在開始這個就會將推向我們的隊列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個task都會派發(fā)給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...

    WrBug 評論0 收藏0
  • RabbitMQ系列(六)-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請求,并阻塞知道結果返回。當有消息時,進行計算并通過指定的發(fā)送給客戶端。當接收到,則檢查。如果和之前的匹配,則將消息返回給應用進行處理。 RPC模式 在第二章中我們學習了如何使用Work模式在多個worker之間派發(fā)時間敏感的任務。這種情況是不涉及到返回值的,worker執(zhí)行任務就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評論0 收藏0
  • [] RabbitMQ tutorials (2) ---- 'work queue&#

    摘要:這樣的消息分發(fā)機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發(fā)。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經聲明的隊列中收發(fā)...

    joyvw 評論0 收藏0
  • RabbitMQ系列(五) - 主題模式

    摘要:主題模式在上一章我們改進了我們的日志系統(tǒng),如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發(fā)布到這兩個。當為時,會接收所有的。當中沒有使用通配符和時,的行為和一致。 主題模式 在上一章我們改進了我們的日志系統(tǒng),如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個...

    pingan8787 評論0 收藏0
  • RabbitMQ系列(四) - 路由模式

    摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統(tǒng)。更形象的表示,如對中的感興趣。為了進行說明,像下圖這么來設置如圖,可以看到有兩個綁到了類型為的上。如圖的設置中,一個為的就會同時發(fā)送到和。接收程序可以選擇要接收日志的嚴重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統(tǒng)。我們可以通過這個系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...

    liuchengxu 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<