Event-driven architecture (EDA) enables loose coupling, scalability, and real-time processing. This guide covers production-ready patterns using Apache Kafka and Spring Boot 3.
1. Event Schema Design
Proper event design is critical for evolvable systems:
Use Avro with Schema Registry
// order.avsc - Avro schema
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.dilendra.events",
"fields": [
{
"name": "eventId",
"type": "string"
},
{
"name": "aggregateId",
"type": "string"
},
{
"name": "eventType",
"type": {
"type": "enum",
"name": "EventType",
"symbols": ["ORDER_CREATED", "ORDER_UPDATED", "ORDER_CANCELLED"]
}
},
{
"name": "payload",
"type": {
"type": "record",
"name": "OrderPayload",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "items", "type": {"type": "array", "items": "string"}}
]
}
},
{
"name": "metadata",
"type": {
"type": "record",
"name": "EventMetadata",
"fields": [
{"name": "timestamp", "type": "long"},
{"name": "source", "type": "string"},
{"name": "correlationId", "type": "string"}
]
}
}
]
}
2. Producer Implementation
@Component
public class EventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
public EventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
}
@Transactional
public CompletableFuture<SendResult<String, Object>> publish(
String topic, DomainEvent event) {
// Add metadata
event.getMetadata().setTimestamp(System.currentTimeMillis());
event.getMetadata().setSource("order-service");
event.getMetadata().setCorrelationId(MDC.get("correlationId"));
// Create producer record with headers
ProducerRecord<String, Object> record = new ProducerRecord<>(
topic,
event.getAggregateId(),
event
);
// Add tracing headers
record.headers().add("traceId",
MDC.get("traceId").getBytes(StandardCharsets.UTF_8));
record.headers().add("spanId",
MDC.get("spanId").getBytes(StandardCharsets.UTF_8));
return kafkaTemplate.send(record)
.completable()
.exceptionally(ex -> {
log.error("Failed to publish event to topic: {}", topic, ex);
// Implement dead letter queue logic
publishToDLQ(topic, event, ex);
throw new EventPublishingException("Failed to publish event", ex);
});
}
private void publishToDLQ(String originalTopic,
DomainEvent event,
Throwable cause) {
DLQEvent dlqEvent = new DLQEvent(originalTopic, event, cause);
kafkaTemplate.send("dead-letter-queue", dlqEvent);
}
}
3. Consumer Implementation with Error Handling
@Component
@Slf4j
public class OrderEventConsumer {
private final RetryTemplate retryTemplate;
private final DeadLetterPublishingRecoverer dlqRecoverer;
public OrderEventConsumer(KafkaTemplate<String, Object> kafkaTemplate) {
this.retryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(1000, 2, 10000)
.retryOn(DataAccessException.class)
.retryOn(TransientDataAccessException.class)
.build();
this.dlqRecoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, ex) -> new TopicPartition("order-events-dlq",
record.partition())
);
}
@KafkaListener(
topics = "order-events",
groupId = "order-processor-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void consume(ConsumerRecord<String, OrderCreated> record,
Acknowledgment acknowledgment) {
try {
// Process with retry
retryTemplate.execute(context -> {
processOrderEvent(record.value());
return null;
});
// Manual acknowledgment
acknowledgment.acknowledge();
} catch (Exception ex) {
log.error("Failed to process order event after retries", ex);
// Send to DLQ
dlqRecoverer.accept(record, ex);
// Acknowledge to prevent reprocessing
acknowledgment.acknowledge();
}
}
private void processOrderEvent(OrderCreated event) {
// Business logic here
log.info("Processing order: {}", event.getPayload().getOrderId());
// Validate event
validateEvent(event);
// Update read model
updateOrderProjection(event);
// Trigger downstream actions
triggerDownstreamActions(event);
}
}
4. Kafka Configuration for Production
# application-production.yml
# Spring Boot Kafka settings for the production profile.
# Entries under a `properties:` map are passed straight to the Kafka client.
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
        # Durability: wait for all in-sync replicas and retry transient failures.
        acks: all
        retries: 10
        # Idempotent producer; Kafka requires at most 5 in-flight requests for it.
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        # Batching / compression
        compression.type: snappy
        linger.ms: 5
        batch.size: 16384
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
        # Deserialize into generated SpecificRecord classes instead of GenericRecord.
        specific.avro.reader: true
        # NOTE(review): `latest` skips records already in the topic when the group
        # has no committed offset — confirm this is intended for production.
        auto.offset.reset: latest
        # Offsets are committed by the listener container (see ack-mode below).
        enable.auto.commit: false
        isolation.level: read_committed
        fetch.min.bytes: 1
        fetch.max.wait.ms: 500
        max.partition.fetch.bytes: 1048576
    listener:
      # The listener must call Acknowledgment.acknowledge() itself.
      ack-mode: manual_immediate
      concurrency: 3
      poll-timeout: 5000
      # NOTE(review): pauses 5 s between polls, which limits throughput — confirm intended.
      idle-between-polls: 5000
