Spring Cloud Stream 能屏蔽底层消息中间件的差异, 开发时只需关注 Stream 本身
1.引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
<!-- SpringCloud Stream -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- SpringCloud Stream + Kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<!-- SpringCloud Stream + RocketMQ -->
<!-- <dependency> -->
<!--     <groupId>com.alibaba.cloud</groupId> -->
<!--     <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> -->
<!-- </dependency> -->
2.配置bootstrap文件
# Message-driven (Spring Cloud Stream) configuration.
# Spring Cloud Stream reads its settings from the spring.cloud.stream.* prefix,
# so everything below is nested under spring -> cloud -> stream.
spring:
  cloud:
    stream:
      # SpringCloud Stream + Kafka
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          auto-create-topics: true # when false, topics are not created automatically and must be created by hand before use
      # SpringCloud Stream + RocketMQ
      # rocketmq:
      #   binder:
      #     name-server: 127.0.0.1:9876
      # Enable stream partitioning support
      instanceCount: 1 # total number of consumers
      instanceIndex: 0 # index of the current consumer
      bindings:
        # Default sender
        output: # the default output channel provided by Stream
          destination: ecommerce-stream-client-default # where messages are sent; in Kafka this is the topic
          content-type: text/plain # message format; the sender must declare it, the receiver does not need to
          # Message partitioning
          producer:
            # partitionKeyExpression: payload.author # partition key; payload is the sent object, author is a property of it
            partitionCount: 1 # number of partitions
            # Use the custom partitioning strategy; partitionKeyExpression stays commented out
            partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
            partitionSelectorName: qinyiPartitionSelectorStrategy
        # Default receiver
        input: # the default input channel provided by Stream
          destination: ecommerce-stream-client-default
          group: e-commerce-qinyi-default
          # Enable partitioning support on the consumer side
          consumer:
            partitioned: true
        # Qinyi sender
        qinyiOutput:
          destination: ecommerce-stream-client-qinyi
          content-type: text/plain
        # Qinyi receiver
        qinyiInput:
          destination: ecommerce-stream-client-qinyi
          group: e-commerce-qinyi-qinyi
