本文主要介绍如何在springcloud中通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费。本例使用的springcloud版本为:2021.0.3,springboot版本为:2.6.8。
本例通过创建一个生产者项目provide-stream-8011和两个消费者项目consumer-stream-8012、consumer-stream-8013来进行演示。
1、生产者provider项目配置
1.1 创建provider-stream-8011项目
打开idea新建项目,选择maven,创建springboot项目provider-stream-8011。
1.2、pom文件配置
在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
1.3、application.yml文件配置
在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:
server:
port: 8011
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
fetch-registry: true
instance:
instance-id: provider-stream-${server.port}
prefer-ip-address: true
spring:
application:
name: provider-stream
#消息中间件配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: spring
password: 123456
cloud:
stream:
bindings:
output-out-0: #通道名称
destination: message #exchange
1.4、主应用类配置
在项目src/main/java下创建主应用类ProviderStreamApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication。
@EnableEurekaClient
@SpringBootApplication
public class ProviderStreamApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderStreamApplication.class, args);
}
}
1.5 service层配置
创建SendMessageService类实现生产者发送消息功能。streamBridge为消息队列的桥接对象,send方法的第一个参数是通道的名称,第二个参数为发送的数据。
@Service
@Slf4j
public class SendMessageService {
@Resource
private StreamBridge streamBridge;
// 发送消息
public void send(){
String data = UUID.randomUUID().toString();
log.info("发送的消息: {}", data);
streamBridge.send("output-out-0", data);
}
}
1.6 controller层配置
创建SendMessageController类实现web的访问,并通过调用SendMessageService的send方法进行消息发送。
@RestController
@Slf4j
public class SendMessageController {
@Resource
private SendMessageService sendMessageService;
@GetMapping("/send")
public void send(){
sendMessageService.send();
}
}
2、消费者consumer项目配置
2.1 创建consumer-stream-8012项目
本文主要介绍如何在springcloud中通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费。本例使用的springcloud版本为:2021.0.3,springboot版本为:2.6.8。
本例通过创建一个生产者项目provide-stream-8011和两个消费者项目consumer-stream-8012、consumer-stream-8013来进行演示。
1、生产者provider项目配置
1.1 创建provider-stream-8011项目
打开idea新建项目,选择maven,创建springboot项目provider-stream-8011。
1.2、pom文件配置
在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
1.3、application.yml文件配置
在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:
server:
port: 8011
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
fetch-registry: true
instance:
instance-id: provider-stream-${server.port}
prefer-ip-address: true
spring:
application:
name: provider-stream
#消息中间件配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: spring
password: 123456
cloud:
stream:
bindings:
output-out-0: #通道名称
destination: message #exchange
1.4、主应用类配置
在项目src/main/java下创建主应用类ProviderStreamApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication。
@EnableEurekaClient
@SpringBootApplication
public class ProviderStreamApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderStreamApplication.class, args);
}
}
1.5 service层配置
创建SendMessageService类实现生产者发送消息功能。streamBridge为消息队列的桥接对象,send方法的第一个参数是通道的名称,第二个参数为发送的数据。
@Service
@Slf4j
public class SendMessageService {
@Resource
private StreamBridge streamBridge;
// 发送消息
public void send(){
String data = UUID.randomUUID().toString();
log.info("发送的消息: {}", data);
streamBridge.send("output-out-0", data);
}
}
1.6 controller层配置
创建SendMessageController类实现web的访问,并通过调用SendMessageService的send方法进行消息发送。
@RestController
@Slf4j
public class SendMessageController {
@Resource
private SendMessageService sendMessageService;
@GetMapping("/send")
public void send(){
sendMessageService.send();
}
}
2、消费者consumer项目配置
2.1 创建consumer-stream-8012项目
打开idea新建项目,选择maven,创建springboot项目consumer-stream-8012。
2.2、pom文件配置
在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2.3、application.yml文件配置
在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:
server:
port: 8012
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
fetch-registry: true
instance:
instance-id: consumer-stream-${server.port}
prefer-ip-address: true
spring:
application:
name: consumer-stream
#消息中间件配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: spring
password: 123456
cloud:
stream:
bindings:
input-in-0: #通道名称
destination: message #exchange,需要与发送者的配置名称一致才能收到对应消息
group: consumer #分组,同一组的消息只会被一个消费者消费,避免重复消费。
2.4、主应用类配置
在项目src/main/java下创建主应用类 GatewayApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication。
@SpringBootApplication
@EnableEurekaClient
public class ConsumerStreamApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerStreamApplication.class, args);
}
}
2.5 消费类配置
在config文件夹下创建配置类StreamConfig,并编写消费类bean实现消息消费。
@Configuration
@Slf4j
public class StreamConfig {
@Bean
public Consumer<String> input() {
return (data) -> log.info("收到的消息:{}", data);
}
}
2.6 创建consumer-stream-8013项目
参考consumer-stream-8012项目进行创建即可。
3、测试验证
同时启动并运行项目eueka-server-7001、provider-stream-8011、consumer-stream-8012、consumer-stream-8013。通过访问http://127.0.0.1:8011/send发送消息,可以分别在provider-stream-8011、consumer-stream-8012的控制台看到收到的消息日志。