国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RabbitMQ學習筆記

zacklee / 1022人閱讀

摘要:消息持久化控制的屬性就是消息的持久化。當生產者發送的消息路由鍵為時,兩個消費者都會收到消息并處理當生產者發送的消息路由鍵為時,只有消費者可以接收到消息。八的消息確認機制在中,可以通過持久化數據解決服務器異常的數據丟失問題。

一、內容大綱&使用場景 1. 消息隊列解決了什么問題?

異步處理

應用解耦

流量削鋒

日志處理

......

2. rabbitMQ安裝與配置 3. Java操作rabbitMQ

simple 簡單隊列
. work queues 工作隊列 公平分發 輪詢分發
. publish/subscribe 發布訂閱
. routing 路由選擇 通配符模式
. Topics 主題

手動和自動確認消息

隊列的持久化和非持久化

rabbitMQ的延遲隊列

4. Spring AMQP Spring-Rabbit 5. DEMO

MQ實現搜索引擎DIH增量

未支付訂單30分鐘 取消

類似百度統計 cnzz 架構 消息隊列

二、用戶及vhost配置 2.1 添加用戶

2.2 virtual hosts管理

virtual hosts相當于mysql的db

一般以/開頭

2.3 用戶授權

需要對用戶進行授權

三、簡單隊列 3.1 模型

P:消息生產者

紅色:隊列

C:消息消費者

包含三個對象:生產者、隊列、消費者

3.2 獲取mq連接
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {
    /**
     * 獲取MQ的連接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //定義一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("127.0.0.1");
        //AMQP的端口
        factory.setPort(5672);
        //vhost
        factory.setVirtualHost("/vhost_mmr");
        factory.setUsername("rabbit");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        return connection;
    }
}
3.3 生產消息
import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_simple_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個通道
        Channel channel = connection.createChannel();

        //創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello world!";

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        System.out.println("---send msg :" + msg);
        channel.close();
        connection.close();
    }
}
3.4 消費消息
import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive {
    private static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("msg receive : " + msg);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
    }
}
3.5 簡單隊列的不足

耦合性高,生產者一一對應消費者,如果需要多個消費者消費隊列中的消息,此時簡單隊列就無能為力了。

隊列名變更,源碼需要同時變更

四、Work隊列 4.1 模型

一個生產者將消息放入隊列中,可以有多個消費者進行消費

為什么會出現工作隊列?

Simple隊列:是一一對應的,實際開發中,生產者改善消息是毫不費力的,而消費者一般需要跟業務相結合,消費者接收到消息之后就需要處理,可能需要花費時間,此時隊列就會積壓很多消息。

4.2 輪詢分發

生產消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個通道
        Channel channel = connection.createChannel();

        //創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("---send msg :" + msg);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        channel.close();
        connection.close();
    }
}

消費者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean ack = true;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv1 : " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean ack = true;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

現象

消費者1和消費者2處理的消息是一樣多的,這種分發方式稱為輪詢分發(round-robin),不管誰忙或者誰閑,都不會多給或者少給。任務均分。

4.3 公平分發 fair dispatch

保證一次發送給消費者的消息不超過一條

        /**
         * 每個消費者發送確認消息之前,消息隊列不發送下一個消息給消費者,消費者一次只處理一個消息
         *
         * 限制發送給同一個消費者不得超過一條消息
         */
        int preFetchCount = 1;
        channel.basicQos(preFetchCount);

使用公平分發,必須關閉自動應答ack,改為手動

channel.basicAck(envelope.getDeliveryTag(), false);
boolean ack = false;//自動應答改為false
channel.basicConsume(QUEUE_NAME, ack, consumer);

生產消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個通道
        Channel channel = connection.createChannel();

        //創建隊列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /**
         * 每個消費者發送確認消息之前,消息隊列不發送下一個消息給消費者,消費者一次只處理一個消息
         *
         * 限制發送給同一個消費者不得超過一條消息
         */
        int preFetchCount = 1;
        channel.basicQos(preFetchCount);

        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("---send msg :" + msg);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        channel.close();
        connection.close();
    }
}

消費消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv1 : " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
4.4 消息應答與消息持久化 4.4.1 消息應答
boolean ack = false;//自動應答改為false
channel.basicConsume(QUEUE_NAME, ack, consumer);

ack = true時為自動確認模式,一旦rabbitMQ將消息分發給消費者,該消息就會在內存中刪除;這種情況下,如果殺死正在處理消息的消費者,會丟失正在處理的消息;

ack = false時為手動回執(消息應答)模式,如果有一個消費者掛掉,就會將會給其他消費者,rabbitMQ支持消息應答,消費者發送一個消息應答,告訴rabbitMQ這個消息已經被處理,然后rabbitMQ就刪除內存中的消息;

消息應答默認打開,即為false;

由于消息在內存中存儲,如果rabbitMQ掛掉,消息仍然會丟失。

4.4.2 消息持久化
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

durable控制的屬性就是消息的持久化。