3.通信信道
(1)默认信道发送消息
```java
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
/**
 * <h1>Sends messages over the default communication channel</h1>
 */
@Slf4j
@EnableBinding(Source.class)
public class DefaultSendService {

    /** Binding that exposes the default output channel. */
    private final Source source;

    public DefaultSendService(Source source) {
        this.source = source;
    }

    /**
     * <h2>Sends a message through the default output channel</h2>
     * The message is serialized to JSON, logged, and then published.
     *
     * @param message the message to serialize and send
     */
    public void sendMessage(QinyiMessage message) {
        final String json = JSON.toJSONString(message);
        log.info("in DefaultSendService send message: [{}]", json);
        publish(json);
    }

    /**
     * Wraps the payload in a Spring Messaging message (the unified messaging
     * programming model used by Stream) and sends it on the output channel.
     */
    private void publish(String payload) {
        source.output().send(MessageBuilder.withPayload(payload).build());
    }
}
默认接受消息
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
 * <h1>Receives messages over the default channel</h1>
 */
@Slf4j
@EnableBinding(Sink.class)
public class DefaultReceiveService {

    /**
     * <h2>Receives a message from the default input channel</h2>
     * The payload's string form is parsed as JSON into a {@link QinyiMessage},
     * which is then logged.
     *
     * @param payload the incoming message payload
     */
    @StreamListener(Sink.INPUT)
    public void receiveMessage(Object payload) {
        log.info("in DefaultReceiveService consume message start");

        final String rawJson = payload.toString();
        final QinyiMessage received = JSON.parseObject(rawJson, QinyiMessage.class);

        // Consume the message (here: log it back out as JSON)
        final String rendered = JSON.toJSONString(received);
        log.info("in DefaultReceiveService consume message success: [{}]", rendered);
    }
}
(2)自定义信道发送消息
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * <h1>Custom output channel</h1>
 */
public interface QinyiSource {

    /** Name of the custom output channel. */
    String OUTPUT = "qinyiOutput";

    /**
     * The output channel is named qinyiOutput; it has to be declared for the
     * Stream binder in the yml file.
     *
     * @return the channel used to send messages
     */
    @Output(OUTPUT)
    MessageChannel qinyiOutput();
}
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
/**
 * <h1>Sends messages over the custom communication channel QinyiSource</h1>
 */
@Slf4j
@EnableBinding(QinyiSource.class)
public class QinyiSendService {

    /** Binding that exposes the custom output channel. */
    private final QinyiSource qinyiSource;

    public QinyiSendService(QinyiSource qinyiSource) {
        this.qinyiSource = qinyiSource;
    }

    /**
     * <h2>Sends a message through the custom output channel</h2>
     * The message is serialized to JSON, logged, and then published.
     *
     * @param message the message to serialize and send
     */
    public void sendMessage(QinyiMessage message) {
        final String serialized = JSON.toJSONString(message);
        log.info("in QinyiSendService send message: [{}]", serialized);
        dispatch(serialized);
    }

    /** Builds a message around the payload and sends it on the qinyiOutput channel. */
    private void dispatch(String payload) {
        qinyiSource.qinyiOutput().send(MessageBuilder.withPayload(payload).build());
    }
}
自定义信道接收消息
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
 * <h1>Custom input channel</h1>
 */
public interface QinyiSink {

    /** Name of the custom input channel. */
    String INPUT = "qinyiInput";

    /**
     * The input channel is named qinyiInput; it has to be configured for the
     * Stream binder in the yml file.
     *
     * @return the channel messages are received from
     */
    @Input(INPUT)
    SubscribableChannel qinyiInput();
}
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
/**
 * <h1>Receives messages over the custom input channel</h1>
 */
@Slf4j
@EnableBinding(QinyiSink.class)
public class QinyiReceiveService {

    /**
     * Receives a message from the custom input channel, parses the payload's
     * string form as JSON into a {@link QinyiMessage}, and logs the result.
     *
     * @param payload the incoming message payload
     */
    @StreamListener(QinyiSink.INPUT)
    public void receiveMessage(@Payload Object payload) {
        log.info("in QinyiReceiveService consume message start");

        final String rawJson = payload.toString();
        final QinyiMessage received = JSON.parseObject(rawJson, QinyiMessage.class);

        final String rendered = JSON.toJSONString(received);
        log.info("in QinyiReceiveService consume message success: [{}]", rendered);
    }
}
4.构建消息驱动
/**
 * <h1>HTTP entry points that trigger message sending</h1>
 * Each endpoint under {@code /message} sends {@code QinyiMessage.defaultMessage()}
 * through one of the two send services.
 */
@Slf4j
@RestController
@RequestMapping("/message")
public class MessageController {

    /** Sender bound to the default channel. */
    private final DefaultSendService defaultSendService;

    /** Sender bound to the custom channel. */
    private final QinyiSendService qinyiSendService;

    public MessageController(DefaultSendService defaultSendService,
                             QinyiSendService qinyiSendService) {
        this.defaultSendService = defaultSendService;
        this.qinyiSendService = qinyiSendService;
    }

    /**
     * <h2>Default channel</h2>
     * Sends the default message via {@link DefaultSendService}.
     */
    @GetMapping("/default")
    public void defaultSend() {
        final QinyiMessage sample = QinyiMessage.defaultMessage();
        defaultSendService.sendMessage(sample);
    }

    /**
     * <h2>Custom channel</h2>
     * Sends the default message via {@link QinyiSendService}.
     */
    @GetMapping("/qinyi")
    public void qinyiSend() {
        final QinyiMessage sample = QinyiMessage.defaultMessage();
        qinyiSendService.sendMessage(sample);
    }
}