Spring Boot 简单集成 Kafka
创建项目
在start.spring.io选择依赖并生成项目

配置文件
# Spring Boot configuration for the Kafka demo application.
# Nesting matters: every Kafka setting must sit under spring.kafka to be bound.
spring:
  application:
    name: kafka-stuff
  kafka:
    # Placeholder address; replace with the real broker host:port.
    bootstrap-servers: host-to-your-kafka:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      # Keys and values are both sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
    consumer:
      # NOTE(review): this interval is only relevant to auto-commit, which is disabled below.
      auto-commit-interval: 1S
      auto-offset-reset: earliest
      # Auto-commit is off; the listener section below selects a manual ack mode instead.
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: demoGroup
    listener:
      concurrency: 1
      # Manual ack mode: the listener method must call Acknowledgment.acknowledge() itself.
      ack-mode: manual_immediate
      missing-topics-fatal: false
创建一个Web接口来向test topic发送消息
// TestController.java
/**
 * REST controller mapped to {@code /test} that publishes a fixed message to the
 * Kafka topic {@code test} on every GET request.
 */
@RestController
@RequestMapping("/test")
public class TestController {

    /**
     * Template used to publish messages. Both key and value are typed as
     * {@code String} so they agree with the {@code StringSerializer} configured
     * for producer keys and values; a non-String key is then rejected at compile
     * time instead of failing during serialization.
     */
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the controller.
     *
     * @param kafkaTemplate template used to send messages to Kafka
     */
    public TestController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the value {@code "halo"} (without a key) to the topic {@code test}.
     *
     * @return the literal string {@code "OK"}
     */
    @GetMapping
    public String index() {
        // The value returned by send(...) is discarded, so "OK" does not confirm delivery.
        kafkaTemplate.send("test", "halo");
        return "OK";
    }
}
创建一个监听器来订阅test topic的消息
/**
 * Kafka listener component that logs each record received from the topic
 * {@code test} and then acknowledges it.
 */
@Slf4j
@Component
public class DemoListener {

    /**
     * Handles one record from the {@code test} topic.
     *
     * <p>The record value is typed as {@code String} to agree with the
     * {@code StringDeserializer} configured for consumer values.
     *
     * @param consumerRecord the received record; logged as-is
     * @param ak             acknowledgment handle; invoked after logging, as the
     *                       manual ack mode in the configuration requires
     */
    @KafkaListener(topics = {"test"})
    public void demo(ConsumerRecord<String, String> consumerRecord, Acknowledgment ak) {
        log.info("record: {}", consumerRecord);
        // Acknowledge only after the record has been logged.
        ak.acknowledge();
    }
}
把项目跑起来

评论