Administrator
发布于 2024-01-24 / 3 阅读
0

SpringBoot 集成RocketMQ

1.从官网下载 release 版本的 zip 包并安装

2.引入pom依赖

<!-- RocketMQ Spring Boot starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

3.RocketMQ 配置管理

# RocketMQ configuration; this is the minimum required setup
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # Producers sending the same kind of message use the same group; keep the group name unique
    group: imooc-study-ecommerce

4.RocketMQ 发送消息


import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * <h1>Sends messages through RocketMQ</h1>
 * Uses the Spring Messaging module ({@link Message} / {@link MessageBuilder})
 * together with {@link RocketMQTemplate}.
 */
@Slf4j
@Component
public class RocketMQProducer {

    /** Similar to a topic in Kafka; by default it has 4 read queues and 4 write queues. */
    private static final String TOPIC = "imooc-study-rocketmq";

    /** RocketMQ client used for every send operation in this class. */
    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the producer.
     *
     * @param rocketMQTemplate the RocketMQ client to send with
     */
    public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * <h2>Sends a message synchronously, without a key or a tag</h2>
     * The value is sent twice: once with {@code syncSend} and once with
     * {@code syncSendOrderly}; both results are logged as JSON.
     *
     * @param value the message payload
     */
    public void sendMessageWithValue(String value) {

        // A message queue of the topic is picked at random for this send
        SendResult plainResult = rocketMQTemplate.syncSend(TOPIC, value);
        String plainJson = JSON.toJSONString(plainResult);
        log.info("sendMessageWithValue result: [{}]", plainJson);

        // NOTE(review): "Qinyi" is presumably the hash key used for queue selection - confirm
        SendResult orderlyResult = rocketMQTemplate.syncSendOrderly(TOPIC, value, "Qinyi");
        String orderlyJson = JSON.toJSONString(orderlyResult);
        log.info("sendMessageWithValue orderly result: [{}]", orderlyJson);
    }

    /**
     * <h2>Sends a message asynchronously, with a key</h2>
     * The outcome is reported through a callback that only logs it.
     *
     * @param key   value stored in the {@code RocketMQHeaders.KEYS} header
     * @param value the message payload
     */
    public void sendMessageWithKey(String key, String value) {

        Message<String> keyed = buildKeyedMessage(key, value);

        // Callback invoked with the send outcome; it logs success or failure
        SendCallback loggingCallback = new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                String resultJson = JSON.toJSONString(sendResult);
                log.info("sendMessageWithKey success result: [{}]", resultJson);
            }

            @Override
            public void onException(Throwable e) {
                log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
            }
        };

        rocketMQTemplate.asyncSend(TOPIC, keyed, loggingCallback);
    }

    /**
     * <h2>Sends a message synchronously, with a tag; the payload is a Java Pojo</h2>
     *
     * @param tag   tag appended to the topic as {@code topic:tag}
     * @param value JSON text that is parsed into a {@link QinyiMessage} before sending
     */
    public void sendMessageWithTag(String tag, String value) {

        QinyiMessage payload = JSON.parseObject(value, QinyiMessage.class);
        String destination = destinationWithTag(tag);

        SendResult result = rocketMQTemplate.syncSend(destination, payload);
        log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(result));
    }

    /**
     * <h2>Sends a message synchronously, with both a key and a tag</h2>
     *
     * @param key   value stored in the {@code RocketMQHeaders.KEYS} header
     * @param tag   tag appended to the topic as {@code topic:tag}
     * @param value the message payload
     */
    public void sendMessageWithAll(String key, String tag, String value) {

        Message<String> keyed = buildKeyedMessage(key, value);
        String destination = destinationWithTag(tag);

        SendResult result = rocketMQTemplate.syncSend(destination, keyed);
        log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(result));
    }

    /** Builds the {@code topic:tag} destination string for the shared topic. */
    private static String destinationWithTag(String tag) {
        return String.format("%s:%s", TOPIC, tag);
    }

    /** Wraps the payload in a message whose {@code KEYS} header carries the given key. */
    private static Message<String> buildKeyedMessage(String key, String value) {
        return MessageBuilder.withPayload(value)
                .setHeader(RocketMQHeaders.KEYS, key)
                .build();
    }
}

5.RocketMQ 消费消息

import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * <h1>First RocketMQ consumer</h1>
 * Receives the message as a raw string; no tag selector is configured on this listener.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "qinyi-springboot-rocketmq-string",
        topic = "imooc-study-rocketmq"
)
public class RocketMQConsumerString implements RocketMQListener<String> {

    /**
     * Parses the received text into a {@link QinyiMessage} and logs it as JSON.
     *
     * @param message the received message body, expected to be JSON text
     */
    @Override
    public void onMessage(String message) {

        final QinyiMessage parsed = JSON.parseObject(message, QinyiMessage.class);
        final String parsedJson = JSON.toJSONString(parsed);
        log.info("consume message in RocketMQConsumerString: [{}]", parsedJson);
    }
}

指定消费带有 tag 的消息


import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * <h1>Second RocketMQ consumer, restricted to messages that carry a tag</h1>
 * Receives the message as a raw string.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "qinyi-springboot-rocketmq-tag-string",
        topic = "imooc-study-rocketmq",
        // Filter by tag
        selectorExpression = "qinyi"
)
public class RocketMQConsumerTagString implements RocketMQListener<String> {

    /**
     * Parses the received text into a {@link QinyiMessage} and logs it as JSON.
     *
     * @param message the received message body, expected to be JSON text
     */
    @Override
    public void onMessage(String message) {

        final QinyiMessage parsed = JSON.parseObject(message, QinyiMessage.class);
        final String parsedJson = JSON.toJSONString(parsed);
        log.info("consume message in RocketMQConsumerTagString: [{}]", parsedJson);
    }
}

消费带有 key 和 tag 的消息


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * <h1>Third RocketMQ consumer</h1>
 * Receives the full {@link MessageExt}, so the message keys can be read
 * alongside the body.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "imooc-study-rocketmq",
        consumerGroup = "qinyi-springboot-rocketmq-message-ext"
)
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {

    /**
     * Logs the keys and decoded body of the message, then the whole message as JSON.
     *
     * @param message the received message, including its metadata
     */
    @Override
    public void onMessage(MessageExt message) {

        // Decode explicitly as UTF-8 instead of relying on the platform default charset,
        // which would garble non-ASCII payloads on JVMs whose default is not UTF-8.
        // NOTE(review): assumes the producer encodes the body as UTF-8 - confirm if a
        // custom charset is configured on the sending side.
        String value = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
                message.getKeys(), value);
        log.info("MessageExt: [{}]", JSON.toJSONString(message));   // somewhat slower
    }
}

指定消费带有 tag 的消息, 且消息体为 Java Pojo


import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * <h1>Fourth RocketMQ consumer, restricted to messages that carry a tag; the payload is a Java Pojo</h1>
 */
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "qinyi-springboot-rocketmq-tag-object",
        topic = "imooc-study-rocketmq",
        // Filter by tag
        selectorExpression = "qinyi"
)
public class RocketMQConsumerObject implements RocketMQListener<QinyiMessage> {

    /**
     * Logs the received object as JSON.
     *
     * @param message the received message, already converted to a {@link QinyiMessage}
     */
    @Override
    public void onMessage(QinyiMessage message) {

        final String messageJson = JSON.toJSONString(message);
        log.info("consume message in RocketMQConsumerObject: [{}]", messageJson);
        // do something with the message here
    }
}

6.发送消息