spring cloud stream升级到4.0版本后,消息收发的方式和以前有较大的差异,经过测试后,总结见下:
一、基本集成
application.yml内容见下:
spring:
  profiles:
    active: dev
  application:
    name: iios-keystone-broker-service
  #配置中心
  cloud:
    stream:
      rabbit:
        bindings:
          yypOutput-out-0:
            # 生产者配置信息
            producer:
              # 生产者使用的交换机类型   如果已存在交换机名称,该类型必须与交换机类型一致
              exchangeType: direct
              # 用于指定 routing key 表达式
              routing-key-expression: headers["routeTo"] # 该值表示使用头信息的routeTo字段作为 routing key
              queueNameGroupOnly: true
          yypInput-in-0:
            # 消费者配置信息
            consumer:
              # 消费者使用的交换机类型   如果已存在交换机名称,该类型必须与交换机类型一致
              exchangeType: direct
              # 消息确认模式   具体查看AcknowledgeMode
              acknowledge-mode: none
              #queueNameGroupOnly: true
      bindings:
        yypOutput-out-0: #通道的名称
          destination: yyp-test-001 #要使用的exchange名称
          content-type: application/json
          default-binder: iios_rabbit
        yypInput-in-0: #通道的名称
          destination: yyp-test-001 #要使用的exchange名称
          #content-type: application/json
          default-binder: iios_rabbit
          group: yyp-test-001-queue-001 # 要使用的消息队列名称
      binders:
        iios_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.1.1
                port: 
1312
                username: tiAAAAA
                password: gd6pFWQFEQEQFEW
                virtual-host: /
 发送消息的代码
    @GetMapping("/stream/{name}")
    public String sendMsg(@PathVariable String name) {
        String uuid = UUID.fastUUID().toString();
        streamBridge.send("rec-obj", JSONUtil.toJsonStr(new Person(name)));
        streamBridge.send("rec-str", uuid);
        streamBridge
                .send("yypOutput-out-0", MessageBuilder.withPayload("hello world!".getBytes(StandardCharsets.UTF_8))
                        .setHeader("routeTo", "routingkey-yyp-01").build());
        return uuid;
    }接收消息的代码
@Service
@Slf4j
public class RCom {
    @Bean
    public Consumer<Message<byte[]>> yypInput() throws IOException {
        return message -> {
            String routingKey = String.valueOf(message.getHeaders().get("amqp_receivedRoutingKey"));
            byte[] payload = message.getPayload();
            String str = new String(payload, StandardCharsets.UTF_8);
            log.info("yypInput Rev:"+str+"routingKey:"+routingKey);
        };
    }