已經聲明好的隊列,如果durable已經為false了,就無法修改為true,rabbitMQ不允許重新定義(不同參數)一個已存在的隊列

五、訂閱模式 Publish/Subscribe 5.1 模型

解讀:

1、一個生產者,多個消費者;

2、每個消費者都有自己的隊列;

3、生產者沒有直接把消息發送到隊列,而是發送至交換機(eXchange)

4、每個隊列都要綁定到交換機上

5、生產者發送的消息,經過交換機,到達隊列,就能實現一個消息被多個消費者消費

5.2 實現

生產消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String msg = "hello ps";

        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println("Send " + msg);

        channel.close();
        connection.close();
    }
}

消息哪去了?丟失了!因為交換機沒有存儲能力,在rabbitMQ中,只有隊列有存儲能力。此時并沒有完成隊列綁定到交換機,所以數據丟失了。

消費消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String QUEUE_NAME = "test_ps_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

不同的隊列做不同的事情。

5.3 Exchange(交換機、轉發器)

一方面接收生產者的消息,另一方面向隊列推送消息

rabbitMQ提供了四種Exchange:fanout,direct,topic,header? header模式在實際使用中較少。

fanout:不處理路由鍵

direct:處理路由鍵

topic

將路由鍵和某模式進行匹配

任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上

六、路由模式 6.1 模型

聲明exchange時指定為direct模式

綁定隊列時,指定路由鍵

6.2 實現

生產者

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String msg = "hello direct";

        //指定路由鍵
        String routingKey = "warning";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send msg:" + msg);
        channel.close();
        connection.close();
    }
}

消費者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        //綁定隊列與交換機時,指定路由鍵
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");


        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        //綁定隊列與交換機時,指定路由鍵
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv2 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
七、Topic模式 7.1 模型

# 匹配一個或多個

* 匹配一個

7.2 實現

生產者

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange,指定模式為topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String msg = "商品....";

        String routingKey = "goods.delete";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send msg:" + msg);
        channel.close();
        connection.close();
    }
}

消費者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by wangbin on 2018/6/26.
 */
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by wangbin on 2018/6/26.
 */
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv2 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動應答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

其中,消費者1綁定路由鍵為goods.#,消費者2綁定路由鍵為goods.add。當生產者發送的消息路由鍵為goods.add時,兩個消費者都會收到消息并處理;當生產者發送的消息路由鍵為goods.update時,只有消費者1可以接收到消息。

八、RabbitMQ的消息確認機制

在rabbitMQ中,可以通過持久化數據解決rabbitMQ服務器異常的數據丟失問題。

問題:生產者將消息發送出去之后,消息到底有沒有到達rabbitMQ服務器;默認情況是不知道消息已到達的

兩種方式:

AMQP實現了事務機制

confirm模式

8.1 事務機制

txSelect

用于將當前channel設置成transaction模式

txCommit

用于提交事務

txRollback

回滾事務

生產者發送消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello tx msg!";

        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            System.out.println("發生異常,事務已回滾");
        }
    }
}

事務機制會降低rabbitMQ的吞吐量。

8.2 Confirm模式

生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經得到了處理;

confirm模式最大的好處在于他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息。

編程模式:

1、普通,發一條

2、批量,發一批

3、異步confirm模式,提供一個回調方法

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/71406.html

相關文章

  • RabbitMq 最全的性能調優筆記

    摘要:性能調優筆記避免雷區要避免流控機制觸發服務端默認配置是當內存使用達到,磁盤空閑空間小于,即啟動內存報警,磁盤報警報警后服務端觸發流控機制。最佳線程生產者使用多線程發送數據到三到五個線程性能發送最佳,超過它也不能提高生產的發送速率。 RabbitMq 性能調優筆記 [TOC] 避免雷區 要避免流控機制觸發 服務端默認配置是當內存使用達到40%,磁盤空閑空間小于50M,即啟動內存報警,磁...

    Tony 評論0 收藏0
  • 慕課網_《RabbitMQ消息中間件極速入門與實戰》學習總結

    摘要:慕課網消息中間件極速入門與實戰學習總結時間年月日星期三說明本文部分內容均來自慕課網。 慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結 時間:2018年09月05日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • Android工程師轉型Java后端開發之路,自己選的路,跪著也要走下去!

    本文是公眾號讀者jianfeng投稿的面試經驗恭喜該同學成功轉型目錄:毅然轉型,沒頭蒼蠅制定目標,系統學習面試經歷毅然轉崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標廣州,2016年畢業于一個普通二本大學,曾經在某機構培訓過Android。2018年初的時候已經在兩家小公司工作干了兩年的android開發,然后會一些Tomcat、Servlet之類的技術,當時的年薪大概也就15萬這樣子。由于個人發展...

    番茄西紅柿 評論0 收藏0
  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執行的相關操作。同時從控制臺可以看到發送的速率多線程測試性能開了個線程,每個線程發送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...

    yuanxin 評論0 收藏0

發表評論

0條評論

zacklee

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<