RabbitMQ的发布/订阅模式(Publish/Subscribe Mode)详解

       RabbitMQ中的发布/订阅模式是一种常见的消息传递模式,用于将消息广播给多个消费者。在这种模式下,一个生产者将消息发送到一个交换机(Exchange),而交换机将消息广播给所有与之绑定的队列(Queue)。每个队列都有一个消费者来接收消息并进行处理。

发布/订阅模式的特点

  • 消息广播:消息被广播到所有与交换机绑定的队列,而不是直接发送到特定的队列。
  • 解耦合:发布者和订阅者之间通过交换机进行解耦,发布者无需知道消息将被传递到哪些队列。
  • 多播:支持多个消费者同时处理同一条消息,以实现消息的多播效果。
  • 灵活性:可以根据需要使用不同类型的交换机和绑定规则,以满足不同的消息传递需求。

​ 发布/订阅模式适用于需要将消息广播给多个消费者的场景,例如实时通知、日志记录、事件处理等。

交换机类型

​ 在发布/订阅模式中,常用的交换机类型是Fanout。Fanout类型的交换机特别简单,把所有接收到的消息广播到所有它所知道的队列,类似于子网广播,每台子网内的主机都获得了一份复制的消息。

Java代码示例

​ 以下是一个使用Java代码实现RabbitMQ发布/订阅模式的示例:

import com.rabbitmq.client.*;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class RabbitMQPublishSubscribeExample {  
  
    private final static String EXCHANGE_NAME = "my_exchange";  
  
    public static void main(String[] argv) throws Exception {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
  
        // 创建连接  
        try (Connection connection = factory.newConnection();  
             Channel channel = connection.createChannel()) {  
  
            // 声明交换机  
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  
            // 创建队列并绑定到交换机  
            String queueName1 = channel.queueDeclare().getQueue();  
            channel.queueBind(queueName1, EXCHANGE_NAME, "");  
  
            String queueName2 = channel.queueDeclare().getQueue();  
            channel.queueBind(queueName2, EXCHANGE_NAME, "");  
  
            // 发送消息  
            String message = "Hello, RabbitMQ!";  
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
  
            // 创建消费者1  
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {  
                String receivedMessage = new String(delivery.getBody(), "UTF-8");  
                System.out.println("Received message from queue 1: " + receivedMessage);  
            };  
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });  
  
            // 创建消费者2  
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {  
                String receivedMessage = new String(delivery.getBody(), "UTF-8");  
                System.out.println("Received message from queue 2: " + receivedMessage);  
            };  
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });  
        }  
    }  
}






次阅读

扫描下方二维码,关注公众号:程序进阶之路,实时获取更多优质文章推送。


扫码关注

评论