Spring Boot 简单集成 Kafka

创建项目

在 start.spring.io 上选择依赖（Spring Web、Spring for Apache Kafka、Lombok）并生成项目

配置文件

# Spring Boot application settings for the Kafka demo: one String producer and one String consumer.
spring:
  application:
    name: kafka-stuff
  kafka:
    # Placeholder address: replace with the host:port of your own Kafka broker(s).
    bootstrap-servers: host-to-your-kafka:9092
    producer:
      # Failed sends are not retried.
      retries: 0
      # NOTE(review): 16384 / 33554432 look like byte sizes (16 KiB batch, 32 MiB buffer); confirm against the Kafka producer docs.
      batch-size: 16384
      buffer-memory: 33554432
      # Keys and values are both written as Strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
    consumer:
      # NOTE(review): presumably has no effect while enable-auto-commit is false; confirm before relying on it.
      auto-commit-interval: 1S
      auto-offset-reset: earliest
      # Auto-commit is off; the listener acknowledges records itself (see listener.ack-mode below).
      enable-auto-commit: false
      # Keys and values are both read as Strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: demoGroup
    listener:
      concurrency: 1
      # Manual acknowledgment mode: listener methods are expected to call Acknowledgment.acknowledge().
      ack-mode: manual_immediate
      # NOTE(review): presumably lets the application start even if a listened topic does not exist yet; verify.
      missing-topics-fatal: false

创建一个 Web 接口来向 test topic 发送消息

// TestController.java

/**
 * REST endpoint that publishes a fixed demo message to Kafka.
 *
 * <p>Handles {@code GET /test}: sends the string {@code "halo"} to the {@code "test"} topic
 * and replies with {@code "OK"}.
 */
@RestController
@RequestMapping("/test")
public class TestController {

    /** Topic the demo message is published to. */
    private static final String TOPIC = "test";

    /** Payload of the demo message. */
    private static final String MESSAGE = "halo";

    /** Template used to publish messages; supplied through the constructor. */
    private final KafkaTemplate<Object, String> template;

    /**
     * Creates the controller.
     *
     * @param kafkaTemplate template used to send messages to Kafka
     */
    public TestController(KafkaTemplate<Object, String> kafkaTemplate) {
        this.template = kafkaTemplate;
    }

    /**
     * Sends the demo message to the demo topic.
     *
     * @return the literal string {@code "OK"}
     */
    @GetMapping
    public String index() {
        // The value returned by send(...) is discarded; this method does not check
        // whether delivery succeeded before answering.
        template.send(TOPIC, MESSAGE);
        return "OK";
    }
}

创建一个监听器来订阅test topic的消息

/**
 * Kafka consumer for the {@code "test"} topic that logs each received record and then
 * acknowledges it.
 */
@Slf4j
@Component
public class DemoListener {

    /**
     * Logs the received record and acknowledges it.
     *
     * @param consumerRecord the record handed to this listener
     * @param ak             handle used to acknowledge the record
     */
    @KafkaListener(topics = "test")
    public void demo(ConsumerRecord<String, Object> consumerRecord, Acknowledgment ak) {
        log.info("record: {}", consumerRecord);
        // Acknowledge only after the record has been logged.
        ak.acknowledge();
    }
}

把项目跑起来，然后访问 GET /test 接口，即可在日志中看到监听器打印出的消息记录

评论