[新增功能](master): kafka-springboot整合

SpringBoot与Kafka的完美整合代码编辑完成
master
土豆兄弟 2 years ago
parent 828780b20e
commit 762150042d

@ -0,0 +1,46 @@
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
### Maven template
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
### Java template
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springboot-kafka</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot 监控端点 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 让 SpringBoot 能够识别 bootstrap.yml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- Spring Data Jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,18 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the Spring Boot application demonstrating the
 * spring-kafka integration (producer, consumers and manual configuration).
 *
 * @author q
 * @date 2022/07/23
 */
@SpringBootApplication
public class SpringBootKafkaApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);
    }
}

@ -0,0 +1,49 @@
package com.baiye.controller;
import com.baiye.kafka.KafkaProducer;
import com.baiye.vo.QMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint that publishes a sample {@link QMessage} to Kafka,
 * delegating the actual send to {@link KafkaProducer}.
 *
 * @author q
 * @date 2022/07/23
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    // FIX: dependencies were `public final`; constructor-injected collaborators
    // should be encapsulated — nothing in this module reads them externally.
    /** JSON codec used to serialize the message payload. */
    private final ObjectMapper mapper;
    /** Producer wrapper that performs the actual Kafka send. */
    private final KafkaProducer kafkaProducer;

    public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
        this.mapper = mapper;
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * Serializes a fixed sample {@link QMessage} to JSON and sends it to
     * the given Kafka topic.
     *
     * @param key   optional partition key; when absent Kafka chooses the partition
     * @param topic target topic, required
     * @throws Exception if JSON serialization fails
     */
    @GetMapping("/send-message")
    public void sendMessage(@RequestParam(required = false) String key,
                            @RequestParam(required = true) String topic) throws Exception {
        QMessage message = new QMessage(1, "baiye-study-kafka");
        kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
    }
}

@ -0,0 +1,99 @@
package com.baiye.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;

/**
 * Manual Kafka wiring: producer factory + template for sending, consumer
 * factory + listener-container factory for receiving. Keys and values are
 * transported as plain strings (the payload itself is JSON).
 *
 * @author q
 * @date 2022/07/23
 */
@Configuration
public class KafkaConfig {

    /** Kafka broker list, e.g. {@code 127.0.0.1:9092}, taken from application config. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory with String serialization for both key and value.
     *
     * @return {@link ProducerFactory}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // key/value are sent as plain strings
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * The template producers use to publish messages.
     *
     * @return {@link KafkaTemplate}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer factory with String deserialization; each poll fetches at
     * most 50 records.
     *
     * @return {@link ConsumerFactory}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // cap each poll() at 50 records
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Container factory backing {@code @KafkaListener} methods; each listener
     * runs with 3 consumer threads.
     *
     * @return {@link ConcurrentKafkaListenerContainerFactory}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // concurrency = number of consumer threads per listener
        containerFactory.setConcurrency(3);
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}

@ -0,0 +1,72 @@
package com.baiye.kafka;
import com.baiye.vo.QMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;

/**
 * Two demo Kafka listeners on the same topic but different consumer groups,
 * so each receives every message independently. Payloads are JSON-encoded
 * {@link QMessage} values.
 *
 * @author q
 * @date 2022/07/23
 */
@Slf4j
@Component
public class KafkaConsumer {

    // FIX: was `public final`; an injected collaborator should stay private.
    /** JSON codec used to decode message payloads. */
    private final ObjectMapper mapper;

    public KafkaConsumer(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    /**
     * Listener variant 1: lets any deserialization failure propagate.
     *
     * @param record the consumed Kafka record (String key/value)
     * @throws Exception if the JSON payload cannot be parsed
     */
    @KafkaListener(
            // topic/groupId must match what the producer publishes to
            topics = {"test-kafka-springboot-topic"},
            groupId = "test-kafka-springboot-groupId-1"
    )
    public void listenerWay1(ConsumerRecord<String, String> record) throws Exception {
        String key = record.key();
        String value = record.value();
        QMessage message = mapper.readValue(value, QMessage.class);
        log.info("in listenerWay1 consumer kafka message: [{}], [{}]", key, mapper.writeValueAsString(message));
    }

    /**
     * Listener variant 2: null-safe value access and a guarded parse that
     * logs (rather than rethrows) deserialization failures.
     *
     * @param record the consumed Kafka record
     * @throws Exception declared for parity with variant 1; parse errors are caught
     */
    @KafkaListener(
            // topic/groupId must match what the producer publishes to
            topics = {"test-kafka-springboot-topic"},
            groupId = "test-kafka-springboot-groupId-2"
    )
    public void listenerWay2(ConsumerRecord<?, ?> record) throws Exception {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                QMessage qMessage = mapper.readValue(message.toString(), QMessage.class);
                log.info("in listenerWay2 consumer kafka message: [{}]", mapper.writeValueAsString(qMessage));
            } catch (Exception e) {
                // FIX: failure path was logged at INFO level; a parse failure is an error
                log.error("in listenerWay2 ObjectMapper kafka message error !!!", e);
            }
        }
    }
}

@ -0,0 +1,85 @@
package com.baiye.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Wrapper around {@link KafkaTemplate} that sends String messages and
 * demonstrates both ways of observing the result of an asynchronous send:
 * a callback and a blocking wait on the future.
 *
 * @author q
 * @date 2022/07/23
 */
