Administrator
发布于 2024-01-24 / 1 阅读
0

SpringCloud Stream 消息分组与分区

1.消息分组:在yml文件中配置接收方组

消息分区:开启stream分区支持、发送方消息分区

# 消息驱动的配置
    stream:
      # SpringCloud Stream + Kafka
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          auto-create-topics: true  # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好
      # SpringCloud Stream + RocketMQ
      #      rocketmq:
      #        binder:
      #          name-server: 127.0.0.1:9876
      # 开启 stream 分区支持
      instanceCount: 1  # 消费者的总数
      instanceIndex: 0  # 当前消费者的索引
      bindings:
        # 默认发送方
        output:      # 这里用 Stream 给我们提供的默认 output 信道
          destination: ecommerce-stream-client-default    # 消息发往的目的地, Kafka 中就是 Topic
          content-type: text/plain    # 消息发送的格式, 接收端不用指定格式, 但是发送端要
          # 消息分区
          producer:
            # partitionKeyExpression: payload.author  # 分区关键字, payload 指的是发送的对象, author 是对象中的属性
            partitionCount: 1   # 分区大小
            # 使用自定义的分区策略, 注释掉 partitionKeyExpression
            partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
            partitionSelectorName: qinyiPartitionSelectorStrategy
        # 默认接收方
        input:      # 这里用 Stream 给我们提供的默认 input 信道
          destination: ecommerce-stream-client-default
          group: e-commerce-qinyi-default
          # 消费者开启分区支持
          consumer:
            partitioned: true

import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * <h1>自定义从 Message 中提取 partition key 的策略</h1>
 * */
@Slf4j
@Component
public class QinyiPartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {

        QinyiMessage qinyiMessage = JSON.parseObject(
                message.getPayload().toString(), QinyiMessage.class
        );
        // 自定义提取 key
        String key = qinyiMessage.getProjectName();
        log.info("SpringCloud Stream Qinyi Partition Key: [{}]", key);
        return key;
    }
}

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.stereotype.Component;

/**
 * <h1>决定 message 发送到哪个分区的策略</h1>
 * */
@Slf4j
@Component
public class QinyiPartitionSelectorStrategy implements PartitionSelectorStrategy {

    /**
     * <h2>选择分区的策略</h2>
     * */
    @Override
    public int selectPartition(Object key, int partitionCount) {

        int partition = key.toString().hashCode() % partitionCount;
        log.info("SpringCloud Stream Qinyi Selector info: [{}], [{}], [{}]",
                key.toString(), partitionCount, partition);

        return partition;
    }
}