RabbitMQ 是一个流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在 RabbitMQ 中,有多种消息交换模式(Exchange Types),其中主题模式(Topics Mode)是一种非常灵活且强大的路由机制。
什么是主题模式?
主题模式允许你根据消息的路由键(Routing Key)将消息路由到一个或多个队列。路由键和绑定键(Binding Key)都是字符串,并且它们使用特定的通配符来匹配:
- *(星号):匹配一个单词。
- #(井号):匹配零个或多个单词。
主题模式的工作原理
- 生产者发送消息到一个交换机(Exchange),并指定一个路由键。
- 交换机根据路由键和绑定键的规则,将消息路由到一个或多个队列。
- 消费者从队列中接收消息。
示例场景
假设你有一个日志系统,需要根据不同的日志级别和来源路由日志消息。你可以使用主题模式来实现这一点。
你可以创建以下路由键:
error.auth
warning.payment
info.notification
Java 代码示例
以下是一个使用 RabbitMQ Java 客户端的示例,展示如何在主题模式下发布和订阅消息。
Maven 依赖
首先,你需要在你的 pom.xml 文件中添加 RabbitMQ Java 客户端的依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicProducer {
private final static String EXCHANGE_NAME = "logs_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "error.auth"; // 你可以改变这个路由键来测试不同的路由
String message = "Error log from authentication system";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
RabbitMQ 是一个流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在 RabbitMQ 中,有多种消息交换模式(Exchange Types),其中主题模式(Topics Mode)是一种非常灵活且强大的路由机制。
什么是主题模式?
主题模式允许你根据消息的路由键(Routing Key)将消息路由到一个或多个队列。路由键和绑定键(Binding Key)都是字符串,并且它们使用特定的通配符来匹配:
- *(星号):匹配一个单词。
- #(井号):匹配零个或多个单词。
主题模式的工作原理
- 生产者发送消息到一个交换机(Exchange),并指定一个路由键。
- 交换机根据路由键和绑定键的规则,将消息路由到一个或多个队列。
- 消费者从队列中接收消息。
示例场景
假设你有一个日志系统,需要根据不同的日志级别和来源路由日志消息。你可以使用主题模式来实现这一点。
你可以创建以下路由键:
error.auth
warning.payment
info.notification
Java 代码示例
以下是一个使用 RabbitMQ Java 客户端的示例,展示如何在主题模式下发布和订阅消息。
Maven 依赖
首先,你需要在你的 pom.xml 文件中添加 RabbitMQ Java 客户端的依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicProducer {
private final static String EXCHANGE_NAME = "logs_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "error.auth"; // 你可以改变这个路由键来测试不同的路由
String message = "Error log from authentication system";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumer {
private final static String EXCHANGE_NAME = "logs_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到交换机,使用不同的路由键
channel.queueBind(queueName, EXCHANGE_NAME, "*.auth");
channel.queueBind(queueName, EXCHANGE_NAME, "*.payment");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
CancelCallback cancelCallback = consumerTag -> { };
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
运行步骤
- 启动 RabbitMQ 服务器。
- 运行 TopicProducer 类,发送一些消息。
- 运行 TopicConsumer 类,监听并打印接收到的消息。
测试
你可以修改 TopicProducer 中的路由键,比如改为 warning.payment 或 info.notification,并观察 TopicConsumer 是否能正确接收到这些消息。
总结
主题模式提供了一种非常灵活的消息路由机制,适用于需要根据复杂规则将消息路由到多个队列的场景。通过合理使用通配符 * 和 #,你可以轻松实现复杂的路由逻辑。希望这个示例能帮助你更好地理解 RabbitMQ 的主题模式。