1.从官网下载 release 版本的 zip 包并安装
2.引入pom依赖
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
3.RocketMQ 配置管理
# RocketMQ 的配置, 这是最低配置
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # 发送同一类消息的设置为同一个 group, 保证唯一
    group: imooc-study-ecommerce
4.RocketMQ 发送消息
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* <h1>通过 RocketMQ 发送消息</h1>
* Spring Messaging 模块
* */
@Slf4j
@Component
public class RocketMQProducer {
/** 类似 Kafka 中的 topic, 默认的读写队列都是4个 */
private static final String TOPIC = "imooc-study-rocketmq";
/** RocketMQ 客户端 */
private final RocketMQTemplate rocketMQTemplate;
/**
 * Creates the producer with the RocketMQ template used by all send methods of this class.
 *
 * @param rocketMQTemplate template stored in this producer's {@code rocketMQTemplate} field
 */
public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
/**
 * Sends {@code value} synchronously to {@code TOPIC} twice, without key or tag:
 * first with a plain sync send, then with an orderly sync send that uses the
 * fixed hash key {@code "Qinyi"}. Both send results are logged as JSON.
 *
 * @param value message payload
 */
public void sendMessageWithValue(String value) {
    // Plain synchronous send; per the original author's note a message queue
    // of the topic is picked at random for this send.
    final SendResult plainResult = rocketMQTemplate.syncSend(TOPIC, value);
    final String plainResultJson = JSON.toJSONString(plainResult);
    log.info("sendMessageWithValue result: [{}]", plainResultJson);

    // Orderly synchronous send using a constant hash key.
    final String hashKey = "Qinyi";
    final SendResult orderedResult = rocketMQTemplate.syncSendOrderly(TOPIC, value, hashKey);
    final String orderedResultJson = JSON.toJSONString(orderedResult);
    log.info("sendMessageWithValue orderly result: [{}]", orderedResultJson);
}
/**
 * Sends {@code value} asynchronously to {@code TOPIC} with {@code key} set as the
 * {@code RocketMQHeaders.KEYS} header. The outcome is only logged by the callback.
 *
 * @param key   message key placed in the KEYS header
 * @param value message payload
 */
public void sendMessageWithKey(String key, String value) {
    final Message<String> keyedMessage = MessageBuilder
            .withPayload(value)
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();

    // Callback that logs the send result on success and the error on failure.
    final SendCallback loggingCallback = new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            final String resultJson = JSON.toJSONString(sendResult);
            log.info("sendMessageWithKey success result: [{}]", resultJson);
        }

        @Override
        public void onException(Throwable e) {
            log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
        }
    };

    // Asynchronous send; the callback above is passed to the template.
    rocketMQTemplate.asyncSend(TOPIC, keyedMessage, loggingCallback);
}
/**
 * Parses {@code value} into a {@code QinyiMessage} POJO and sends it synchronously
 * to the destination {@code TOPIC:tag}. The send result is logged as JSON.
 *
 * @param tag   tag appended to the topic in the destination string
 * @param value JSON text that is parsed into a {@code QinyiMessage}
 */
public void sendMessageWithTag(String tag, String value) {
    final QinyiMessage payload = JSON.parseObject(value, QinyiMessage.class);

    // Destination has the form "<topic>:<tag>".
    final String destination = TOPIC + ":" + tag;
    final SendResult outcome = rocketMQTemplate.syncSend(destination, payload);

    log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(outcome));
}
/**
 * Sends {@code value} synchronously to the destination {@code TOPIC:tag}, with
 * {@code key} set as the {@code RocketMQHeaders.KEYS} header. The send result is
 * logged as JSON.
 *
 * @param key   message key placed in the KEYS header
 * @param tag   tag appended to the topic in the destination string
 * @param value message payload
 */
