1.消息分组:在yml文件中配置接收方组
消息分区:开启stream分区支持、发送方消息分区
# 消息驱动的配置
stream:
# SpringCloud Stream + Kafka
kafka:
binder:
brokers: 127.0.0.1:9092
auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好
# SpringCloud Stream + RocketMQ
# rocketmq:
# binder:
# name-server: 127.0.0.1:9876
# 开启 stream 分区支持
instanceCount: 1 # 消费者的总数
instanceIndex: 0 # 当前消费者的索引
bindings:
# 默认发送方
output: # 这里用 Stream 给我们提供的默认 output 信道
destination: ecommerce-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
# 消息分区
producer:
# partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性
partitionCount: 1 # 分区大小
# 使用自定义的分区策略, 注释掉 partitionKeyExpression
partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
partitionSelectorName: qinyiPartitionSelectorStrategy
# 默认接收方
input: # 这里用 Stream 给我们提供的默认 input 信道
destination: ecommerce-stream-client-default
group: e-commerce-qinyi-default
# 消费者开启分区支持
consumer:
partitioned: true
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* <h1>自定义从 Message 中提取 partition key 的策略</h1>
* */
@Slf4j
@Component
public class QinyiPartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
QinyiMessage qinyiMessage = JSON.parseObject(
message.getPayload().toString(), QinyiMessage.class
);
// 自定义提取 key
String key = qinyiMessage.getProjectName();
log.info("SpringCloud Stream Qinyi Partition Key: [{}]", key);
return key;
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.stereotype.Component;
/**
* <h1>决定 message 发送到哪个分区的策略</h1>
* */
@Slf4j
@Component
public class QinyiPartitionSelectorStrategy implements PartitionSelectorStrategy {
/**
* <h2>选择分区的策略</h2>
* */
@Override
public int selectPartition(Object key, int partitionCount) {
int partition = key.toString().hashCode() % partitionCount;
log.info("SpringCloud Stream Qinyi Selector info: [{}], [{}], [{}]",
key.toString(), partitionCount, partition);
return partition;
}
}