parent
02898b00e4
commit
08b87590cd
@ -0,0 +1,51 @@
|
||||
package org.example.controller;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.example.rocket.RocketMQProducer;
|
||||
import org.example.vo.QMessage;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* <h1>SpringBoot 集成 RocketMQ</h1>
|
||||
* */
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/rocket-mq")
|
||||
public class RocketMQController {
|
||||
|
||||
private static final QMessage RocketMQMessage = new QMessage(
|
||||
1,
|
||||
"Q-Study-RocketMQ-In-SpringBoot"
|
||||
);
|
||||
|
||||
private final RocketMQProducer rocketMQProducer;
|
||||
|
||||
public RocketMQController(RocketMQProducer rocketMQProducer) {
|
||||
this.rocketMQProducer = rocketMQProducer;
|
||||
}
|
||||
|
||||
@GetMapping("/message-with-value")
|
||||
public void sendMessageWithValue() {
|
||||
rocketMQProducer.sendMessageWithValue(JSON.toJSONString(RocketMQMessage));
|
||||
}
|
||||
|
||||
@GetMapping("/message-with-key")
|
||||
public void sendMessageWithKey() {
|
||||
rocketMQProducer.sendMessageWithKey("qy", JSON.toJSONString(RocketMQMessage));
|
||||
}
|
||||
|
||||
@GetMapping("/message-with-tag")
|
||||
public void sendMessageWithTag() {
|
||||
rocketMQProducer.sendMessageWithTag("qy",
|
||||
JSON.toJSONString(RocketMQMessage));
|
||||
}
|
||||
|
||||
@GetMapping("/message-with-all")
|
||||
public void sendMessageWithAll() {
|
||||
rocketMQProducer.sendMessageWithAll("q", "q",
|
||||
JSON.toJSONString(RocketMQMessage));
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package org.example.rocket;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* <h1>第三个 RocketMQ 消费者, </h1>
|
||||
* */
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "q-study-rocketmq",
|
||||
consumerGroup = "q-springboot-rocketmq-message-ext"
|
||||
)
|
||||
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt message) {
|
||||
|
||||
String value = new String(message.getBody());
|
||||
log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
|
||||
message.getKeys(), value); // 能拿到消息中的 key
|
||||
log.info("MessageExt: [{}]", JSON.toJSONString(message)); // 会慢一些
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package org.example.rocket;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.example.vo.QMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* <h1>第四个, RocketMQ 消费者, 指定消费带有 tag 的消息, 且消费的是 Java Pojo</h1>
|
||||
* */
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "imooc-study-rocketmq",
|
||||
consumerGroup = "q-springboot-rocketmq-tag-object",
|
||||
selectorExpression = "q" // 根据 tag 做过滤
|
||||
)
|
||||
public class RocketMQConsumerObject implements RocketMQListener<QMessage> {
|
||||
@Override
|
||||
public void onMessage(QMessage message) {
|
||||
log.info("consume message in RocketMQConsumerObject: [{}]",
|
||||
JSON.toJSONString(message));
|
||||
// so something
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package org.example.rocket;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.example.vo.QMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* <h1>第一个 RocketMQ 消费者</h1>
|
||||
* */
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "q-study-rocketmq",
|
||||
consumerGroup = "q-springboot-rocketmq-string"
|
||||
)
|
||||
public class RocketMQConsumerString implements RocketMQListener<String> {
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
|
||||
QMessage rocketMessage = JSON.parseObject(message, QMessage.class);
|
||||
log.info("consume message in RocketMQConsumerString: [{}]",
|
||||
JSON.toJSONString(rocketMessage));
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package org.example.rocket;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.example.vo.QMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* <h1>第二个 RocketMQ 消费者, 指定了消费带有 tag 的消息</h1>
|
||||
* */
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "q-study-rocketmq",
|
||||
consumerGroup = "q-springboot-rocketmq-tag-string",
|
||||
selectorExpression = "qy" // 根据 tag 过滤, tag 中要带有 qy
|
||||
)
|
||||
public class RocketMQConsumerTagString implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
|
||||
QMessage rocketMessage = JSON.parseObject(message, QMessage.class);
|
||||
log.info("consume message in RocketMQConsumerTagString: [{}]",
|
||||
JSON.toJSONString(rocketMessage));
|
||||
}
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
package org.example.rocket;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.RocketMQHeaders;
|
||||
import org.example.vo.QMessage;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* <h1>通过 RocketMQ 发送消息</h1>
|
||||
* Spring Messaging 模块
|
||||
* */
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RocketMQProducer {
|
||||
|
||||
/** 类似 Kafka 中的 topic, 默认的读写队列都是4个, 默认自动创建topic */
|
||||
private static final String TOPIC = "q-study-rocketmq";
|
||||
|
||||
/** RocketMQ 客户端 */
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
|
||||
this.rocketMQTemplate = rocketMQTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>使用同步的方式发送消息, 不指定 key 和 tag</h2>
|
||||
* */
|
||||
public void sendMessageWithValue(String value) {
|
||||
|
||||
// 随机选择一个 Topic 的 Message Queue 发送消息
|
||||
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value);
|
||||
log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult));
|
||||
|
||||
SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly(
|
||||
TOPIC, value, "QQQ"
|
||||
);
|
||||
log.info("sendMessageWithValue orderly result: [{}]",
|
||||
JSON.toJSONString(sendResultOrderly));
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>使用异步的方式发送消息, 指定 key</h2>
|
||||
* */
|
||||
public void sendMessageWithKey(String key, String value) {
|
||||
|
||||
Message<String> message = MessageBuilder.withPayload(value)
|
||||
// 这个Key 不是分区的效果, 只是方便进行查询, 在设置的时候, 可以使用空格进行分开, 例如: aaaa bbb
|
||||
.setHeader(RocketMQHeaders.KEYS, key).build();
|
||||
|
||||
// 异步发送消息, 并设定回调
|
||||
rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("sendMessageWithKey success result: [{}]",
|
||||
JSON.toJSONString(sendResult));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable e) {
|
||||
log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>使用同步的方式发送消息, 带有 tag, 且发送的是 Java Pojo</h2>
|
||||
* 发送消息可以定义自己的消息数据结构
|
||||
* */
|
||||
public void sendMessageWithTag(String tag, String value) {
|
||||
|
||||
QMessage qMessage = JSON.parseObject(value, QMessage.class);
|
||||
SendResult sendResult = rocketMQTemplate.syncSend(
|
||||
String.format("%s:%s", TOPIC, tag), // 不同的消费者组使用不同的 tag
|
||||
qMessage
|
||||
);
|
||||
log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult));
|
||||
}
|
||||
|
||||
/**
|
||||
* <h2>使用同步的方式发送消息, 带有 key 和 tag</h2>
|
||||
* */
|
||||
public void sendMessageWithAll(String key, String tag, String value) {
|
||||
|
||||
Message<String> message = MessageBuilder.withPayload(value)
|
||||
.setHeader(RocketMQHeaders.KEYS, key).build();
|
||||
SendResult sendResult = rocketMQTemplate.syncSend(
|
||||
String.format("%s:%s", TOPIC, tag),
|
||||
message
|
||||
);
|
||||
log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult));
|
||||
}
|
||||
}
|
@ -1,15 +1,15 @@
|
||||
### message-with-value
|
||||
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value
|
||||
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-value
|
||||
Content-Type: application/json
|
||||
|
||||
### message-with-key
|
||||
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key
|
||||
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-key
|
||||
Content-Type: application/json
|
||||
|
||||
### message-with-tag
|
||||
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag
|
||||
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-tag
|
||||
Content-Type: application/json
|
||||
|
||||
### message-with-all
|
||||
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all
|
||||
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-all
|
||||
Content-Type: application/json
|
||||
|
Loading…
Reference in New Issue