Administrator
发布于 2024-01-24 / 5 阅读
0

SpringCloud Stream(消息驱动组件)

Sream能屏蔽底层的中间件,只需关注stream

1.引入依赖

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>
        <!-- SpringCloud Stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <!-- SpringCloud Stream + Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <!-- SpringCloud Stream + RocketMQ -->
        <!--        <dependency>-->
        <!--            <groupId>com.alibaba.cloud</groupId>-->
        <!--            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>-->
        <!--        </dependency>-->

2.配置bootstrap文件

 # 消息驱动的配置
spring:
    stream:
      # SpringCloud Stream + Kafka
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          auto-create-topics: true  # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好
      # SpringCloud Stream + RocketMQ
      #      rocketmq:
      #        binder:
      #          name-server: 127.0.0.1:9876
      # 开启 stream 分区支持
      instanceCount: 1  # 消费者的总数
      instanceIndex: 0  # 当前消费者的索引
      bindings:
        # 默认发送方
        output:      # 这里用 Stream 给我们提供的默认 output 信道
          destination: ecommerce-stream-client-default    # 消息发往的目的地, Kafka 中就是 Topic
          content-type: text/plain    # 消息发送的格式, 接收端不用指定格式, 但是发送端要
          # 消息分区
          producer:
            # partitionKeyExpression: payload.author  # 分区关键字, payload 指的是发送的对象, author 是对象中的属性
            partitionCount: 1   # 分区大小
            # 使用自定义的分区策略, 注释掉 partitionKeyExpression
            partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
            partitionSelectorName: qinyiPartitionSelectorStrategy
        # 默认接收方
        input:      # 这里用 Stream 给我们提供的默认 input 信道
          destination: ecommerce-stream-client-default
          group: e-commerce-qinyi-default
          # 消费者开启分区支持
          consumer:
            partitioned: true

        # Qinyi 发送方
        qinyiOutput:
          destination: ecommerce-stream-client-qinyi
          content-type: text/plain
        # Qinyi 接收方
        qinyiInput:
          destination: ecommerce-stream-client-qinyi
          group: e-commerce-qinyi-qinyi

3.通信信道

(1)默认发送信息
```language

import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * <h1>使用默认的通信信道实现消息的发送</h1>
 * */
@Slf4j
@EnableBinding(Source.class)
public class DefaultSendService {

    private final Source source;

    public DefaultSendService(Source source) {
        this.source = source;
    }

    /**
     * <h2>使用默认的输出信道发送消息</h2>
     * */
    public void sendMessage(QinyiMessage message) {

        String _message = JSON.toJSONString(message);
        log.info("in DefaultSendService send message: [{}]", _message);

        // Spring Messaging, 统一消息的编程模型, 是 Stream 组件的重要组成部分之一
        source.output().send(MessageBuilder.withPayload(_message).build());
    }
}

默认接受消息


import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

/**
 * <h1>使用默认的信道实现消息的接收</h1>
 * */
@Slf4j
@EnableBinding(Sink.class)
public class DefaultReceiveService {

    /**
     * <h2>使用默认的输入信道接收消息</h2>
     * */
    @StreamListener(Sink.INPUT)
    public void receiveMessage(Object payload) {

        log.info("in DefaultReceiveService consume message start");
        QinyiMessage qinyiMessage = JSON.parseObject(
                payload.toString(), QinyiMessage.class
        );
        // 消费消息
        log.info("in DefaultReceiveService consume message success: [{}]",
                JSON.toJSONString(qinyiMessage));
    }
}

(2)自定义信道发送消息


import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * <h1>自定义输出信道</h1>
 * */
public interface QinyiSource {

    String OUTPUT = "qinyiOutput";

    /** 输出信道的名称是 qinyiOutput, 需要使用 Stream 绑定器在 yml 文件中声明 */
    @Output(QinyiSource.OUTPUT)
    MessageChannel qinyiOutput();
}

import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

/**
 * <h1>使用自定义的通信信道 QinyiSource 实现消息的发送</h1>
 * */
@Slf4j
@EnableBinding(QinyiSource.class)
public class QinyiSendService {

    private final QinyiSource qinyiSource;

    public QinyiSendService(QinyiSource qinyiSource) {
        this.qinyiSource = qinyiSource;
    }

    /**
     * <h2>使用自定义的输出信道发送消息</h2>
     * */
    public void sendMessage(QinyiMessage message) {

        String _message = JSON.toJSONString(message);
        log.info("in QinyiSendService send message: [{}]", _message);
        qinyiSource.qinyiOutput().send(
                MessageBuilder.withPayload(_message).build()
        );
    }
}

自定义信道接收消息


import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * <h1>自定义输入信道</h1>
 * */
public interface QinyiSink {

    String INPUT = "qinyiInput";

    /** 输入信道的名称是 qinyiInput, 需要使用 Stream 绑定器在 yml 文件中配置*/
    @Input(QinyiSink.INPUT)
    SubscribableChannel qinyiInput();
}

import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;

/**
 * <h1>使用自定义的输入信道实现消息的接收</h1>
 * */
@Slf4j
@EnableBinding(QinyiSink.class)
public class QinyiReceiveService {

    /** 使用自定义的输入信道接收消息 */
    @StreamListener(QinyiSink.INPUT)
    public void receiveMessage(@Payload Object payload) {

        log.info("in QinyiReceiveService consume message start");
        QinyiMessage qinyiMessage = JSON.parseObject(payload.toString(), QinyiMessage.class);
        log.info("in QinyiReceiveService consume message success: [{}]",
                JSON.toJSONString(qinyiMessage));
    }
}

4.构建消息驱动

@Slf4j
@RestController
@RequestMapping("/message")
public class MessageController {

    private final DefaultSendService defaultSendService;
    private final QinyiSendService qinyiSendService;

    public MessageController(DefaultSendService defaultSendService,
                             QinyiSendService qinyiSendService) {
        this.defaultSendService = defaultSendService;
        this.qinyiSendService = qinyiSendService;
    }

    /**
     * <h2>默认信道</h2>
     * */
    @GetMapping("/default")
    public void defaultSend() {
        defaultSendService.sendMessage(QinyiMessage.defaultMessage());
    }

    /**
     * <h2>自定义信道</h2>
     * */
    @GetMapping("/qinyi")
    public void qinyiSend() {
        qinyiSendService.sendMessage(QinyiMessage.defaultMessage());
    }
}