springcloud学习笔记(八):通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费

本文主要介绍如何在springcloud中通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费。本例使用的springcloud版本为:2021.0.3,springboot版本为:2.6.8。

本例通过创建一个生产者项目provide-stream-8011和两个消费者项目consumer-stream-8012、consumer-stream-8013来进行演示。

1、生产者provider项目配置

1.1 创建provider-stream-8011项目

​ 打开idea新建项目,选择maven,创建springboot项目provider-stream-8011。

image-20240830172032249

1.2、pom文件配置

​ 在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
 
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
 
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
 
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

1.3、application.yml文件配置

​ 在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:

server:
  port: 8011
 
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
    fetch-registry: true
  instance:
    instance-id: provider-stream-${server.port}
    prefer-ip-address: true
 
spring:
  application:
    name: provider-stream
 
  #消息中间件配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: spring
    password: 123456
  cloud:
    stream:
      bindings:
        output-out-0:            #通道名称
          destination: message   #exchange

1.4、主应用类配置

​ 在项目src/main/java下创建主应用类ProviderStreamApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication。

@EnableEurekaClient
@SpringBootApplication
public class ProviderStreamApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProviderStreamApplication.class, args);
    }
}

1.5 service层配置

​ 创建SendMessageService类实现生产者发送消息功能。streamBridge为消息队列的桥接对象,send方法的第一个参数是通道的名称,第二个参数为发送的数据。

@Service
@Slf4j
public class SendMessageService {
    @Resource
    private StreamBridge streamBridge;
 
    // 发送消息
    public void send(){
        String data = UUID.randomUUID().toString();
        log.info("发送的消息: {}", data);
        streamBridge.send("output-out-0", data);
    }
}

1.6 controller层配置

​ 创建SendMessageController类实现web的访问,并通过调用SendMessageService的send方法进行消息发送。

@RestController
@Slf4j
public class SendMessageController {
    @Resource
    private SendMessageService sendMessageService;
 
    @GetMapping("/send")
    public void send(){
        sendMessageService.send();
    }
}

2、消费者consumer项目配置

2.1 创建consumer-stream-8012项目







扫描下方二维码,关注公众号:程序进阶之路,实时获取更多优质文章推送。


扫码关注

评论