java中如何保证RabbitMq的消息不丢失呢?
下文笔者讲述RabbitMq消息不丢失的方法分享,如下所示
RabbitMQ消息持久化的实现思路
如果你想保证RabbitMQ消息不丢失 这里面则涉及很多方面 如:消息持久化、确认机制、队列和交换机的配置等 具体的操作方式,如下所示 1.消息持久化 确保消息在发送到RabbitMQ时 被标记为持久化 即使 RabbitMQ 服务器重启,消息也不会丢失。 生产者端配置 在生产者端 使用 `delivery_mode=2` 来标记消息为持久化 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private final static String QUEUE_NAME = "durable_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列为持久化队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "Hello World!"; // 发送持久化消息 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } 队列持久化 确保队列在声明时被标记为持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 交换机持久化 如果使用交换机,确保交换机也被声明为持久化。 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); 消费者确认机制 使用消费者确认机制(acknowledgments)来确保消息被正确处理 消费者在处理完消息后发送确认信号给RabbitMQ RabbitMQ收到确认后才会删除消息 消费者端配置 在消费者端 启用手动确认模式 import com.rabbitmq.client.*; public class Consumer { private final static String QUEUE_NAME = "durable_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); // 处理消息 // 发送确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 启用手动确认模式 boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } } 高可用性配置 配置RabbitMQ高可用性(HA)集群 以确保在某个节点故障时,消息不会丢失 配置HA集群 1.启用HA插件 rabbitmq-plugins enable rabbitmq_ha_policy 2.设置HA策略: rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' 监控和日志 定期监控RabbitMQ状态和日志 及时发现和解决问题。 查看RabbitMQ状态 rabbitmqctl status 查看RabbitMQ日志 RabbitMQ日志通常位于/var/log/rabbitmq/目录下 使用以上步骤 可显著提高RabbitMQ消息可靠性 减少消息丢失风险例:
生产者
消息确定机制 import com.rabbitmq.client.*; public class PersistentProducer { private final static String QUEUE_NAME = "persistent_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明一个持久化队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 启用生产者确认 channel.confirmSelect(); String message = "Persistent message with producer confirm!"; channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 检查消息是否成功发送 if (channel.waitForConfirms()) { System.out.println("Message sent successfully!"); } else { System.out.println("Message failed to send!"); } } } }
channel.queueDeclare(QUEUE_NAME, true, false, false, null):队列是持久化的,确保 RabbitMQ 重启后队列不会丢失。 MessageProperties.PERSISTENT_TEXT_PLAIN:确保消息持久化存储。 channel.confirmSelect():启用生产者确认,确保消息成功送达 RabbitMQ。 channel.waitForConfirms():等待确认,如果生产者发送消息时发生失败,会捕获错误。
交换机
选择合适交换机类型: 常见的交换机类型包括direct、fanout、topic 和 headers 选择正确类型可确保消息路由正确 使用死信队列(Dead Letter Exchange, DLX): 当消息因某些原因无法被消费 可将消息转发到死信队列进行进一步处理
import com.rabbitmq.client.*; public class DirectExchangeProducer { private final static String EXCHANGE_NAME = "direct_logs"; private final static String QUEUE_NAME = "persistent_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明交换机和队列 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); String message = "Hello, Direct Exchange!"; channel.basicPublish(EXCHANGE_NAME, "info", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("Sent: " + message); } } } public class DeadLetterConsumer { private final static String DLX_QUEUE = "dlx_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明死信队列 channel.queueDeclare(DLX_QUEUE, true, false, false, null); // 创建消费者回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Dead Letter Queue Received: " + message); }; // 设置死信队列消费者 channel.basicConsume(DLX_QUEUE, true, deliverCallback, consumerTag -> {}); } } }
import com.rabbitmq.client.*; public class AckConsumer { private final static String QUEUE_NAME = "persistent_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明一个持久化队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 创建一个消费者回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Received: " + message); try { // 模拟消息处理 if (message.contains("error")) { throw new Exception("Error while processing message"); } // 手动确认消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); System.out.println("Message processed and acknowledged"); } catch (Exception e) { // 如果消息处理失败,可以将消息重新放回队列 System.out.println("Error processing message, requeueing: " + e.getMessage()); channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); } }; // 设置手动确认 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); } } }
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) 消息处理成功后确认消息 channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true) 当消费失败 重新将消息投递到队列中 供其他消费者处理
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。