RabbitMQ的工作队列模式(Work Queues Mode)

RabbitMQ的工作队列模式是一种消息处理模型,主要用于解决耗时任务的异步执行和负载均衡问题。该模式由生产者(Producer)和多个消费者(Consumer)组成,生产者将任务以消息的形式发送到特定的工作队列中,而多个消费者则并发地从队列中获取并处理这些任务。

  1. 生产者:将任务封装成消息,并通过AMQP协议将消息发布到指定的工作队列。
  2. 工作队列:队列中的消息代表待处理的任务。默认情况下,RabbitMQ会尽力按照“轮询”方式公平地分配消息给所有在线的消费者,确保每个消费者都有机会获得消息进行处理。
  3. 消费者:多个消费者可以同时监听同一个队列,每个消费者都可以尝试从队列中取出一条未被其他消费者获取过的消息进行处理。消费者接收到消息后开始处理任务,并在完成任务后向RabbitMQ发送一个确认信号(acknowledgement)。只有收到确认,RabbitMQ才会将消息从队列中移除。

工作队列模式具备以下特性:

  • 消息持久化:为了防止消息在RabbitMQ服务器重启时丢失,可以设置消息的持久化属性,确保即使在服务器宕机的情况下,重要的任务消息仍能保存下来并在服务器恢复后继续投递给消费者。
  • 负载均衡:通过多个消费者并发处理队列中的消息,实现任务的分布式处理和负载均衡。
  • 消息重试:如果消费者在处理过程中失败或断开连接而没有确认消息,RabbitMQ会在消费者重新连接后重新投递该消息,保证任务至少会被执行一次。

Java代码示例

​ 以下是一个使用Java和RabbitMQ Java客户端库实现的简单工作队列模式示例。

生产者代码

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class ProducerWorkQueues {  
    public static void main(String[] args) throws IOException, TimeoutException {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
        // 设置连接参数  
        factory.setHost("localhost"); // 设置RabbitMQ服务器地址  
        factory.setPort(5672); // 设置RabbitMQ服务器端口  
        factory.setVirtualHost("/"); // 设置连接的虚拟机  
        factory.setUsername("guest"); // 设置用户名  
        factory.setPassword("guest"); // 设置密码  
  
        // 创建连接  
        Connection connection = factory.newConnection();  
        // 创建Channel  
        Channel channel = connection.createChannel();  
  
        // 声明队列  
        channel.queueDeclare("work_queues", true, false, false, null);  
  
        // 发送消息  
        for (int i = 0; i <= 10; i++) {  
            String body = i + "Hello RabbitMQ! Nice to meet you!";  
            channel.basicPublish("", "work_queues", null, body.getBytes());  
        }  
  
        // 释放资源  
        channel.close();  
        connection.close();  
    }  
}

消费者代码

import com.rabbitmq.client.*;  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class ConsumerWorkQueues {  
    public static void main(String[] args) throws IOException, TimeoutException {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
        // 设置连接参数  
        factory.setHost("localhost"); // 设置RabbitMQ服务器地址  
        factory.setPort(5672); // 设置RabbitMQ服务器端口  
        factory.setVirtualHost("/"); // 设置连接的虚拟机  
        factory.setUsername("guest"); // 设置用户名  
        factory.setPassword("guest"); // 设置密码  
  
        // 创建连接  
        Connection connection = factory.newConnection();  
        // 创建Channel  
        Channel channel = connection.createChannel();  
  
        // 声明队列  
        channel.queueDeclare("work_queues", true, false, false, null);  
  
        // 创建消费者  
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            String message = new String(delivery.getBody(), "UTF-8");  
            System.out.println("Received: '" + message + "'");  
              
            // 处理消息(模拟耗时任务)  
            try {  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
  
            // 确认消息处理完成  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
        };  
  
        // 开始消费消息  
        channel.basicConsume("work_queues", false, deliverCallback, consumerTag -> { });  
    }  
}






次阅读

扫描下方二维码,关注公众号:程序进阶之路,实时获取更多优质文章推送。


扫码关注

评论