
Event-Driven Architecture with Apache Kafka: A Comprehensive Guide

Apache Kafka has emerged as the de facto standard for building event-driven architectures at scale. Originally developed at LinkedIn and now maintained by the Apache Software Foundation, Kafka provides a distributed streaming platform that enables real-time data processing, event sourcing, and microservices communication. This comprehensive guide explores Kafka’s fundamentals and demonstrates practical implementations for modern distributed systems.

In this article, we’ll dive deep into Kafka’s architecture, explore real-world patterns like sagas, CQRS, and event sourcing, and provide code examples in both Java and Node.js.


Kafka Fundamentals

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant messaging. At its core, Kafka acts as a distributed commit log where events are stored durably and can be consumed by multiple applications.

Core Components

  1. Brokers: Kafka servers that store and manage events
  2. Topics: Logical channels for organizing events
  3. Partitions: Physical divisions of topics for parallelism
  4. Producers: Applications that publish events
  5. Consumers: Applications that subscribe to events
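  6. ZooKeeper/KRaft: Cluster metadata management, handled by ZooKeeper in older deployments and by KRaft (Kafka's built-in Raft-based controller quorum) in newer ones; Kafka 4.0 removes ZooKeeper support entirely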

Kafka Architecture Overview

graph TB
    subgraph "Kafka Cluster"
        Broker1[Broker 1]
        Broker2[Broker 2] 
        Broker3[Broker 3]
        
        subgraph "Topic: Orders"
            P1[Partition 0<br/>Leader: Broker 1]
            P2[Partition 1<br/>Leader: Broker 2]
            P3[Partition 2<br/>Leader: Broker 3]
        end
    end
    
    subgraph "Producers"
        OrderService[Order Service]
        PaymentService[Payment Service]
        InventoryService[Inventory Service]
    end
    
    subgraph "Consumer Groups"
        CG1[Email Service<br/>Group: notifications]
        CG2[Analytics Service<br/>Group: analytics]
        CG3[Audit Service<br/>Group: audit]
    end
    
    ZooKeeper[(ZooKeeper/<br/>KRaft)]
    
    OrderService --> P1
    PaymentService --> P2
    InventoryService --> P3
    
    P1 --> CG1
    P2 --> CG1
    P3 --> CG1
    
    P1 --> CG2
    P2 --> CG2
    P3 --> CG2
    
    P1 --> CG3
    P2 --> CG3
    P3 --> CG3
    
    ZooKeeper --> Broker1
    ZooKeeper --> Broker2
    ZooKeeper --> Broker3
    
    style P1 fill:#e1f5fe
    style P2 fill:#f3e5f5
    style P3 fill:#fff3e0
    style ZooKeeper fill:#e8f5e8

Topics and Partitions

Topics are the fundamental unit of organization in Kafka. Each topic is divided into partitions, which are ordered, immutable sequences of records.

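// @filename: TopicManager.java
// Creating a topic with the Java Admin API
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
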
public class TopicManager {
    public static void createTopic(String topicName, int partitions, short replicationFactor) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

            // Configure topic settings
            Map<String, String> configs = new HashMap<>();
            configs.put("retention.ms", "604800000"); // 7 days
            configs.put("segment.ms", "86400000"); // 1 day
            configs.put("compression.type", "snappy");
            newTopic.configs(configs);

            CreateTopicsResult result = adminClient.createTopics(
                Collections.singleton(newTopic)
            );

            result.all().get();
            System.out.println("Topic created successfully: " + topicName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Message Keys and Partitioning

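Kafka uses message keys to determine partition assignment. With the default partitioner, messages with the same key always go to the same partition as long as the topic's partition count does not change. This preserves ordering for related events, because Kafka guarantees order only within a partition. Adding partitions later remaps keys to different partitions.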
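The Java client's default partitioner handles a keyed record in three steps: it hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. The standalone sketch below models that calculation so the effect of the modulo is visible: for a given count, the same key always lands on the same partition, but the result can change when partitions are added. It is an illustration written for this article, not the client's source. The KeyPartitioner class is hypothetical, and the sketch assumes keys are strings encoded as UTF-8, as StringSerializer does by default.

```java
import java.nio.charset.StandardCharsets;

// Keyed partition selection modeled on the Java client's default partitioner
class KeyPartitioner {

    // 32-bit murmur2 hash over the key bytes
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // Mix four bytes at a time
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1 to 3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clearing the sign bit keeps the result non-negative
    // (in Java, Math.abs(Integer.MIN_VALUE) is still negative)
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int partitions : new int[] {6, 12}) {
            System.out.println("customer-42 -> partition "
                    + partitionFor("customer-42", partitions) + " of " + partitions);
        }
    }
}
```

The custom Node.js partitioner below uses a different hash. If Java and Node.js producers write to the same topic, the same key can end up on different partitions.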

// @filename: index.js
// Node.js producer with custom partitioner
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
})

const producer = kafka.producer({
  createPartitioner: () => {
    return ({ topic, partitionMetadata, message }) => {
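      // Custom partitioning logic based on customer ID.
      // This simple string hash differs from the murmur2 hash used by the Java
      // client's default partitioner, so a Java producer would place the same
      // key on a different partition.
      if (message.key) {
        const customerId = message.key.toString()
        const numPartitions = partitionMetadata.length
        const hash = customerId.split('').reduce((a, b) => {
          return (a << 5) - a + b.charCodeAt(0)
        }, 0)
        return Math.abs(hash) % numPartitions
      }
      // Messages without keys go to a random partition
      return Math.floor(Math.random() * partitionMetadata.length)
    }
  },
})

// Connect once at startup and reuse the producer for every send;
// connecting and disconnecting per message adds a round trip each time
const connected = producer.connect()

async function publishOrder(order) {
  await connected

  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.customerId,
        value: JSON.stringify(order),
        headers: {
          'correlation-id': order.orderId,
          'event-type': 'order.created',
        },
      },
    ],
  })
}

// Disconnect cleanly on shutdown
process.on('SIGTERM', async () => {
  await producer.disconnect()
  process.exit(0)
})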

Producers and Consumers

Producer Configuration and Best Practices

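Producers are responsible for publishing events to Kafka topics. The configuration below trades a little latency for durability (acks, idempotence). It also groups records into compressed batches for throughput (batch.size, linger.ms, compression.type).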

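// @filename: EventProducer.java
// Java producer with advanced configuration
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
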
public class EventProducer {
    private final KafkaProducer<String, String> producer;

    public EventProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Reliability settings
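        props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all in-sync replicas
        // Retries also stop once delivery.timeout.ms (default 2 minutes) expires
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Idempotence stops retries from writing duplicates; it requires acks=all,
        // retries > 0, and at most 5 in-flight requests per connection
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);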

        // Performance settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB

        this.producer = new KafkaProducer<>(props);
    }

    public void publishEvent(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Failed to send message: " + exception.getMessage());
                } else {
                    System.out.printf("Sent message to topic=%s partition=%d offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
    }

    public void close() {
        producer.close();
    }
}
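With enable.idempotence=true, the producer receives a producer ID and attaches a per-partition sequence number to what it sends. The broker discards anything whose sequence number it has already written. As a result, a retry after a lost acknowledgement does not create a duplicate. The toy model below illustrates that bookkeeping for a single partition. It is a deliberately simplified sketch: real brokers work on batches and also track producer epochs, which this ignores. The IdempotentPartitionLog class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broker-side deduplication for one partition: remember the last
// sequence number appended for each producer ID and ignore repeats
class IdempotentPartitionLog {
    private final List<String> records = new ArrayList<>();
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if appended, false if the write was a duplicate retry
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: acknowledge without appending again
        }
        if (sequence != last + 1) {
            // a gap means something was lost in between; reject instead of appending
            throw new IllegalStateException(
                "Out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        records.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> records() {
        return records;
    }

    public static void main(String[] args) {
        IdempotentPartitionLog log = new IdempotentPartitionLog();
        log.append(1L, 0, "order-1");
        log.append(1L, 1, "order-2");
        // The acknowledgement for sequence 1 was lost, so the producer retries it
        boolean appended = log.append(1L, 1, "order-2");
        System.out.println("retry appended? " + appended + ", log = " + log.records());
    }
}
```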

Consumer Groups and Offset Management

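Consumers work together in consumer groups to process messages from topics. Each partition is assigned to exactly one consumer within a group, so a group can have at most as many active consumers as the topic has partitions; any extra consumers sit idle. Each group tracks its own committed offsets, so different groups read the same topic independently.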
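The one-consumer-per-partition rule is easiest to see in a toy assignment. The sketch below distributes one topic's partitions round-robin over the members of a group. It is a simplified stand-in written for this article, not the exact algorithm of any Kafka assignor such as RangeAssignor or CooperativeStickyAssignor. The GroupAssignment class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified round-robin assignment of one topic's partitions to group members.
// Each partition gets exactly one owner; members beyond the partition count get none.
class GroupAssignment {

    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        if (members.isEmpty()) {
            return assignment;
        }
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // sort so the result is deterministic
        for (String member : sorted) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = sorted.get(partition % sorted.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {c1=[0, 4], c2=[1, 5], c3=[2], c4=[3]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 6));
        // eight members, six partitions: c7 and c8 receive nothing
        System.out.println(assign(List.of("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"), 6));
    }
}
```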

// @filename: index.js
// Node.js consumer with manual offset management
const { Kafka } = require('kafkajs')

class EventConsumer {
  constructor(groupId) {
    this.kafka = new Kafka({
      clientId: 'event-processor',
      brokers: ['localhost:9092'],
    })

    this.consumer = this.kafka.consumer({
      groupId: groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576, // 1MB
      retry: {
        initialRetryTime: 100,
        retries: 8,
      },
    })
  }

  async start(topics, handler) {
    await this.consumer.connect()
    await this.consumer.subscribe({
      topics,
      fromBeginning: false,
    })

    await this.consumer.run({
      autoCommit: false, // Manual commit for better control
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const event = {
            key: message.key?.toString(),
            value: JSON.parse(message.value.toString()),
            headers: message.headers,
            timestamp: message.timestamp,
            offset: message.offset,
            partition: partition,
          }

          // Process the event
          await handler(event)

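          // Commit the offset of the *next* message to read after successful processing.
          // Offsets are 64-bit values sent as strings, so use BigInt to avoid precision loss.
          await this.consumer.commitOffsets([
            {
              topic: topic,
              partition: partition,
              offset: (BigInt(message.offset) + 1n).toString(),
            },
          ])
        } catch (error) {
          console.error('Error processing message:', error)
          // Implement retry logic or send to a dead-letter queue (DLQ) here.
          // Without that, the next successful commit on this partition moves
          // past this message and it is effectively skipped.
        }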
      },
    })
  }

  async stop() {
    await this.consumer.disconnect()
  }
}

// Usage example
const consumer = new EventConsumer('order-processing-group')
consumer
  .start(['orders'], async (event) => {
    console.log('Processing order:', event.value)
    // Process order logic here
  })
  .catch((error) => console.error('Consumer failed:', error))
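Kafka's committed offset is the offset of the next record to read, which is why the consumer above commits the message offset plus one. It also means that once a later record in a partition is committed, every earlier record counts as done. If a failure is skipped or records are processed concurrently, a safer rule is to commit only up to the first offset that has not been processed. The hypothetical CommitTracker below sketches that bookkeeping for a single partition; it is not part of kafkajs or the Java client.

```java
import java.util.TreeSet;

// Tracks processed offsets for one partition and reports the offset that is safe
// to commit: one past the end of the contiguous run of processed records
class CommitTracker {
    private long nextToCommit; // committed-offset semantics: next record to read
    private final TreeSet<Long> doneAhead = new TreeSet<>();

    CommitTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    void markProcessed(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the committable position
        }
        doneAhead.add(offset);
        // Advance over any contiguous run that starts at nextToCommit
        while (!doneAhead.isEmpty() && doneAhead.first() == nextToCommit) {
            doneAhead.pollFirst();
            nextToCommit++;
        }
    }

    long committableOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        CommitTracker tracker = new CommitTracker(100);
        tracker.markProcessed(100);
        tracker.markProcessed(102); // offset 101 failed or is still in progress
        System.out.println("safe to commit: " + tracker.committableOffset());
    }
}
```

Committing only the contiguous position means a failed record is read again after a restart or rebalance, so handlers should tolerate seeing the same record twice.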

Microservices Communication with Kafka

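Kafka excels at enabling asynchronous communication between microservices. Services interact only through topics rather than direct calls. A consumer can therefore be slow, down, or redeployed without blocking the producer, and new consumers can be added without changing existing services.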

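Event-Driven Choreography Pattern

In choreography there is no central coordinator: each service reacts to events published by others and publishes its own events in turn.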

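// @filename: Main.java
// Order Service - Publishing domain events.
// EventProducer.publish(topic, key, event) is assumed to be a helper that
// serializes the event to JSON; the EventProducer shown earlier only sends Strings.
@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final EventProducer eventProducer;

    public OrderService(OrderRepository orderRepository, EventProducer eventProducer) {
        this.orderRepository = orderRepository;
        this.eventProducer = eventProducer;
    }

    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // Create order in database
        Order order = orderRepository.save(new Order(request));

        // Publish order created event
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .items(order.getItems())
            .totalAmount(order.getTotalAmount())
            .timestamp(Instant.now())
            .build();

        // Caveat: this send is not part of the database transaction. If the
        // transaction rolls back after the send, consumers still see the event.
        eventProducer.publish("order-events", order.getId(), event);

        return order;
    }
}

// Inventory Service - Reacting to order events.
// The listener container needs a JSON deserializer configured for
// OrderCreatedEvent (for example, Spring Kafka's JsonDeserializer).
@Component
public class InventoryEventHandler {
    private final InventoryService inventoryService;
    private final EventProducer eventProducer;

    public InventoryEventHandler(InventoryService inventoryService, EventProducer eventProducer) {
        this.inventoryService = inventoryService;
        this.eventProducer = eventProducer;
    }

    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderCreated(OrderCreatedEvent event) {
        try {
            // Reserve inventory
            for (OrderItem item : event.getItems()) {
                inventoryService.reserveStock(item.getProductId(), item.getQuantity());
            }

            // Publish inventory reserved event
            InventoryReservedEvent reservedEvent = InventoryReservedEvent.builder()
                .orderId(event.getOrderId())
                .reservationId(UUID.randomUUID().toString())
                .items(event.getItems())
                .build();

            eventProducer.publish("inventory-events", event.getOrderId(), reservedEvent);
        } catch (InsufficientStockException e) {
            // Items reserved before the failure must also be released (not shown).
            // Then publish the shortage so the order service can cancel the order;
            // publishInventoryShortageEvent is a helper not shown here.
            publishInventoryShortageEvent(event.getOrderId(), e.getProductId());
        }
    }
}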

Saga Pattern Implementation

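The Saga pattern manages distributed transactions by coordinating a series of local transactions across multiple services. Each step commits locally. If a later step fails, the saga runs compensating actions for the steps that already completed, in reverse order. The example below uses orchestration, where a coordinator drives the steps, instead of the choreography shown above.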

sequenceDiagram
    participant SC as Saga Coordinator
    participant OS as Order Service
    participant IS as Inventory Service
    participant PS as Payment Service
    participant SS as Shipping Service
    participant K as Kafka
    
    Note over SC: Order Saga Started
    SC->>K: Publish SAGA_STARTED
    SC->>OS: Create Order
    OS->>K: Order Created Event
    OS->>SC: Success
    
    SC->>IS: Reserve Inventory
    IS->>K: Inventory Reserved Event
    IS->>SC: Success
    
    SC->>PS: Process Payment
    PS-->>SC: Payment Failed ❌
    PS->>K: Payment Failed Event
    
    Note over SC: Start Compensation
    SC->>IS: Release Inventory (Compensate)
    IS->>K: Inventory Released Event
    IS->>SC: Compensated
    
    SC->>OS: Cancel Order (Compensate)
    OS->>K: Order Cancelled Event
    OS->>SC: Compensated
    
    SC->>K: Publish SAGA_COMPENSATED
    
    Note over SC,K: Alternative: Successful Flow
    Note over SC: If all steps succeed:
    Note over SC: SC publishes SAGA_COMPLETED
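
// @filename: index.js
// Saga coordinator for distributed transactions
const { v4: uuidv4 } = require('uuid')

const SAGA_STEPS = ['RESERVE_INVENTORY', 'PROCESS_PAYMENT', 'CREATE_SHIPMENT']

// Command topic for each step (illustrative names)
const STEP_TOPICS = {
  RESERVE_INVENTORY: 'inventory-commands',
  PROCESS_PAYMENT: 'payment-commands',
  CREATE_SHIPMENT: 'shipping-commands',
}

class OrderSaga {
  constructor(producer, consumer) {
    this.producer = producer
    // The consumer is expected to feed step replies into handleSagaEvent (wiring not shown)
    this.consumer = consumer
    // In-memory state is lost on restart; a production coordinator would persist it
    this.sagaState = new Map()
  }

  async startOrderSaga(order) {
    const sagaId = uuidv4()

    // Initialize saga state
    const saga = {
      sagaId,
      orderId: order.id,
      status: 'STARTED',
      steps: SAGA_STEPS,
      completedSteps: [],
      compensatedSteps: [],
    }
    this.sagaState.set(sagaId, saga)

    // Record the start of the saga, then kick off the first step
    await this.publishSagaEvent(saga, 'SAGA_STARTED', { steps: SAGA_STEPS })
    await this.executeNextStep(saga)
  }

  async handleSagaEvent(event) {
    const saga = this.sagaState.get(event.sagaId)
    if (!saga) {
      console.warn('Ignoring event for unknown saga:', event.sagaId)
      return
    }

    switch (event.type) {
      case 'STEP_COMPLETED':
        saga.completedSteps.push(event.step)
        await this.executeNextStep(saga)
        break

      case 'STEP_FAILED':
        // The failed step rolled back locally; undo the completed ones
        saga.status = 'COMPENSATING'
        await this.continueCompensation(saga)
        break

      case 'COMPENSATION_COMPLETED':
        saga.compensatedSteps.push(event.step)
        await this.continueCompensation(saga)
        break
    }
  }

  getNextStep(saga) {
    return saga.steps.find((step) => !saga.completedSteps.includes(step))
  }

  getTopicForStep(step) {
    return STEP_TOPICS[step]
  }

  async executeNextStep(saga) {
    const nextStep = this.getNextStep(saga)

    if (!nextStep) {
      // All steps completed successfully
      saga.status = 'COMPLETED'
      await this.publishSagaEvent(saga, 'SAGA_COMPLETED')
      return
    }

    await this.sendStepCommand(saga, nextStep, 'EXECUTE')
  }

  // Compensate completed steps one at a time, newest first
  async continueCompensation(saga) {
    const nextToUndo = [...saga.completedSteps]
      .reverse()
      .find((step) => !saga.compensatedSteps.includes(step))

    if (!nextToUndo) {
      saga.status = 'COMPENSATED'
      await this.publishSagaEvent(saga, 'SAGA_COMPENSATED')
      return
    }

    await this.sendStepCommand(saga, nextToUndo, 'COMPENSATE')
  }

  async sendStepCommand(saga, step, action) {
    await this.producer.send({
      topic: this.getTopicForStep(step),
      messages: [
        {
          key: String(saga.orderId),
          value: JSON.stringify({
            sagaId: saga.sagaId,
            orderId: saga.orderId,
            action,
            step,
          }),
        },
      ],
    })
  }

  async publishSagaEvent(saga, type, extra = {}) {
    await this.producer.send({
      topic: 'order-saga-events',
      messages: [
        {
          key: saga.sagaId,
          value: JSON.stringify({
            type,
            sagaId: saga.sagaId,
            orderId: saga.orderId,
            ...extra,
          }),
        },
      ],
    })
  }
}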
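Whatever transport carries the saga's commands, the compensation rule itself is plain logic that can be tested without Kafka. The hypothetical SagaPlan class below computes which steps to undo when a step fails: only the steps that completed before it, newest first. It assumes the failed step rolled back its own local transaction and so needs no compensation. It also treats order creation as the first step, matching the diagram above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Pure decision logic for an orchestrated saga, independent of messaging
class SagaPlan {

    // Steps to compensate after failedStep fails: the earlier steps, in reverse order
    static List<String> compensationsFor(List<String> steps, String failedStep) {
        int failedIndex = steps.indexOf(failedStep);
        if (failedIndex < 0) {
            throw new IllegalArgumentException("Unknown step: " + failedStep);
        }
        List<String> toUndo = new ArrayList<>(steps.subList(0, failedIndex));
        Collections.reverse(toUndo);
        return toUndo;
    }

    public static void main(String[] args) {
        List<String> steps = List.of(
            "CREATE_ORDER", "RESERVE_INVENTORY", "PROCESS_PAYMENT", "CREATE_SHIPMENT");
        // Payment fails: release inventory first, then cancel the order
        System.out.println(compensationsFor(steps, "PROCESS_PAYMENT")); // [RESERVE_INVENTORY, CREATE_ORDER]
    }
}
```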

CQRS and Event Sourcing

Implementing CQRS with Kafka

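Command Query Responsibility Segregation (CQRS) separates read and write models, with Kafka as the event backbone. Commands go to one topic. The command processor updates the write model and publishes events to another topic, and projections consume those events to build query-optimized views. The read side is therefore eventually consistent: a query issued right after a command may not yet see its effect.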

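// @filename: Main.java
// Command side - Write model
@RestController
@RequestMapping("/api/products")
public class ProductCommandController {
    private final KafkaProducer<String, ProductCommand> commandProducer;

    public ProductCommandController(KafkaProducer<String, ProductCommand> commandProducer) {
        this.commandProducer = commandProducer;
    }

    @PostMapping
    public ResponseEntity<Void> createProduct(@RequestBody CreateProductCommand command) {
        command.setCommandId(UUID.randomUUID().toString());
        command.setTimestamp(Instant.now());

        ProducerRecord<String, ProductCommand> record =
            new ProducerRecord<>("product-commands", command.getProductId(), command);

        // Asynchronous send: 202 Accepted means "queued", not "processed".
        // Log failures so a lost command does not go unnoticed.
        commandProducer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Failed to enqueue command " + command.getCommandId()
                    + ": " + exception.getMessage());
            }
        });

        return ResponseEntity.accepted()
            .header("X-Command-Id", command.getCommandId())
            .build();
    }
}

// Command processor
@Component
public class ProductCommandProcessor {
    private final ProductRepository productRepository;
    private final EventProducer eventProducer; // assumed helper that serializes events to JSON

    public ProductCommandProcessor(ProductRepository productRepository, EventProducer eventProducer) {
        this.productRepository = productRepository;
        this.eventProducer = eventProducer;
    }

    @KafkaListener(topics = "product-commands")
    public void processCommand(ProductCommand command) {
        switch (command.getType()) {
            case CREATE:
                Product product = new Product(command);
                productRepository.save(product);
                publishProductCreatedEvent(product);
                break;
            case UPDATE:
                updateProduct(command); // not shown
                break;
            case DELETE:
                deleteProduct(command); // not shown
                break;
        }
    }

    private void publishProductCreatedEvent(Product product) {
        ProductCreatedEvent event = new ProductCreatedEvent(
            product.getId(),
            product.getName(),
            product.getPrice(),
            product.getDescription()
        );

        eventProducer.publish("product-events", product.getId(), event);
    }
}

// Query side - Read model projection.
// Deserializing into the right ProductEvent subclass requires type information
// in the message (for example, the type headers written by Spring Kafka's JsonSerializer).
@Component
public class ProductProjection {
    private final ProductViewRepository viewRepository;

    public ProductProjection(ProductViewRepository viewRepository) {
        this.viewRepository = viewRepository;
    }

    @KafkaListener(topics = "product-events", groupId = "product-projection")
    public void handleProductEvent(ProductEvent event) {
        switch (event.getType()) {
            case "ProductCreated":
                createProductView((ProductCreatedEvent) event);
                break;
            case "ProductUpdated":
                updateProductView((ProductUpdatedEvent) event);
                break;
            case "ProductDeleted":
                deleteProductView((ProductDeletedEvent) event);
                break;
        }
    }

    private void createProductView(ProductCreatedEvent event) {
        // Assumes the event sets its own timestamp when constructed
        ProductView view = ProductView.builder()
            .id(event.getProductId())
            .name(event.getName())
            .price(event.getPrice())
            .description(event.getDescription())
            .lastUpdated(event.getTimestamp())
            .build();

        viewRepository.save(view);
    }
}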
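Consumers can see an event more than once, for example after a rebalance or a retry, so projection updates should be idempotent. The minimal in-memory sketch below shows one way to achieve that. It assumes each event carries a per-product version number, which the events above do not have. The projection remembers the last version applied for each product and ignores anything older or repeated. All types here are hypothetical simplifications (Java 16+ records), not the article's ProductEvent and ProductView classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified event and view shapes for this sketch
record VersionedProductEvent(String type, String productId, long version, String name, double price) {}

record ProductRow(String productId, String name, double price) {}

// In-memory read model that applies product events idempotently
class ProductRowProjection {
    private final Map<String, ProductRow> rows = new HashMap<>();
    // Last applied version per product, kept even after a delete so that
    // late redeliveries cannot bring a deleted product back
    private final Map<String, Long> lastVersion = new HashMap<>();

    void apply(VersionedProductEvent event) {
        long seen = lastVersion.getOrDefault(event.productId(), 0L);
        if (event.version() <= seen) {
            return; // duplicate or stale redelivery
        }
        lastVersion.put(event.productId(), event.version());

        switch (event.type()) {
            case "ProductCreated", "ProductUpdated" -> rows.put(event.productId(),
                new ProductRow(event.productId(), event.name(), event.price()));
            case "ProductDeleted" -> rows.remove(event.productId());
            default -> { } // event types this view does not track
        }
    }

    ProductRow find(String productId) {
        return rows.get(productId);
    }

    public static void main(String[] args) {
        ProductRowProjection projection = new ProductRowProjection();
        projection.apply(new VersionedProductEvent("ProductCreated", "p1", 1, "Desk lamp", 20.0));
        projection.apply(new VersionedProductEvent("ProductUpdated", "p1", 2, "Desk lamp", 25.0));
        projection.apply(new VersionedProductEvent("ProductUpdated", "p1", 2, "Desk lamp", 25.0)); // redelivered
        System.out.println(projection.find("p1"));
    }
}
```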

Event Sourcing Implementation

// @filename: index.js
// Event store using Kafka
class EventStore {
  constructor(kafka) {
    this.kafka = kafka
    this.producer = kafka.producer()
    this.admin = kafka.admin()
  }

  async connect() {
    await this.producer.connect()
    await this.admin.connect()
  }

  async saveEvents(aggregateId, events, expectedVersion) {
    // Kafka cannot reject an append based on the aggregate's current version,
    // so expectedVersion only numbers the events here. Concurrent writers for
    // the same aggregate need coordination elsewhere.
    const messages = events.map((event, index) => ({
      key: aggregateId,
      value: JSON.stringify({
        aggregateId: aggregateId,
        eventType: event.type,
        eventData: event,
        eventVersion: expectedVersion + index + 1,
        timestamp: new Date().toISOString(),
      }),
      headers: {
        'aggregate-type': event.aggregateType,
        'event-version': (expectedVersion + index + 1).toString(),
      },
    }))

    await this.producer.send({
      topic: 'event-store',
      messages: messages,
      acks: -1, // Wait for all in-sync replicas
    })
  }

  // Reads the whole topic to rebuild one aggregate. This is only practical
  // for small topics; larger systems add snapshots or a queryable store.
  async loadEvents(aggregateId, fromVersion = 0) {
    // Record the current end (high watermark) of each non-empty partition
    const offsets = await this.admin.fetchTopicOffsets('event-store')
    const pending = new Map(
      offsets
        .filter((p) => BigInt(p.high) > BigInt(p.low))
        .map((p) => [p.partition, BigInt(p.high)])
    )
    if (pending.size === 0) return []

    // A throwaway consumer group, so every load starts from the beginning
    const consumer = this.kafka.consumer({
      groupId: `event-store-reader-${Date.now()}`,
    })

    await consumer.connect()
    await consumer.subscribe({
      topic: 'event-store',
      fromBeginning: true,
    })

    const events = []
    let resolvePromise
    const eventsPromise = new Promise((resolve) => (resolvePromise = resolve))

    await consumer.run({
      autoCommit: false,
      eachMessage: async ({ partition, message }) => {
        const event = JSON.parse(message.value.toString())

        if (
          event.aggregateId === aggregateId &&
          event.eventVersion > fromVersion
        ) {
          events.push(event)
        }

        // A partition is done once its last offset (high watermark - 1) is read
        const high = pending.get(partition)
        if (high !== undefined && BigInt(message.offset) + 1n >= high) {
          pending.delete(partition)
          if (pending.size === 0) resolvePromise()
        }
      },
    })

    await eventsPromise
    await consumer.disconnect()

    return events.sort((a, b) => a.eventVersion - b.eventVersion)
  }
}

// Aggregate implementation
class OrderAggregate {
  constructor(id) {
    this.id = id
    this.version = 0
    this.uncommittedEvents = []
    this.status = 'PENDING'
    this.items = []
    this.totalAmount = 0
  }

  static async load(id, eventStore) {
    const aggregate = new OrderAggregate(id)
    const events = await eventStore.loadEvents(id)

    for (const event of events) {
      aggregate.applyEvent(event.eventData, false)
      aggregate.version = event.eventVersion
    }

    return aggregate
  }

  createOrder(customerId, items) {
    if (this.status !== 'PENDING') {
      throw new Error('Order already created')
    }

    const event = {
      type: 'OrderCreated',
      aggregateType: 'Order',
      customerId: customerId,
      items: items,
      totalAmount: items.reduce(
        (sum, item) => sum + item.price * item.quantity,
        0
      ),
    }

    this.applyEvent(event, true)
  }

  applyEvent(event, isNew = true) {
    // Dispatch on an explicit type field: event.constructor.name is 'Object'
    // for plain object literals and does not survive a JSON round trip
    switch (event.type) {
      case 'OrderCreated':
        this.status = 'CREATED'
        this.customerId = event.customerId
        this.items = event.items
        this.totalAmount = event.totalAmount
        break
      // Handle other events...
    }

    if (isNew) {
      this.uncommittedEvents.push(event)
    }
  }

  async save(eventStore) {
    if (this.uncommittedEvents.length === 0) return

    await eventStore.saveEvents(this.id, this.uncommittedEvents, this.version)
    this.version += this.uncommittedEvents.length
    this.uncommittedEvents = []
  }
}
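A plain Kafka topic cannot reject an append because an aggregate's version has moved on. In saveEvents above, expectedVersion therefore only numbers the events. The in-memory sketch below shows the optimistic concurrency check an event store is usually expected to perform, plus replay to rebuild state. Everything in it is hypothetical and single-process, and events are reduced to their type names for brevity. With Kafka, a common assumption is instead that a single writer owns each aggregate, for example by routing all commands for an aggregate through one partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory event store with an optimistic concurrency check
class InMemoryEventStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    // Appends only if the stream is still at expectedVersion; returns the new version
    synchronized int append(String aggregateId, int expectedVersion, List<String> events) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new IllegalStateException("Concurrency conflict on " + aggregateId
                + ": expected version " + expectedVersion + " but was " + stream.size());
        }
        stream.addAll(events);
        return stream.size();
    }

    synchronized List<String> load(String aggregateId) {
        return new ArrayList<>(streams.getOrDefault(aggregateId, List.of()));
    }
}

// Rebuilds order state by replaying event types in order
class OrderState {
    String status = "PENDING";
    int version = 0;

    static OrderState replay(List<String> events) {
        OrderState state = new OrderState();
        for (String event : events) {
            switch (event) {
                case "OrderCreated" -> state.status = "CREATED";
                case "OrderCancelled" -> state.status = "CANCELLED";
                default -> { } // unknown events still advance the version
            }
            state.version++;
        }
        return state;
    }

    public static void main(String[] args) {
        InMemoryEventStore store = new InMemoryEventStore();
        store.append("order-1", 0, List.of("OrderCreated"));
        try {
            store.append("order-1", 0, List.of("OrderCreated")); // stale writer
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        OrderState state = replay(store.load("order-1"));
        System.out.println(state.status + " at version " + state.version);
    }
}
```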

Kafka Streams

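Kafka Streams is a Java library for building stream processing applications. It runs inside your own application instances rather than on a separate processing cluster, and keeps local state in state stores that are backed up to Kafka changelog topics.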

Stream Processing Fundamentals

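// @filename: OrderAnalyticsApp.java
// Real-time order analytics with Kafka Streams.
// orderSerde, customerSerde, and enrichedOrderSerde are assumed JSON serdes for
// the Order, Customer, and EnrichedOrder types (not shown).
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

public class OrderAnalyticsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Stream of orders, keyed by customer ID
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // Calculate order totals by customer. groupBy() repartitions the stream,
        // so pass serdes explicitly instead of relying on the String defaults.
        KTable<String, Double> customerTotals = orders
            .groupBy((key, order) -> order.getCustomerId(),
                Grouped.with(Serdes.String(), orderSerde))
            .aggregate(
                () -> 0.0,
                (customerId, order, total) -> total + order.getTotalAmount(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Double())
            );

        // Windowed aggregations - Orders per hour. The constant key routes every
        // order through a single partition. TimeWindows.ofSizeWithNoGrace needs
        // Kafka 3.0+; older versions use TimeWindows.of.
        KTable<Windowed<String>, Long> ordersPerHour = orders
            .groupBy((key, order) -> "all", Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.as("orders-per-hour"));

        // Join with customer data. A stream-table join matches on the record key,
        // so orders must be keyed by customer ID and co-partitioned with "customers".
        KTable<String, Customer> customers = builder.table("customers",
            Consumed.with(Serdes.String(), customerSerde));

        KStream<String, EnrichedOrder> enrichedOrders = orders
            .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer),
                Joined.with(Serdes.String(), orderSerde, customerSerde)
            );

        // Output enriched orders
        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde));

        // Build and start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}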
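Kafka Streams time windows are aligned to the Unix epoch. For a tumbling window like the hourly one above, each record therefore falls into exactly one window, whose start is the record's timestamp rounded down to a multiple of the window size. The sketch below shows that arithmetic with a hypothetical helper class. It assumes non-negative millisecond timestamps and ignores grace periods and late records.

```java
import java.time.Duration;
import java.time.Instant;

// Epoch-aligned tumbling window assignment for non-negative millisecond timestamps
class TumblingWindow {

    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs; // exclusive end
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long ts = Instant.parse("2024-05-01T10:42:13Z").toEpochMilli();
        // prints 2024-05-01T10:00:00Z to 2024-05-01T11:00:00Z
        System.out.println(Instant.ofEpochMilli(windowStart(ts, hour)) + " to "
            + Instant.ofEpochMilli(windowEnd(ts, hour)));
    }
}
```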

Stateful Stream Processing

// @filename: Main.java
// Fraud detection with stateful processing.
// userProfileSerde, calculateDistance, detectFraudPattern, and calculateRiskScore
// are assumed helpers (not shown); the default serdes must handle Transaction values.
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class FraudDetectionProcessor {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Transaction stream
        KStream<String, Transaction> transactions = builder.stream("transactions");

        // State store for tracking user behavior
        StoreBuilder<KeyValueStore<String, UserProfile>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("user-profiles"),
                Serdes.String(),
                userProfileSerde
            );
        builder.addStateStore(storeBuilder);

        // Process transactions with state. transform() belongs to the older
        // Processor API; recent Kafka Streams releases steer users to process()
        // with org.apache.kafka.streams.processor.api.Processor instead.
        KStream<String, FraudAlert> fraudAlerts = transactions
            .transform(() -> new FraudDetectionTransformer(), "user-profiles");

        fraudAlerts.to("fraud-alerts");

        return builder.build();
    }

    static class FraudDetectionTransformer
        implements Transformer<String, Transaction, KeyValue<String, FraudAlert>> {

        private KeyValueStore<String, UserProfile> stateStore;
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.stateStore = (KeyValueStore<String, UserProfile>)
                context.getStateStore("user-profiles");
        }

        @Override
        public KeyValue<String, FraudAlert> transform(String key, Transaction transaction) {
            String userId = transaction.getUserId();
            UserProfile profile = stateStore.get(userId);

            if (profile == null) {
                profile = new UserProfile(userId);
            }

            // Check for fraud patterns *before* adding the transaction, so it is
            // compared with the user's previous behavior (otherwise the last
            // location would always equal the current one)
            FraudAlert alert = null;
            if (isSuspicious(transaction, profile)) {
                alert = FraudAlert.builder()
                    .userId(userId)
                    .transactionId(transaction.getId())
                    .reason(detectFraudPattern(transaction, profile))
                    .riskScore(calculateRiskScore(transaction, profile))
                    .timestamp(context.timestamp())
                    .build();
            }

            // Update and persist the profile whether or not an alert fired
            profile.addTransaction(transaction);
            stateStore.put(userId, profile);

            // Returning null forwards nothing downstream
            return alert == null ? null : KeyValue.pair(userId, alert);
        }

        @Override
        public void close() {
            // Nothing to release; Kafka Streams manages the state store
        }

        private boolean isSuspicious(Transaction transaction, UserProfile profile) {
            // Velocity check - too many transactions in short time
            if (profile.getTransactionCount(Duration.ofMinutes(5)) > 5) {
                return true;
            }

            // Location anomaly (distance units depend on calculateDistance)
            if (profile.getLastLocation() != null &&
                calculateDistance(profile.getLastLocation(), transaction.getLocation()) > 1000) {
                return true;
            }

            // Amount anomaly
            if (transaction.getAmount() > profile.getAverageAmount() * 3) {
                return true;
            }

            return false;
        }
    }
}
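The velocity rule in isSuspicious relies on UserProfile.getTransactionCount, which the article leaves undefined. One possible implementation is a sliding window over a deque: evict timestamps older than the window, then count what remains. It assumes the profile keeps recent transaction timestamps in arrival order. The VelocityWindow class below is hypothetical. Evicting old entries also keeps the profile bounded, which matters because the profile is serialized into the state store on every update.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding-window transaction counter; assumes timestamps arrive in non-decreasing order
class VelocityWindow {
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private final long windowMs;

    VelocityWindow(Duration window) {
        this.windowMs = window.toMillis();
    }

    // Records a transaction and returns how many fall in the window (nowMs - window, nowMs]
    int record(long nowMs) {
        timestamps.addLast(nowMs);
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMs - windowMs) {
            timestamps.pollFirst(); // evict transactions that aged out of the window
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        VelocityWindow window = new VelocityWindow(Duration.ofMinutes(5));
        long start = 1_000_000L;
        int count = 0;
        for (int i = 0; i < 6; i++) {
            count = window.record(start + i * 1_000L); // six transactions, one second apart
        }
        System.out.println("transactions in window: " + count + ", suspicious: " + (count > 5));
    }
}
```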

Kafka Connect

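Kafka Connect is a framework for streaming data between Kafka and external systems without writing producer or consumer code. Source connectors import data into Kafka topics, and sink connectors export topic data to other systems. Connectors are configured by submitting JSON like the example below to a Connect worker's REST API.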

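Source Connector Configuration

This example uses the Debezium PostgreSQL connector. Debezium 2.x replaced database.server.name with topic.prefix, which becomes the first part of each change-event topic name (postgres.public.orders here).

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "orders_db",
    "topic.prefix": "postgres",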
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "publication.name": "orders_publication",
    "slot.name": "orders_slot",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "postgres.public.(.*)",
    "transforms.route.replacement": "$1-events"
  }
}
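The RegexRouter transform renames a record's topic when the entire topic name matches the regex, substituting the replacement string, where $1 refers to the first capture group. Topics that do not match pass through unchanged. The sketch below reproduces that rule with java.util.regex to show what the configuration above does to Debezium's prefix.schema.table topic names. It is an illustration, not the transform's source, and the TopicRouter class is hypothetical. The unescaped dots in the configured regex match any character. Escaping them, written as postgres\\.public\\.(.*) inside the JSON string, makes the match stricter.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Mirrors the renaming rule set by transforms.route.regex and transforms.route.replacement
class TopicRouter {
    private final Pattern regex;
    private final String replacement;

    TopicRouter(String regex, String replacement) {
        this.regex = Pattern.compile(regex);
        this.replacement = replacement;
    }

    String route(String topic) {
        Matcher matcher = regex.matcher(topic);
        // Only a match of the entire topic name triggers a rename
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter("postgres.public.(.*)", "$1-events");
        System.out.println(router.route("postgres.public.orders"));    // orders-events
        System.out.println(router.route("postgres.public.customers")); // customers-events
        System.out.println(router.route("inventory.items"));           // inventory.items
    }
}
```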

Custom Sink Connector

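// @filename: ElasticsearchSinkConnector.java
// Custom Elasticsearch sink connector, written against the Elasticsearch 7.x
// RestHighLevelClient (deprecated in later client versions). The XContentType
// import is omitted because its package differs across 7.x client releases.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class ElasticsearchSinkConnector extends SinkConnector {
    private Map<String, String> configProps;

    @Override
    public void start(Map<String, String> props) {
        this.configProps = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return ElasticsearchSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Each task gets its own copy of the same settings;
        // Connect spreads the topic partitions across the tasks
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(configProps));
        }
        return configs;
    }

    @Override
    public void stop() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("elasticsearch.hosts", ConfigDef.Type.STRING,
            ConfigDef.Importance.HIGH, "Elasticsearch URL, for example http://localhost:9200");
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

public class ElasticsearchSinkTask extends SinkTask {
    private RestHighLevelClient esClient;

    @Override
    public String version() {
        return "1.0.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Expects a single URL; HttpHost.create does not split comma-separated lists
        String esHosts = props.get("elasticsearch.hosts");
        this.esClient = new RestHighLevelClient(
            RestClient.builder(HttpHost.create(esHosts))
        );
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        BulkRequest bulkRequest = new BulkRequest();

        for (SinkRecord record : records) {
            if (record.value() == null) {
                continue; // Skip tombstones; a fuller connector might delete the document
            }
            String index = getIndex(record);
            // Fall back to topic-partition-offset when the record has no key
            String id = record.key() != null
                ? record.key().toString()
                : record.topic() + "-" + record.kafkaPartition() + "-" + record.kafkaOffset();
            // Assumes a StringConverter, so the value is already a JSON string
            String document = record.value().toString();

            bulkRequest.add(new IndexRequest(index)
                .id(id)
                .source(document, XContentType.JSON));
        }

        // put() can receive an empty batch, and an empty bulk request is rejected
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }

        try {
            BulkResponse response = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (response.hasFailures()) {
                throw new ConnectException("Failed to index documents: " +
                    response.buildFailureMessage());
            }
        } catch (IOException e) {
            throw new ConnectException("Error indexing to Elasticsearch", e);
        }
    }

    // Elasticsearch index names must be lowercase
    private String getIndex(SinkRecord record) {
        return record.topic().toLowerCase();
    }

    @Override
    public void stop() {
        try {
            if (esClient != null) esClient.close();
        } catch (IOException e) {
            throw new ConnectException("Error closing Elasticsearch client", e);
        }
    }
}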

ksqlDB for Stream Processing

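ksqlDB provides a SQL interface for stream processing on top of Kafka. A STREAM is an append-only sequence of events, while a TABLE holds the latest value for each key. Persistent queries (CREATE ... AS SELECT) run continuously as Kafka Streams applications and write their results back to Kafka topics.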

Creating Streams and Tables

-- Create a stream from Kafka topic
CREATE STREAM orders_stream (
  order_id VARCHAR KEY,
  customer_id VARCHAR,
  product_id VARCHAR,
  quantity INT,
  price DOUBLE,
  order_timestamp TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'orders',
  VALUE_FORMAT = 'JSON',
  TIMESTAMP = 'order_timestamp'
);

-- Create a table for customer data
CREATE TABLE customers_table (
  customer_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  email VARCHAR,
  tier VARCHAR,
  total_spent DOUBLE
) WITH (
  KAFKA_TOPIC = 'customers',
  VALUE_FORMAT = 'JSON'
);

-- Join stream with table
CREATE STREAM enriched_orders AS
  SELECT
    o.order_id,
    o.customer_id,
    c.name AS customer_name,
    c.tier AS customer_tier,
    o.product_id,
    o.quantity,
    o.price,
    o.quantity * o.price AS total_amount
  FROM orders_stream o
  INNER JOIN customers_table c
    ON o.customer_id = c.customer_id
  EMIT CHANGES;

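-- Windowed aggregation: orders and revenue per product per hour.
-- ksqlDB needs a grouping column (window bounds cannot be grouped on);
-- WINDOWSTART and WINDOWEND come from the WINDOW clause.
CREATE TABLE orders_per_hour AS
  SELECT
    product_id,
    WINDOWSTART AS window_start,
    WINDOWEND AS window_end,
    COUNT(*) AS order_count,
    SUM(quantity * price) AS total_revenue
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY product_id
  EMIT CHANGES;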

-- Real-time anomaly detection
CREATE STREAM high_value_orders AS
  SELECT * FROM enriched_orders
  WHERE total_amount > 1000
    AND customer_tier != 'PREMIUM'
  EMIT CHANGES;

Push Queries for Real-time Updates

// @filename: ksqlClient.js
// Node.js client for ksqlDB push queries (requires: npm install axios)
const axios = require('axios')

class KsqlDBClient {
  constructor(host = 'http://localhost:8088') {
    this.host = host
  }

  async pushQuery(query) {
    const response = await axios({
      method: 'post',
      url: `${this.host}/query-stream`,
      headers: {
        'Content-Type': 'application/json',
        // Request newline-delimited output: a header line, then one JSON array per row
        Accept: 'application/vnd.ksqlapi.delimited.v1',
      },
      data: {
        sql: query,
        properties: {
          'auto.offset.reset': 'latest',
        },
      },
      responseType: 'stream',
    })

    return response.data
  }

  async subscribeToHighValueOrders(callback) {
    const stream = await this.pushQuery(
      'SELECT * FROM high_value_orders EMIT CHANGES;'
    )
    // Decode as UTF-8 so multi-byte characters are never split across chunks
    stream.setEncoding('utf8')

    let buffer = ''
    stream.on('data', (chunk) => {
      buffer += chunk
      const lines = buffer.split('\n')
      buffer = lines.pop() // Keep incomplete line in buffer

      for (const line of lines) {
        if (!line.trim()) continue

        let data
        try {
          data = JSON.parse(line)
        } catch (e) {
          console.error('Failed to parse:', line)
          continue
        }
        // The first line is the query header (queryId, columnNames); later lines are rows
        callback(data)
      }
    })

    stream.on('error', (err) => console.error('Push query stream error:', err.message))
    return stream
  }
}

// Usage
const ksqlClient = new KsqlDBClient()
ksqlClient
  .subscribeToHighValueOrders((order) => {
    console.log('High value order detected:', order)
    // Send notification or trigger workflow
  })
  .catch((err) => console.error('Failed to start push query:', err.message))
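The buffering inside `subscribeToHighValueOrders` is the subtle part: HTTP chunks do not respect line boundaries, so one JSON row can be split across two `data` events. The sketch below factors that logic into a standalone, testable parser that also turns rows into objects. It assumes the response is newline-delimited JSON in which the first line is a header object carrying `columnNames` and each later line is a JSON array of values (ksqlDB's delimited format, requested with the `Accept: application/vnd.ksqlapi.delimited.v1` header, as we understand it; verify against your server version). `createRowParser` is a hypothetical helper, not part of any ksqlDB client library.

```javascript
// Hypothetical helper (not part of any ksqlDB library): turns arbitrary text
// chunks into row objects. Assumes the first line is a header object such as
// {"queryId": ..., "columnNames": [...]} and every later line is a JSON array
// of values in column order. Other object lines (e.g. errors) are ignored here.
function createRowParser(onRow, onError = (line) => console.error('Failed to parse:', line)) {
  let buffer = ''
  let columnNames = null

  return function push(chunk) {
    buffer += chunk
    const lines = buffer.split('\n')
    buffer = lines.pop() // the last piece may be an incomplete line

    for (const line of lines) {
      if (!line.trim()) continue

      let parsed
      try {
        parsed = JSON.parse(line)
      } catch (e) {
        onError(line)
        continue
      }

      if (Array.isArray(parsed)) {
        if (columnNames === null) continue // no header seen yet; cannot name columns
        // Zip column names with values to build a plain object
        const row = {}
        columnNames.forEach((name, i) => {
          row[name] = parsed[i]
        })
        onRow(row)
      } else if (parsed && columnNames === null && Array.isArray(parsed.columnNames)) {
        columnNames = parsed.columnNames // header line: remember the column order
      }
    }
  }
}

// Usage: a row split across two chunks is still emitted exactly once
const rows = []
const push = createRowParser((row) => rows.push(row))
push('{"queryId":"q1","columnNames":["ORDER_ID","TOTAL_AMOUNT"]}\n["o-1", 12')
push('50.0]\n')
console.log(rows) // one row: ORDER_ID 'o-1', TOTAL_AMOUNT 1250
```

In the client, you would call `stream.setEncoding('utf8')` and pass each `data` chunk to `push`, keeping the network code and the parsing logic separately testable.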

Production Best Practices

Cluster Configuration

# Kafka broker configuration for production
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092,SSL://kafka-1.example.com:9093

# Replication and durability
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# Performance tuning
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log configuration
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
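# Broker-side codec: 'producer' (the default) keeps whatever codec producers used,
# while a fixed codec such as snappy makes the broker recompress batches sent with another codec
compression.type=snappy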

# Transaction support
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
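The replication settings work together. For producers using `acks=all` (a producer setting, not part of the broker file above), a write succeeds only while the partition's in-sync replica (ISR) set has at least `min.insync.replicas` members, and `unclean.leader.election.enable=false` prevents an out-of-sync replica from becoming leader and discarding acknowledged data. The hypothetical helpers below work through that arithmetic for the values above, assuming each failed broker held one replica of the partition and has dropped out of the ISR.

```javascript
// Hypothetical helper: can a partition still accept acks=all writes?
// Assumes every failed broker held one replica and has left the ISR.
function acksAllWritesAvailable(replicationFactor, minInsyncReplicas, failedReplicas) {
  const inSyncReplicas = replicationFactor - failedReplicas
  return inSyncReplicas >= minInsyncReplicas
}

// Replica failures tolerated before acks=all producers start getting
// NotEnoughReplicas errors. Consumers can keep reading as long as one
// in-sync replica remains to act as leader.
function toleratedFailures(replicationFactor, minInsyncReplicas) {
  return replicationFactor - minInsyncReplicas
}

// default.replication.factor=3, min.insync.replicas=2
console.log(toleratedFailures(3, 2)) // 1
console.log(acksAllWritesAvailable(3, 2, 1)) // true
console.log(acksAllWritesAvailable(3, 2, 2)) // false
```

This is why `replication.factor=3` with `min.insync.replicas=2` is a common pairing: one broker can be down for maintenance or failure without blocking durable writes.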

Security Configuration

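// @filename: Main.java
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        // Option 1: SSL/TLS with client certificates (the SSL listener on port 9093)
        Properties sslProps = new Properties();
        sslProps.put("bootstrap.servers", "kafka-1.example.com:9093");
        sslProps.put("security.protocol", "SSL");
        sslProps.put("ssl.truststore.location", "/var/kafka/ssl/kafka.client.truststore.jks");
        sslProps.put("ssl.truststore.password", "truststore-password");
        sslProps.put("ssl.keystore.location", "/var/kafka/ssl/kafka.client.keystore.jks");
        sslProps.put("ssl.keystore.password", "keystore-password");
        sslProps.put("ssl.key.password", "key-password");

        // Option 2: SASL/SCRAM authentication over TLS. Keep it in a separate Properties
        // object: putting security.protocol again would silently overwrite "SSL".
        // Requires a broker listener configured for SASL_SSL (the broker config above
        // defines only PLAINTEXT and SSL listeners).
        Properties saslProps = new Properties();
        saslProps.put("bootstrap.servers", "kafka-1.example.com:9093"); // use the SASL_SSL listener's port
        saslProps.put("security.protocol", "SASL_SSL");
        saslProps.put("ssl.truststore.location", "/var/kafka/ssl/kafka.client.truststore.jks");
        saslProps.put("ssl.truststore.password", "truststore-password");
        saslProps.put("sasl.mechanism", "SCRAM-SHA-512");
        // In production, load credentials from a secret store instead of hard-coding them
        saslProps.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"producer-user\" " +
            "password=\"producer-password\";");
    }
}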
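For Node.js services, the same SASL/SCRAM-over-TLS setup maps onto the configuration object accepted by the KafkaJS `Kafka` constructor (its `ssl` and `sasl` options). The sketch below only builds that object, so it runs without a broker, and it reads credentials from environment variables instead of hard-coding them. The helper name, the variable names (`KAFKA_BROKERS`, `KAFKA_USERNAME`, `KAFKA_PASSWORD`, `KAFKA_CA_PATH`, `KAFKA_CLIENT_ID`) and the default `clientId` are assumptions for illustration.

```javascript
const fs = require('fs')

// Hypothetical helper: builds a KafkaJS client config for SASL/SCRAM over TLS.
// Pass the result to `new Kafka(config)` from the kafkajs package.
function buildSecureKafkaConfig(env = process.env) {
  const required = ['KAFKA_BROKERS', 'KAFKA_USERNAME', 'KAFKA_PASSWORD']
  const missing = required.filter((name) => !env[name])
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`)
  }

  return {
    clientId: env.KAFKA_CLIENT_ID || 'order-service',
    brokers: env.KAFKA_BROKERS.split(',').map((b) => b.trim()),
    // true = TLS using the system CA bundle; otherwise trust a private CA file
    ssl: env.KAFKA_CA_PATH
      ? { ca: [fs.readFileSync(env.KAFKA_CA_PATH, 'utf8')] }
      : true,
    sasl: {
      mechanism: 'scram-sha-512',
      username: env.KAFKA_USERNAME,
      password: env.KAFKA_PASSWORD,
    },
  }
}

// Example with an explicit env object (no CA file, so ssl is true)
const config = buildSecureKafkaConfig({
  KAFKA_BROKERS: 'kafka-1.example.com:9093, kafka-2.example.com:9093',
  KAFKA_USERNAME: 'producer-user',
  KAFKA_PASSWORD: 'producer-password',
})
console.log(config.brokers, config.sasl.mechanism) // avoid logging the password
```

Failing fast on missing variables surfaces misconfiguration at startup rather than as an authentication error on the first produce call.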

Monitoring and Alerting

// @filename: server.js
// Prometheus metrics exporter (requires: npm install prom-client express)
const { register, Counter, Histogram, Gauge } = require('prom-client')
const express = require('express')

// Kafka metrics
const kafkaMessagesProduced = new Counter({
  name: 'kafka_messages_produced_total',
  help: 'Total number of messages produced to Kafka',
  labelNames: ['topic', 'status'],
})

const kafkaProducerLatency = new Histogram({
  name: 'kafka_producer_latency_seconds',
  help: 'Kafka producer latency in seconds',
  labelNames: ['topic'],
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5],
})

// Consumer lag must be set by a separate job that polls committed and
// latest offsets (not shown in this snippet)
const kafkaConsumerLag = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Kafka consumer lag in messages',
  labelNames: ['topic', 'partition', 'consumer_group'],
})

// Monitoring wrapper for a producer exposing send({ topic, messages }), e.g. KafkaJS
class MonitoredProducer {
  constructor(kafkaProducer) {
    this.producer = kafkaProducer
  }

  async send(topic, messages) {
    const start = Date.now()

    try {
      const result = await this.producer.send({
        topic,
        messages,
      })

      kafkaMessagesProduced.inc({ topic, status: 'success' }, messages.length)
      kafkaProducerLatency.observe({ topic }, (Date.now() - start) / 1000)

      return result
    } catch (error) {
      kafkaMessagesProduced.inc({ topic, status: 'error' }, messages.length)
      throw error
    }
  }
}

// Expose metrics endpoint for Prometheus to scrape
const app = express()
app.get('/metrics', async (req, res) => {
  try {
    res.set('Content-Type', register.contentType)
    res.end(await register.metrics())
  } catch (err) {
    res.status(500).end(err.message)
  }
})

// Port 3000 is an example; use whatever your Prometheus scrape config targets
app.listen(3000, () => console.log('Metrics available on port 3000 at /metrics'))
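The `kafkaConsumerLag` gauge above is declared but never updated. Lag for a partition is its log-end offset (high watermark) minus the consumer group's committed offset. The sketch below computes it from two plain arrays whose shapes mirror what we understand KafkaJS's `admin.fetchTopicOffsets(topic)` and the per-topic `partitions` from `admin.fetchOffsets({ groupId, topics })` return (offsets as strings, `'-1'` meaning nothing committed); treat those shapes as an assumption and verify them against your KafkaJS version. A partition with no commit is assumed to start from the log start, which matches `auto.offset.reset=earliest`.

```javascript
// Hypothetical helper: per-partition consumer lag from offset snapshots.
// topicOffsets: [{ partition, high, low }]  (log-end and log-start offsets)
// committed:    [{ partition, offset }]     ('-1' = no committed offset)
// Number() is fine for offsets below 2^53.
function computeConsumerLag(topicOffsets, committed) {
  const committedByPartition = new Map(
    committed.map(({ partition, offset }) => [partition, Number(offset)])
  )

  return topicOffsets.map(({ partition, high, low }) => {
    const end = Number(high)
    const committedOffset = committedByPartition.get(partition)
    // No commit yet: assume the group would start from the log start
    const position =
      committedOffset === undefined || committedOffset < 0 ? Number(low) : committedOffset
    return { partition, lag: Math.max(0, end - position) }
  })
}

const lag = computeConsumerLag(
  [
    { partition: 0, high: '1500', low: '0' },
    { partition: 1, high: '900', low: '100' },
  ],
  [
    { partition: 0, offset: '1450' },
    { partition: 1, offset: '-1' },
  ]
)
console.log(lag) // partition 0 lags by 50, partition 1 by 800

// Each entry could then update the gauge, for example:
// kafkaConsumerLag.set({ topic: 'orders', partition: String(p.partition), consumer_group: 'analytics' }, p.lag)
```

Polling this on a fixed interval and alerting on sustained growth (rather than on a single spike) tends to give fewer false alarms.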

Performance Tuning Guidelines

  1. Producer Optimization

    • Batch messages with linger.ms and batch.size
    • Use compression (snappy or lz4)
    • Enable idempotence (enable.idempotence=true) so retries cannot create duplicates; combine it with transactions for exactly-once semantics
    • Tune buffer.memory and max.block.ms
  2. Consumer Optimization

    • Increase fetch.min.bytes for better batching
    • Tune max.poll.records based on processing time
    • Use parallel processing with consumer groups
    • Consider manual offset management for critical applications
  3. Broker Optimization

    • Use dedicated disks for Kafka logs
    • Leave memory for the OS page cache, which Kafka relies on for reads and writes (roughly 50% of RAM outside the JVM heap)
    • Keep the JVM heap modest (typically 4 to 8 GB)
    • Monitor and balance partition leadership
  4. Network Optimization

    • Place Kafka brokers close to producers/consumers
    • Use rack awareness (broker.rack) to spread replicas across racks, availability zones, or data centers
    • Enable compression at producer level
    • Monitor network utilization
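The advice to tune max.poll.records based on processing time reduces to simple arithmetic. With the Java consumer, the application must call `poll()` again within `max.poll.interval.ms` (300000 ms by default), or the consumer is considered failed and its partitions are rebalanced. So the records returned by one poll (at most `max.poll.records`, 500 by default) must be processable well inside that interval. The helper below is a hypothetical back-of-the-envelope calculator; the 50% safety margin is an assumption, not a Kafka default.

```javascript
// Hypothetical sizing helper for the consumer setting max.poll.records.
// A poll loop must return to poll() within max.poll.interval.ms, so the
// records fetched by one poll should be processable well inside that window.
function suggestMaxPollRecords({
  avgProcessingMsPerRecord,
  maxPollIntervalMs = 300000, // Kafka's default max.poll.interval.ms
  safetyFactor = 0.5, // use only half of the interval (assumption)
}) {
  if (!(avgProcessingMsPerRecord > 0)) {
    throw new RangeError('avgProcessingMsPerRecord must be positive')
  }
  const budgetMs = maxPollIntervalMs * safetyFactor
  // At least one record per poll, otherwise the consumer makes no progress
  return Math.max(1, Math.floor(budgetMs / avgProcessingMsPerRecord))
}

// 200 ms per record: 300000 * 0.5 / 200 = 750 records per poll
console.log(suggestMaxPollRecords({ avgProcessingMsPerRecord: 200 })) // 750

// 2 s per record (e.g. a slow downstream call): 75 records per poll,
// far below the default of 500, which would need 1000 s per batch
console.log(suggestMaxPollRecords({ avgProcessingMsPerRecord: 2000 })) // 75
```

If even one record can approach the interval, raising `max.poll.interval.ms` or moving slow work off the poll loop is usually a better fix than shrinking batches further.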

Conclusion

Apache Kafka provides a robust foundation for building event-driven architectures at scale. Its distributed nature, high throughput, and rich ecosystem make it ideal for modern microservices architectures, real-time analytics, and event sourcing implementations.

Key takeaways:

  • Design topics and partitions based on your scalability requirements
  • Use appropriate serialization formats and schemas
  • Implement proper error handling and monitoring
  • Leverage Kafka Streams and ksqlDB for stream processing
  • Follow security best practices in production
  • Monitor consumer lag and broker health continuously

As organizations continue to embrace event-driven patterns, Kafka remains a critical component for building resilient, scalable, and real-time data pipelines. Whether you’re implementing CQRS, event sourcing, or simple pub-sub messaging, Kafka’s flexibility and performance make it an excellent choice for modern distributed systems.