@Slf4j
@Component
public class KafkaProducer {

    // FIX: was `public final`; an injected collaborator should stay private.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message to Kafka and logs the resulting topic/partition/offset
     * twice — once from the async callback, once after a blocking wait
     * (demo purposes; a real producer would pick one).
     *
     * @param key   optional partition key; when blank Kafka assigns the partition
     * @param value message payload, must not be blank
     * @param topic target topic, must not be blank
     * @throws IllegalArgumentException if {@code value} or {@code topic} is blank
     */
    public void sendMessage(String key, String value, String topic) {
        if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("value or topic is null or empty");
        }
        // Asynchronous send; omit the key when blank so Kafka chooses the partition.
        ListenableFuture<SendResult<String, String>> future =
                StringUtils.isBlank(key) ? kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
        // Way 1: asynchronous callback notification.
        future.addCallback(
                success -> {
                    // FIX: was `assert ...` — assertions are disabled at runtime by
                    // default, so the metadata calls below could throw NPE; check
                    // explicitly and bail out instead.
                    if (success == null || success.getRecordMetadata() == null) {
                        log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
                        return;
                    }
                    // topic the record was written to
                    String resultTopic = success.getRecordMetadata().topic();
                    // partition the record landed in
                    int partition = success.getRecordMetadata().partition();
                    // record offset within that partition
                    long offset = success.getRecordMetadata().offset();
                    // logged for easy debugging
                    log.info("send kafka message success: [{}], [{}], [{}]", resultTopic, partition, offset);
                },
                failure -> {
                    // send failures are rare; usually a networking problem
                    log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
                }
        );
        // Way 2: synchronous wait for the result.
        try {
            // wait up to 5s for the broker acknowledgement
            SendResult<String, String> sendResult = future.get(5, TimeUnit.SECONDS);
            String resultTopic = sendResult.getRecordMetadata().topic();
            int partition = sendResult.getRecordMetadata().partition();
            long offset = sendResult.getRecordMetadata().offset();
            // logged for easy debugging
            log.info("send kafka message success: [{}], [{}], [{}]", resultTopic, partition, offset);
        } catch (Exception e) {
            log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic, e);
        }
    }
}

@ -0,0 +1,21 @@
package com.baiye.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Message payload exchanged over Kafka; serialized to/from JSON by an
 * ObjectMapper (see KafkaController and KafkaConsumer in this module).
 * Lombok generates getters/setters, equals/hashCode/toString and both an
 * all-args and a no-args constructor (the latter is required by Jackson).
 *
 * @author q
 * @date 2022/07/23
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class QMessage {
// message identifier
private Integer id;
// sample payload field carried by the demo messages
private String productName;
}

@ -0,0 +1,49 @@
server:
port: 8001
servlet:
context-path: /springboot-kafka-dev
spring:
# SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
kafka:
bootstrap-servers: 127.0.0.1:9092
# jpa:
# show-sql: true
# hibernate:
# ddl-auto: none
# properties:
# hibernate.show_sql: true
# hibernate.format_sql: true
# open-in-view: false
# datasource:
# # 数据源
# url: jdbc:mysql://127.0.0.1:3306/q_springboot_kafka?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false
# username: root
# password: root
# type: com.zaxxer.hikari.HikariDataSource
# driver-class-name: com.mysql.cj.jdbc.Driver
# # 连接池
# hikari:
# maximum-pool-size: 8
# minimum-idle: 4
# idle-timeout: 30000
# connection-timeout: 30000
# max-lifetime: 45000
# auto-commit: true
# pool-name: StudyEcommerce
# consumer:
# 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
# group-id: imooc-study-ecommerce
# auto-offset-reset: latest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
# RocketMQ 的配置, 这是最低配置
#rocketmq:
# name-server: 127.0.0.1:9876
# producer:
# # 发送同一类消息的设置为同一个 group, 保证唯一
# group: study-ecommerce

@ -0,0 +1,16 @@
spring:
profiles:
# prod, dev
active: dev
application:
name: study-ecommerce
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always

@ -0,0 +1,7 @@
### kafka-send-message
GET http://127.0.0.1:8001/springboot-kafka-dev/kafka/send-message?key=q&topic=test-kafka-springboot-topic
Content-Type: application/json
### kafka-send-message
GET http://127.0.0.1:8001/springboot-kafka-dev/kafka/send-message?topic=test-kafka-springboot-topic
Content-Type: application/json

@ -0,0 +1,15 @@
### message-with-value
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value
Content-Type: application/json
### message-with-key
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key
Content-Type: application/json
### message-with-tag
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag
Content-Type: application/json
### message-with-all
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all
Content-Type: application/json

@ -0,0 +1,7 @@
### wrong-rollback-for
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for
Content-Type: application/json
### wrong-inner-call
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call
Content-Type: application/json

@ -34,6 +34,7 @@
<module>utils/dev-protocol-id</module> <module>utils/dev-protocol-id</module>
<module>database/shardingsphere/dev-protocol-shardingsphere-base</module> <module>database/shardingsphere/dev-protocol-shardingsphere-base</module>
<module>mq/rocketmq/rocket-api</module> <module>mq/rocketmq/rocket-api</module>
<module>bigdata/kafka/springboot-kafka</module>
</modules> </modules>
<properties> <properties>

Loading…
Cancel
Save