RabbitMQ 确认模式(Acknowledgements Mode)详解

       RabbitMQ 是一款流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在消息传递过程中,确保消息被正确处理是至关重要的。RabbitMQ 提供了多种机制来确保消息的可靠性,其中确认模式(Acknowledgements Mode)是一个关键特性。

什么是确认模式?

​ 确认模式(Acknowledgements Mode)允许消费者在成功处理消息后显式地向 RabbitMQ 服务器发送确认信号(ack)。只有在收到确认信号后,RabbitMQ 服务器才会从队列中删除该消息。如果消费者未能发送确认信号(例如,由于消费者崩溃或网络故障),RabbitMQ 会认为消息尚未被处理,并在适当的时候重新发送消息。

RabbitMQ 提供了三种主要的确认模式:

  1. 手动确认(Manual Acknowledgement):消费者需要显式地发送确认信号。
  2. 自动确认(Automatic Acknowledgement):消息一旦被消费者接收,立即自动确认。
  3. 批量确认(Batch Acknowledgement):消费者可以一次确认多条消息。

为什么使用确认模式?

  • 确保消息不丢失:即使消费者崩溃,消息也会重新发送。
  • 提高可靠性:通过控制确认时机,可以更好地管理消息处理流程。
  • 灵活性:可以根据不同的业务需求选择不同的确认模式。

Java 代码示例

​ 下面是一个使用 Java 和 Spring AMQP 实现 RabbitMQ 确认模式的示例。

Maven 依赖

​ 首先,在你的 pom.xml 文件中添加 Spring AMQP 依赖:

<dependencies>  
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-amqp</artifactId>  
    </dependency>  
    <!-- 其他依赖 -->  
</dependencies>

配置 RabbitMQ

​ 在 application.properties 文件中配置 RabbitMQ 连接信息:

spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=guest  
spring.rabbitmq.password=guest

配置类

​ 配置一个队列、交换机和绑定:

import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
@Configuration  
public class RabbitMQConfig {  
  
    public static final String QUEUE_NAME = "exampleQueue";  
    public static final String EXCHANGE_NAME = "exampleExchange";  
    public static final String ROUTING_KEY = "exampleRoutingKey";  
  
    @Bean  
    public Queue queue() {  
        return new Queue(QUEUE_NAME, true); // 持久化队列  
    }  
  
    @Bean  
    public DirectExchange exchange() {  
        return new DirectExchange(EXCHANGE_NAME);  
    }  
  
    @Bean  
    public Binding binding(Queue queue, DirectExchange exchange) {  
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);  
    }  
}

消息监听器

​ 使用手动确认模式来监听队列:

import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener.ReturnCallback;  
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;  
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerContainerFactory;  
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerEndpoint;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;  
import org.springframework.amqp.support.converter.MessageConverter;  
import org.springframework.amqp.support.converter.SimpleMessageConverter;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
import com.rabbitmq.client.Channel;  
  
@Configuration  
public class ListenerConfig {  
  
    @Autowired  
    private ConnectionFactory connectionFactory;  
  
    @Bean  
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {  
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();  
        factory.setConnectionFactory(connectionFactory);  
        factory.setMessageConverter(jsonMessageConverter());  
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动确认模式  
        return factory;  
    }  
  
    @Bean  
    public MessageConverter jsonMessageConverter() {  
        return new SimpleMessageConverter();  
    }  
  
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, containerFactory = "rabbitListenerContainerFactory")  
    public void listen(String message, Channel channel, org.springframework.amqp.core.Message rabbitMessage) throws Exception {  
        try {  
            // 处理消息  
            System.out.println("Received <" + message + ">");  
              
            // 发送确认信号  
            channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false);  
        } catch (Exception e) {  
            // 发送拒绝信号,并设置为重新入队(requeue)  
            channel.basicNack(rabbitMessage.getMessageProperties().getDeliveryTag(), false, true);  
            throw e;  
        }  
    }  
}






次阅读

扫描下方二维码,关注公众号:程序进阶之路,实时获取更多优质文章推送。


扫码关注

评论