Building Event-Driven Systems with Kafka and Spring Boot: Production-Ready Patterns

Event-driven architecture (EDA) enables loose coupling, scalability, and real-time processing. This guide covers production-ready patterns using Apache Kafka and Spring Boot 3.

1. Event Schema Design

Proper event design is critical for evolvable systems:

Use Avro with Schema Registry

// order.avsc - Avro schema
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.dilendra.events",
  "fields": [
    {
      "name": "eventId",
      "type": "string"
    },
    {
      "name": "aggregateId",
      "type": "string"
    },
    {
      "name": "eventType",
      "type": {
        "type": "enum",
        "name": "EventType",
        "symbols": ["ORDER_CREATED", "ORDER_UPDATED", "ORDER_CANCELLED"]
      }
    },
    {
      "name": "payload",
      "type": {
        "type": "record",
        "name": "OrderPayload",
        "fields": [
          {"name": "orderId", "type": "string"},
          {"name": "customerId", "type": "string"},
          {"name": "amount", "type": "double"},
          {"name": "items", "type": {"type": "array", "items": "string"}}
        ]
      }
    },
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "EventMetadata",
        "fields": [
          {"name": "timestamp", "type": "long"},
          {"name": "source", "type": "string"},
          {"name": "correlationId", "type": "string"}
        ]
      }
    }
  ]
}
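
With the Avro Maven or Gradle plugin, this schema compiles into SpecificRecord classes with builders. A minimal sketch of constructing an event from those generated classes (the literal values are placeholders, and string setters accepting java.lang.String assumes the plugin's stringType=String option):

// Sketch: building the event via the Avro-generated builders (values are placeholders)
OrderCreated event = OrderCreated.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setAggregateId("order-42")
    .setEventType(EventType.ORDER_CREATED)
    .setPayload(OrderPayload.newBuilder()
        .setOrderId("order-42")
        .setCustomerId("customer-7")
        .setAmount(149.90)
        .setItems(List.of("SKU-1001", "SKU-2002"))
        .build())
    .setMetadata(EventMetadata.newBuilder()
        .setTimestamp(System.currentTimeMillis())
        .setSource("order-service")
        .setCorrelationId(UUID.randomUUID().toString())
        .build())
    .build();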

2. Producer Implementation

@Component
@Slf4j
public class EventPublisher {
  
  private final KafkaTemplate<String, Object> kafkaTemplate;

  public EventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }
  
  @Transactional
  public CompletableFuture<SendResult<String, Object>> publish(
      String topic, DomainEvent event) {
    
    // Add metadata
    event.getMetadata().setTimestamp(System.currentTimeMillis());
    event.getMetadata().setSource("order-service");
    event.getMetadata().setCorrelationId(MDC.get("correlationId"));
    
    // Create producer record with headers
    ProducerRecord<String, Object> record = new ProducerRecord<>(
      topic, 
      event.getAggregateId(), 
      event
    );
    
    // Propagate tracing headers when they are present in the MDC
    String traceId = MDC.get("traceId");
    if (traceId != null) {
      record.headers().add("traceId", traceId.getBytes(StandardCharsets.UTF_8));
    }
    String spanId = MDC.get("spanId");
    if (spanId != null) {
      record.headers().add("spanId", spanId.getBytes(StandardCharsets.UTF_8));
    }
    
    // KafkaTemplate.send() returns a CompletableFuture directly in Spring Kafka 3.x
    return kafkaTemplate.send(record)
      .exceptionally(ex -> {
        log.error("Failed to publish event to topic: {}", topic, ex);
        // Implement dead letter queue logic
        publishToDLQ(topic, event, ex);
        throw new EventPublishingException("Failed to publish event", ex);
      });
  }
  
  private void publishToDLQ(String originalTopic, 
                           DomainEvent event, 
                           Throwable cause) {
    DLQEvent dlqEvent = new DLQEvent(originalTopic, event, cause);
    kafkaTemplate.send("dead-letter-queue", dlqEvent);
  }
}
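
How the publisher is typically invoked: a minimal sketch of a calling service (OrderService, OrderRepository, CreateOrderRequest, and OrderEventMapper are illustrative names, not part of the code above):

@Service
public class OrderService {

  private final OrderRepository orderRepository;   // illustrative repository
  private final EventPublisher eventPublisher;

  public OrderService(OrderRepository orderRepository, EventPublisher eventPublisher) {
    this.orderRepository = orderRepository;
    this.eventPublisher = eventPublisher;
  }

  @Transactional
  public Order createOrder(CreateOrderRequest request) {
    Order order = orderRepository.save(Order.from(request));
    // Publish after the state change; if events must never be lost during a
    // broker outage, pair this with a transactional outbox instead.
    eventPublisher.publish("order-events", OrderEventMapper.orderCreated(order));
    return order;
  }
}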

3. Consumer Implementation with Error Handling

@Component
@Slf4j
public class OrderEventConsumer {
  
  private final RetryTemplate retryTemplate;
  private final DeadLetterPublishingRecoverer dlqRecoverer;
  
  public OrderEventConsumer(KafkaTemplate<String, Object> kafkaTemplate) {
    this.retryTemplate = RetryTemplate.builder()
      .maxAttempts(3)
      .exponentialBackoff(1000, 2, 10000)
      // Retry only failures that are likely to succeed on a later attempt
      .retryOn(TransientDataAccessException.class)
      .build();
    
    this.dlqRecoverer = new DeadLetterPublishingRecoverer(
      kafkaTemplate,
      (record, ex) -> new TopicPartition("order-events-dlq", 
        record.partition())
    );
  }
  
  @KafkaListener(
    topics = "order-events",
    groupId = "order-processor-group",
    containerFactory = "kafkaListenerContainerFactory"
  )
  public void consume(ConsumerRecord<String, OrderCreated> record,
                     Acknowledgment acknowledgment) {
    
    try {
      // Process with retry
      retryTemplate.execute(context -> {
        processOrderEvent(record.value());
        return null;
      });
      
      // Manual acknowledgment
      acknowledgment.acknowledge();
      
    } catch (Exception ex) {
      log.error("Failed to process order event after retries", ex);
      
      // Send to DLQ
      dlqRecoverer.accept(record, ex);
      
      // Acknowledge to prevent reprocessing
      acknowledgment.acknowledge();
    }
  }
  
  private void processOrderEvent(OrderCreated event) {
    // Business logic here
    log.info("Processing order: {}", event.getPayload().getOrderId());
    
    // Validate event
    validateEvent(event);
    
    // Update read model
    updateOrderProjection(event);
    
    // Trigger downstream actions
    triggerDownstreamActions(event);
  }
}
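
The listener above points at a kafkaListenerContainerFactory bean. Spring Boot can auto-configure it from the YAML in the next section, but an explicit sketch makes the manual-acknowledgment wiring visible (it assumes the Boot-provided ConsumerFactory):

@Configuration
public class KafkaListenerConfig {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
      ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // MANUAL_IMMEDIATE commits the offset as soon as acknowledgment.acknowledge() is called
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setConcurrency(3);
    return factory;
  }
}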

4. Kafka Configuration for Production

# application-production.yml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
    
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
        acks: all
        retries: 10
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        compression.type: snappy
        linger.ms: 5
        batch.size: 16384
    
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
        specific.avro.reader: true
        auto.offset.reset: latest
        enable.auto.commit: false
        isolation.level: read_committed
        fetch.min.bytes: 1
        fetch.max.wait.ms: 500
        max.partition.fetch.bytes: 1048576
    
    listener:
      ack-mode: manual_immediate
      concurrency: 3
      poll-timeout: 5000
      idle-between-polls: 5000
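
Topic sizing and retention are part of production configuration too. With spring-kafka, NewTopic beans are created by KafkaAdmin at startup; a minimal sketch with illustrative partition counts, replication factor, and retention:

@Configuration
public class KafkaTopicConfig {

  @Bean
  public NewTopic orderEventsTopic() {
    // Illustrative sizing: 6 partitions, 3 replicas, 7-day retention
    return TopicBuilder.name("order-events")
        .partitions(6)
        .replicas(3)
        .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7L * 24 * 60 * 60 * 1000))
        .build();
  }

  @Bean
  public NewTopic orderEventsDlqTopic() {
    // Same partition count as the source topic so the recoverer's partition mapping stays valid
    return TopicBuilder.name("order-events-dlq")
        .partitions(6)
        .replicas(3)
        .build();
  }
}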

5. Monitoring and Observability

@Configuration
public class KafkaMetricsConfig {

  // MicrometerConsumerListener and MicrometerProducerListener bind per-client
  // Kafka metrics (send/receive rates, errors, latency) to the MeterRegistry.
  // Register them on the DefaultKafkaConsumerFactory / DefaultKafkaProducerFactory
  // (e.g. via addListener) so every client Spring creates is instrumented.

  @Bean
  public MicrometerConsumerListener<String, Object> consumerListener(
      MeterRegistry meterRegistry) {
    return new MicrometerConsumerListener<>(meterRegistry);
  }

  @Bean
  public MicrometerProducerListener<String, Object> producerListener(
      MeterRegistry meterRegistry) {
    return new MicrometerProducerListener<>(meterRegistry);
  }
}
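
Client metrics cover the basics; listener-level counters make per-topic error rates explicit. A minimal sketch (the metric names are illustrative):

@Component
public class OrderEventMetrics {

  private final Counter processed;
  private final Counter failed;

  public OrderEventMetrics(MeterRegistry registry) {
    // Illustrative metric names; align them with your naming conventions
    this.processed = Counter.builder("order.events.processed")
        .tag("topic", "order-events")
        .register(registry);
    this.failed = Counter.builder("order.events.failed")
        .tag("topic", "order-events")
        .register(registry);
  }

  public void markProcessed() { processed.increment(); }

  public void markFailed() { failed.increment(); }
}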

6. Testing Strategy

@SpringBootTest
@EmbeddedKafka(
  partitions = 3,
  topics = {"order-events", "payment-events"},
  bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
public class OrderEventTest {
  
  @Autowired
  private EmbeddedKafkaBroker embeddedKafka;
  
  @Autowired
  private KafkaTemplate<String, Object> kafkaTemplate;

  // Mocked so the test can verify the listener's side effect
  @MockBean
  private OrderRepository orderRepository;

  private Consumer<String, Object> consumer;

  @BeforeEach
  void setUp() {
    // Build a test consumer against the embedded broker; keys are read as
    // strings, and the value is not asserted here, so no Avro deserializer is configured
    Map<String, Object> props = new HashMap<>(
        KafkaTestUtils.consumerProps("test-group", "true", embeddedKafka));
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = new DefaultKafkaConsumerFactory<String, Object>(props).createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "order-events");
  }

  @AfterEach
  void tearDown() {
    consumer.close();
  }

  @Test
  public void testOrderEventProcessing() {
    // Given
    OrderCreated event = createTestOrderEvent();
    
    // When
    kafkaTemplate.send("order-events", event.getAggregateId(), event);
    
    // Then
    ConsumerRecord<String, Object> record =
      KafkaTestUtils.getSingleRecord(consumer, "order-events", Duration.ofSeconds(5));
    
    assertNotNull(record);
    assertEquals(event.getAggregateId(), record.key());
    
    // Verify side effects
    verify(orderRepository, times(1)).save(any(Order.class));
  }
}

Key Production Considerations

  1. Schema Evolution: Always make backward-compatible schema changes and enforce compatibility in the Schema Registry
  2. Idempotency: Design consumers to handle duplicate events (a deduplication sketch follows this list)
  3. DLQ Strategy: Pair automated retries with a dead-letter topic and a clear path for manual intervention
  4. Monitoring: Track consumer lag, error rates, and throughput
  5. Security: Enable TLS, SASL authentication, and ACLs for production clusters
  6. Disaster Recovery: Take regular backups and replicate across clusters (e.g., with MirrorMaker 2)
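
Kafka delivers at-least-once, so consumers will eventually see duplicates; deduplicating on the eventId carried by every event covers point 2 above. A minimal sketch (ProcessedEventRepository and ProcessedEvent are hypothetical JPA types):

@Component
public class IdempotentEventHandler {

  private final ProcessedEventRepository processedEvents;   // hypothetical JPA repository

  public IdempotentEventHandler(ProcessedEventRepository processedEvents) {
    this.processedEvents = processedEvents;
  }

  @Transactional
  public void handleOnce(String eventId, Runnable businessLogic) {
    // Skip events that were already handled in an earlier delivery
    if (processedEvents.existsById(eventId)) {
      return;
    }
    businessLogic.run();
    // Recording the eventId in the same transaction as the side effects keeps
    // "processed" and "recorded" atomic
    processedEvents.save(new ProcessedEvent(eventId));
  }
}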

Building production-ready event-driven systems requires careful planning around schema design, error handling, monitoring, and testing. Start with simple patterns and evolve as your system grows.