java中如何保证RabbitMq的消息不丢失呢?

欣喜 rabbitmq 发布时间:2025-02-17 11:02:50 阅读数:13239 1
下文笔者讲述RabbitMq消息不丢失的方法分享,如下所示

RabbitMQ消息持久化的实现思路

  如果你想保证RabbitMQ消息不丢失
   这里面则涉及很多方面
      如:消息持久化、确认机制、队列和交换机的配置等
   
   具体的操作方式,如下所示
     1.消息持久化
       确保消息在发送到RabbitMQ时
	     被标记为持久化
		  即使 RabbitMQ 服务器重启,消息也不会丢失。

       生产者端配置
     在生产者端
	   使用 `delivery_mode=2` 来标记消息为持久化
	    
		import com.rabbitmq.client.Channel;
		import com.rabbitmq.client.Connection;
		import com.rabbitmq.client.ConnectionFactory;

		public class Producer {
			private final static String QUEUE_NAME = "durable_queue";

			public static void main(String[] argv) throws Exception {
				ConnectionFactory factory = new ConnectionFactory();
				factory.setHost("localhost");
				try (Connection connection = factory.newConnection();
					 Channel channel = connection.createChannel()) {
					// 声明队列为持久化队列
					channel.queueDeclare(QUEUE_NAME, true, false, false, null);
					String message = "Hello World!";
					// 发送持久化消息
					channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
					System.out.println(" [x] Sent '" + message + "'");
				}
			}
		}
 
      队列持久化
       确保队列在声明时被标记为持久化
	   
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    交换机持久化
      如果使用交换机,确保交换机也被声明为持久化。
 
      channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
 
      消费者确认机制
        使用消费者确认机制(acknowledgments)来确保消息被正确处理
		  消费者在处理完消息后发送确认信号给RabbitMQ
		   RabbitMQ收到确认后才会删除消息
		   
    消费者端配置
       在消费者端
	     启用手动确认模式
		  
		import com.rabbitmq.client.*;

		public class Consumer {
			private final static String QUEUE_NAME = "durable_queue";

			public static void main(String[] argv) throws Exception {
				ConnectionFactory factory = new ConnectionFactory();
				factory.setHost("localhost");
				Connection connection = factory.newConnection();
				Channel channel = connection.createChannel();

				channel.queueDeclare(QUEUE_NAME, true, false, false, null);
				System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

				DeliverCallback deliverCallback = (consumerTag, delivery) -> {
					String message = new String(delivery.getBody(), "UTF-8");
					System.out.println(" [x] Received '" + message + "'");
					// 处理消息
					// 发送确认
					channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
				};

				// 启用手动确认模式
				boolean autoAck = false;
				channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
			}
		}
 
     高可用性配置
        配置RabbitMQ高可用性(HA)集群
		  以确保在某个节点故障时,消息不会丢失
		  
     配置HA集群
        1.启用HA插件
        rabbitmq-plugins enable rabbitmq_ha_policy
        2.设置HA策略:
        rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
    
	监控和日志
        定期监控RabbitMQ状态和日志
		 及时发现和解决问题。
    
	查看RabbitMQ状态
         rabbitmqctl status

    查看RabbitMQ日志
         RabbitMQ日志通常位于/var/log/rabbitmq/目录下

    使用以上步骤
	  可显著提高RabbitMQ消息可靠性
	  减少消息丢失风险
例:

生产者

 
消息确定机制

import com.rabbitmq.client.*;
public class PersistentProducer {
    private final static String QUEUE_NAME = "persistent_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明一个持久化队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 启用生产者确认
            channel.confirmSelect();

            String message = "Persistent message with producer confirm!";
            channel.basicPublish("", QUEUE_NAME, 
                MessageProperties.PERSISTENT_TEXT_PLAIN, 
                message.getBytes());

            // 检查消息是否成功发送
            if (channel.waitForConfirms()) {
                System.out.println("Message sent successfully!");
            } else {
                System.out.println("Message failed to send!");
            }
        }
    }
}
channel.queueDeclare(QUEUE_NAME, true, false, false, null):队列是持久化的,确保 RabbitMQ 重启后队列不会丢失。
MessageProperties.PERSISTENT_TEXT_PLAIN:确保消息持久化存储。
channel.confirmSelect():启用生产者确认,确保消息成功送达 RabbitMQ。
channel.waitForConfirms():等待确认,如果生产者发送消息时发生失败,会捕获错误。

交换机

 选择合适交换机类型:
    常见的交换机类型包括direct、fanout、topic 和 headers
	 选择正确类型可确保消息路由正确
 使用死信队列(Dead Letter Exchange, DLX):
    当消息因某些原因无法被消费
	 可将消息转发到死信队列进行进一步处理
import com.rabbitmq.client.*;

public class DirectExchangeProducer {
    private final static String EXCHANGE_NAME = "direct_logs";
    private final static String QUEUE_NAME = "persistent_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明交换机和队列
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");

            String message = "Hello, Direct Exchange!";
            channel.basicPublish(EXCHANGE_NAME, "info", 
                MessageProperties.PERSISTENT_TEXT_PLAIN, 
                message.getBytes());

            System.out.println("Sent: " + message);
        }
    }
}

public class DeadLetterConsumer {
    private final static String DLX_QUEUE = "dlx_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明死信队列
            channel.queueDeclare(DLX_QUEUE, true, false, false, null);

            // 创建消费者回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Dead Letter Queue Received: " + message);
            };

            // 设置死信队列消费者
            channel.basicConsume(DLX_QUEUE, true, deliverCallback, consumerTag -> {});
        }
    }
}
import com.rabbitmq.client.*;

public class AckConsumer {
    private final static String QUEUE_NAME = "persistent_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明一个持久化队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 创建一个消费者回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received: " + message);

                try {
                    // 模拟消息处理
                    if (message.contains("error")) {
                        throw new Exception("Error while processing message");
                    }

                    // 手动确认消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    System.out.println("Message processed and acknowledged");
                } catch (Exception e) {
                    // 如果消息处理失败,可以将消息重新放回队列
                    System.out.println("Error processing message, requeueing: " + e.getMessage());
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                }
            };

            // 设置手动确认
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
   消息处理成功后确认消息

channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true)
   当消费失败
    重新将消息投递到队列中
	供其他消费者处理
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/rabbitmq/2025/8298.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者