Complete Spring Boot Kafka Consumer and Publisher Guide with Docker


In this guide, we'll walk through a complete Spring Boot Kafka Consumer and Publisher example…

  • The customer service exposes a REST API that accepts customer data and sends a notification event to Kafka.
  • The notification service consumes that Kafka event and processes it (e.g., logs or sends an email).

🔧 Kafka Setup using Docker Compose

First, set up Kafka locally using the following Docker Compose file:

version: '3.8'

services:
  kafka:
    image: bitnami/kafka:3.6
    container_name: kafka
    ports:
      - "29092:29092"
    environment:
      - KAFKA_KRAFT_CLUSTER_ID=KsdbJY1VQEGaTfcYpGnLZw
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - kafka-data:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "18080:8080"
    depends_on:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    networks:
      - kafka-net
    restart: unless-stopped

networks:
  kafka-net:
    driver: bridge

volumes:
  kafka-data:

✅ After starting Kafka via Docker (docker-compose up -d), open http://localhost:18080 to access the Kafka UI and create a topic named customer-notification.
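
As an alternative to creating the topic by hand, the topic can be declared in code. The following is a minimal sketch, assuming spring-kafka is on the classpath and Spring Boot's auto-configured KafkaAdmin is active; the class name KafkaTopicConfig and the partition count are illustrative choices, not part of the original setup.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class; place it in either service.
@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin picks up NewTopic beans at startup and creates the topic
    // if it does not exist on the broker yet.
    @Bean
    public NewTopic customerNotificationTopic() {
        return TopicBuilder.name("customer-notification")
                .partitions(1)  // assumption: one partition is enough for a local demo
                .replicas(1)    // the Compose file above runs a single broker
                .build();
    }
}
```

With this bean in place, the Kafka UI should list the topic after the application starts, so the manual step becomes a check rather than a requirement.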


📤 Kafka Publisher – Customer Service

The customer service simulates saving a customer (nothing is written to a database yet) and then publishes a notification event to Kafka.

Even if the notification service is down, Kafka retains the message on the topic (subject to the topic's retention settings), and the consumer reads it once it is back up.

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  application:
    name: customer-service
  profiles:
    active: local

server:
  port: 9092  # HTTP port of the customer service; the host reaches Kafka on 29092

CustomerNotification DTO

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CustomerNotification {
    private String notificationType;
    private CustomerResponseDTO customerResponseDTO;
    private String message;
}

CustomerResponseDTO

@Getter
@Setter
@Builder
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class CustomerResponseDTO {
    private long customerId;
    private String firstName;
    private String lastName;
    private String company;
    private String address;
    private String city;
    private String state;
    private String country;
    private String postalCode;
    private String phone;
    private String email;
}

Sample REST Controller

@RestController
@RequestMapping("/api/customers")
@RequiredArgsConstructor
public class CustomerController {

    private final CustomerPublisher customerPublisher;

    @PostMapping
    public ResponseEntity<String> createCustomer(@RequestBody CustomerResponseDTO customerDto) {

        // Here you would normally save to DB

        CustomerNotification notification = CustomerNotification.builder()
                .notificationType("NEW_CUSTOMER")
                .customerResponseDTO(customerDto)
                .message("New customer created")
                .build();

        customerPublisher.publishCustomerNotification(notification);

        return ResponseEntity.ok("Customer created and event published to Kafka");
    }
}
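
To try the endpoint, any HTTP client will do. The sketch below uses the JDK's built-in HttpClient (Java 11+) and assumes the customer service is running on port 9092 as configured above. The class name and the sample field values are made up for illustration; only a few of the CustomerResponseDTO fields are filled in.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for exercising POST /api/customers from plain Java.
final class CreateCustomerRequestExample {

    // Builds the POST request; JSON field names follow CustomerResponseDTO, values are sample data.
    static HttpRequest buildRequest() {
        String json = "{"
                + "\"customerId\": 1,"
                + "\"firstName\": \"Jane\","
                + "\"lastName\": \"Doe\","
                + "\"email\": \"jane.doe@example.com\""
                + "}";
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9092/api/customers"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Sends the request and prints the status code and the controller's response text.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Fields left out of the JSON body are deserialized to their Java defaults (null for String fields).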

Publisher Service

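import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Log4j2
@RequiredArgsConstructor
public class CustomerPublisher {

    private static final String TOPIC = "customer-notification";

    // Injected through the Lombok-generated constructor, as in CustomerController.
    private final KafkaTemplate<String, CustomerNotification> kafkaTemplate;

    public void publishCustomerNotification(CustomerNotification customerNotification) {
        // send() is asynchronous: it returns before the broker acknowledges the record.
        kafkaTemplate.send(TOPIC, customerNotification);
        log.info("Kafka event sent: {}", customerNotification);
    }
}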
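
KafkaTemplate.send returns before the broker acknowledges the record, so a log line written right after the call does not by itself prove delivery. The following sketch logs the real outcome instead. It assumes Spring Kafka 3.x, where send returns a CompletableFuture (older versions return a ListenableFuture with a different callback API), and that CustomerNotification is in the same package. The class name CallbackCustomerPublisher is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

// Hypothetical variant of the publisher; assumes Spring Kafka 3.x.
@Service
@Log4j2
@RequiredArgsConstructor
public class CallbackCustomerPublisher {

    private static final String TOPIC = "customer-notification";

    private final KafkaTemplate<String, CustomerNotification> kafkaTemplate;

    public void publishCustomerNotification(CustomerNotification notification) {
        CompletableFuture<SendResult<String, CustomerNotification>> future =
                kafkaTemplate.send(TOPIC, notification);

        // The callback runs once the broker has acknowledged the record or the send has failed.
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to publish notification: {}", notification, ex);
            } else {
                log.info("Published to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }
        });
    }
}
```

The REST call still returns immediately. Whether a failed send should also fail the HTTP request is a design decision this sketch leaves open.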

📥 Kafka Consumer – Notification Service

Maven Dependencies


<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

application.yml

spring:
  application:
    name: notification-service
  kafka:
    bootstrap-servers: localhost:29092
    consumer:
      auto-offset-reset: earliest
      group-id: customer-notification-email
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.codingboot.customer_service.dto

server:
  port: 8081

Kafka Listener

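This listener subscribes to the customer-notification topic. Each message is deserialized from JSON into a CustomerNotification and logged; from there you can take further action (e.g., send an email). By default, the producer's JsonSerializer writes the DTO's fully qualified class name into a type header, so the notification service needs CustomerNotification and CustomerResponseDTO in the same package as the customer service (com.codingboot.customer_service.dto, the trusted package configured above) or an explicit type mapping.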

@Component
@Log4j2
public class NotificationConsumer {

    @KafkaListener(topics = "customer-notification", groupId = "customer-notification-email")
    public void consume(CustomerNotification notification) {
        log.info("Received notification: {}", notification);
        // Process notification, send email, etc.
    }
}

🔄 What If the Notification Service is Down?

If the customer service called the notification service directly with RestTemplate and that service were down, the call would fail and the customer service would have to fail the request or retry. ❌

With Kafka, the event is still written to the topic while the consumer (notification service) is temporarily down, and the consumer picks it up once it is back. ✅

This makes the system more resilient and loosely coupled.
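
The mechanism behind this can be pictured as an append-only log plus a committed offset per consumer group. The toy model below is plain Java, not Kafka's actual implementation: it ignores partitions, retention, and persistence, and the class names are invented for illustration. It only shows why messages published during consumer downtime are not lost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single-partition topic: an append-only log and one committed offset per group.
final class ToyTopic {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    // Producer side: appending never depends on any consumer being online.
    void publish(String message) {
        log.add(message);
    }

    // Consumer side: read everything after the group's committed offset, then commit the new position.
    List<String> poll(String groupId) {
        int from = committedOffsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committedOffsets.put(groupId, log.size());
        return batch;
    }
}

final class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        String group = "customer-notification-email";

        topic.publish("NEW_CUSTOMER:1");
        System.out.println("While up: " + topic.poll(group));

        // The consumer is "down" here: nobody polls, but publishing keeps working.
        topic.publish("NEW_CUSTOMER:2");
        topic.publish("NEW_CUSTOMER:3");

        // After restart, the consumer resumes from its committed offset.
        System.out.println("After restart: " + topic.poll(group));
    }
}
```

A second consumer group would start from offset 0 and see all three messages, which mirrors the auto-offset-reset: earliest setting used in the consumer configuration.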


📧 Sending Emails (Optional Extension)

If you want to send emails from the consumer side:

Maven Email Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>

Email Config in application.yml

spring:
  mail:
    host: smtp.your-provider.com
    port: 587
    username: your-email@example.com
    password: your-password
    properties:
      mail.smtp.auth: true
      mail.smtp.starttls.enable: true

Send Email Logic

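import lombok.RequiredArgsConstructor;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;

// EmailService is an illustrative class name; any Spring bean can host this method.
@Service
@RequiredArgsConstructor
public class EmailService {
    private final JavaMailSender mailSender;

    // Sends a plain-text email through the SMTP server configured under spring.mail.
    public void sendNotificationEmail(String to, String subject, String body) {
        SimpleMailMessage message = new SimpleMailMessage();
        message.setTo(to);
        message.setSubject(subject);
        message.setText(body);
        mailSender.send(message);
    }
}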
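
To connect the listener to sendNotificationEmail, the consumer has to turn a notification into a recipient, subject, and body. One way to do that is sketched below in plain Java. NotificationEmailFormatter, EmailContent, and the subject wording are hypothetical; the inputs correspond to notificationType, message, and the customer's firstName and email from the DTOs above.

```java
// Hypothetical helper that derives the three arguments of sendNotificationEmail.
final class NotificationEmailFormatter {

    // Simple value holder for recipient, subject, and body.
    static final class EmailContent {
        final String to;
        final String subject;
        final String body;

        EmailContent(String to, String subject, String body) {
            this.to = to;
            this.subject = subject;
            this.body = body;
        }
    }

    static EmailContent format(String notificationType, String firstName,
                               String email, String message) {
        // Without a recipient address there is nothing to send.
        if (email == null || email.isBlank()) {
            throw new IllegalArgumentException("customer email is required to send a notification");
        }
        // Fall back to a generic greeting when the first name is missing.
        String name = (firstName == null || firstName.isBlank()) ? "customer" : firstName;
        // Assumption: NEW_CUSTOMER gets a welcome subject, other types a generic one.
        String subject = "NEW_CUSTOMER".equals(notificationType)
                ? "Welcome, " + name + "!"
                : "Notification: " + notificationType;
        String body = "Hello " + name + ",\n\n" + message;
        return new EmailContent(email, subject, body);
    }
}
```

Inside the listener, the result could then be passed on as sendNotificationEmail(content.to, content.subject, content.body).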

✅ Summary

  • Use Docker Compose to spin up Kafka locally.
  • Create the customer-notification topic using the Kafka UI.
  • The publisher app sends CustomerNotification objects to that topic.
  • The consumer app listens to the topic and processes the messages.
  • Optionally, send email notifications using Spring Mail.