1.添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot 监控端点 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 让 SpringBoot 能够识别 bootstrap.yml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- Spring Data Jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>2.代码自定义配置
/**
* <h1>通过代码自定义 Kafka 配置</h1>
* */
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* <h2>Kafka Producer 工厂类配置</h2>
* */
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
/**
* <h2>Kafka Producer 客户端</h2>
* */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* <h2>Kafka Consumer 工厂类配置(server及)</h2>
* */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* <h2>Kafka Consumer 监听器工厂类配置</h2>
* */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 并发数就是一个消费者实例起几个线程
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}4.消费消息
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imooc.ecommerce.vo.QinyiMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* <h1>Kafka 消费者</h1>
* */
@Slf4j
@Component
public class KafkaConsumer {
private final ObjectMapper mapper;
public KafkaConsumer(ObjectMapper mapper) {
this.mapper = mapper;
}
/**
* <h2>监听 Kafka 消息并消费</h2>
* */
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka")
public void listener01(ConsumerRecord<String, String> record) throws Exception {
String key = record.key();
String value = record.value();
QinyiMessage kafkaMessage = mapper.readValue(value, QinyiMessage.class);
log.info("in listener01 consume kafka message: [{}], [{}]",
key, mapper.writeValueAsString(kafkaMessage));
}
/**
* <h2>监听 Kafka 消息并消费</h2>
* */
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1")
public void listener02(ConsumerRecord<?, ?> record) throws Exception {
Optional<?> _kafkaMessage = Optional.ofNullable(record.value());
if (_kafkaMessage.isPresent()) {
Object message = _kafkaMessage.get();
QinyiMessage kafkaMessage = mapper.readValue(message.toString(),
QinyiMessage.class);
log.info("in listener02 consume kafka message: [{}]",
mapper.writeValueAsString(kafkaMessage));
}
}
}5.kafka 发送消息