public void sendMessageWithAll(String key, String tag, String value) {
    final Message<String> keyedMessage = MessageBuilder
            .withPayload(value)
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();

    // Destination has the form "<topic>:<tag>".
    final String destination = TOPIC + ":" + tag;
    final SendResult outcome = rocketMQTemplate.syncSend(destination, keyedMessage);

    log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(outcome));
}
}
5.RocketMQ 消费消息
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * <h1>First RocketMQ consumer</h1>
 * Listens on topic {@code imooc-study-rocketmq} in consumer group
 * {@code qinyi-springboot-rocketmq-string} and receives each message as a String.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "qinyi-springboot-rocketmq-string",
        topic = "imooc-study-rocketmq"
)
public class RocketMQConsumerString implements RocketMQListener<String> {

    /**
     * Parses the received text into a {@code QinyiMessage} and logs it as JSON.
     *
     * @param message raw message text, parsed as JSON
     */
    @Override
    public void onMessage(String message) {
        final QinyiMessage parsed = JSON.parseObject(message, QinyiMessage.class);
        final String parsedJson = JSON.toJSONString(parsed);
        log.info("consume message in RocketMQConsumerString: [{}]", parsedJson);
    }
}
指定消费带有 tag 的消息
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* <h1>第二个 RocketMQ 消费者, 指定了消费带有 tag 的消息</h1>
* */
@Slf4j
@Component
@RocketMQMessageListener(
topic = "imooc-study-rocketmq",
consumerGroup = "qinyi-springboot-rocketmq-tag-string",
selectorExpression = "qinyi" // 根据 tag 过滤
)
public class RocketMQConsumerTagString implements RocketMQListener<String> {
/**
 * Parses the received text into a {@code QinyiMessage} and logs it as JSON.
 *
 * @param message raw message text, parsed as JSON
 */
@Override
public void onMessage(String message) {
    final QinyiMessage parsed = JSON.parseObject(message, QinyiMessage.class);
    final String parsedJson = JSON.toJSONString(parsed);
    log.info("consume message in RocketMQConsumerTagString: [{}]", parsedJson);
}
}
消费带有 key 和 tag 的消息
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* <h1>第三个 RocketMQ 消费者, </h1>
* */
@Slf4j
@Component
@RocketMQMessageListener(
topic = "imooc-study-rocketmq",
consumerGroup = "qinyi-springboot-rocketmq-message-ext"
)
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {
/**
 * Logs the keys and body of the received message, then the whole
 * {@code MessageExt} serialized as JSON.
 *
 * @param message received RocketMQ message, including its metadata
 */
@Override
public void onMessage(MessageExt message) {
    // Decode with an explicit charset: new String(byte[]) without one uses the
    // platform default, which can corrupt non-ASCII payloads on hosts whose
    // default charset is not UTF-8.
    String value = new String(message.getBody(), StandardCharsets.UTF_8);
    log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
            message.getKeys(), value);
    // Serializing the full MessageExt is noted by the original author as slower.
    log.info("MessageExt: [{}]", JSON.toJSONString(message));
}
}
指定消费带有 tag 的消息, 且消费的是 Java Pojo
import com.alibaba.fastjson.JSON;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * <h1>Fourth RocketMQ consumer</h1>
 * Listens on topic {@code imooc-study-rocketmq} in consumer group
 * {@code qinyi-springboot-rocketmq-tag-object}, selects messages with the
 * expression {@code qinyi} (a tag filter), and receives each message as a
 * {@code QinyiMessage} POJO.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "qinyi-springboot-rocketmq-tag-object",
        topic = "imooc-study-rocketmq",
        selectorExpression = "qinyi" // filter by tag
)
public class RocketMQConsumerObject implements RocketMQListener<QinyiMessage> {

    /**
     * Logs the received POJO as JSON.
     *
     * @param message received message, already converted to {@code QinyiMessage}
     */
    @Override
    public void onMessage(QinyiMessage message) {
        final String messageJson = JSON.toJSONString(message);
        log.info("consume message in RocketMQConsumerObject: [{}]", messageJson);
        // do something
    }
}
6.发送消息