5. Monitoring and Observability
/**
 * Micrometer wiring for Kafka client metrics.
 *
 * <p>Declares a registry customizer that binds consumer and producer meter
 * binders, plus one Micrometer listener bean each for consumers and producers.
 *
 * <p>NOTE(review): confirm that {@code KafkaProducerMetrics} is available in the
 * Micrometer version in use, and that the listener beans are actually attached
 * to the consumer/producer factories — neither is visible in this class.
 */
@Configuration
public class KafkaMetricsConfig {

    /** Customizer applied to the meter registry; delegates to {@link #bindKafkaMeters}. */
    @Bean
    public MeterRegistryCustomizer<MeterRegistry> kafkaMetrics() {
        return KafkaMetricsConfig::bindKafkaMeters;
    }

    /** Binds the Kafka meter binders to {@code meterRegistry} and adds it to the global registry. */
    private static void bindKafkaMeters(MeterRegistry meterRegistry) {
        // Consumer metrics
        KafkaConsumerMetrics consumerBinder = new KafkaConsumerMetrics();
        consumerBinder.bindTo(meterRegistry);

        // Producer metrics
        KafkaProducerMetrics producerBinder = new KafkaProducerMetrics();
        producerBinder.bindTo(meterRegistry);

        // Make the registry reachable through the static Metrics facade
        Metrics.addRegistry(meterRegistry);
    }

    /** Micrometer listener for Kafka consumers, backed by the given registry. */
    @Bean
    public MicrometerConsumerListener<String, Object> consumerListener(
            MeterRegistry meterRegistry) {
        MicrometerConsumerListener<String, Object> listener =
                new MicrometerConsumerListener<>(meterRegistry);
        return listener;
    }

    /** Micrometer listener for Kafka producers, backed by the given registry. */
    @Bean
    public MicrometerProducerListener<String, Object> producerListener(
            MeterRegistry meterRegistry) {
        MicrometerProducerListener<String, Object> listener =
                new MicrometerProducerListener<>(meterRegistry);
        return listener;
    }
}
6. Testing Strategy
/**
 * Integration test that publishes an {@code OrderCreated} event to an embedded
 * Kafka broker and checks both that the record reaches the topic and that the
 * application's listener persisted an order.
 */
@SpringBootTest
@EmbeddedKafka(
        partitions = 3,
        topics = {"order-events", "payment-events"},
        // Let the broker pick a free port and point the application at it,
        // instead of pinning 9092 (which collides with a locally running broker).
        bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
public class OrderEventTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Reuses the application's consumer configuration (deserializers etc.)
    // for the verification consumer created inside the test.
    @Autowired
    private ConsumerFactory<String, Object> consumerFactory;

    // NOTE(review): the repository type name is assumed from the field name used
    // in the verification below — adjust to the project's actual repository type.
    @MockBean
    private OrderRepository orderRepository;

    @Test
    public void testOrderEventProcessing() {
        // Given
        // NOTE(review): createTestOrderEvent() is not defined in this snippet; it
        // must return a fully populated OrderCreated instance.
        OrderCreated event = createTestOrderEvent();

        // A dedicated group id so this consumer does not compete with the
        // application's own listener for partitions.
        try (Consumer<String, Object> consumer =
                     consumerFactory.createConsumer("order-event-test", null)) {
            // Subscribe before sending so the record is not missed.
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "order-events");

            // When
            kafkaTemplate.send("order-events", event.getAggregateId(), event);

            // Then
            ConsumerRecord<String, Object> record =
                    KafkaTestUtils.getSingleRecord(
                            consumer,
                            "order-events",
                            java.time.Duration.ofSeconds(5)
                    );
            assertNotNull(record);
            assertEquals(event.getAggregateId(), record.key());
        }

        // Verify side effects. The listener runs on its own thread, so wait for
        // the interaction rather than asserting immediately.
        verify(orderRepository, timeout(5000).times(1)).save(any(Order.class));
    }
}
Key Production Considerations
- Schema Evolution: Always make schema changes backward-compatible
- Idempotency: Design consumers to handle duplicate events
- DLQ Strategy: Retry failed messages automatically, and route those that still fail to a DLQ for manual intervention
- Monitoring: Track consumer lag, error rates, and throughput
- Security: Implement SSL, SASL, and ACLs for production
- Disaster Recovery: Regular backups and cross-cluster replication
Building production-ready event-driven systems requires careful planning around schema design, error handling, monitoring, and testing. Start with simple patterns and evolve as your system grows.