Apache Kafka and Saga Pattern in Spring Boot Microservices

In this article, we’ll explore event-driven integration using Apache Kafka to facilitate seamless and efficient communication among microservices. We’ll gain a hands-on understanding of implementing asynchronous, event-driven communication using Apache Kafka and the Saga pattern within Spring Boot microservices.

1. Introduction

1.1 What is Apache Kafka?

Apache Kafka is a distributed event streaming platform that acts as a message broker, enabling the publish-subscribe model. It is renowned for its high throughput, scalability, and ability to support real-time processing.

Key Components of Apache Kafka

  • Producer: The service producing and sending messages to Kafka topics.
  • Consumer: The service consuming messages from Kafka topics.
  • Topic: A channel to which producers send messages and from which consumers receive messages.
  • Broker: A Kafka server that stores data and serves client requests.
  • ZooKeeper: Coordinates the brokers and stores cluster metadata; Kafka uses it to manage the distributed brokers. (Recent Kafka releases can also run without ZooKeeper in KRaft mode.)
  • Partition: Topics are split into partitions, which are the basic unit of parallelism in Kafka.
  • Offset: A unique ID of a record within a partition, allowing Kafka to maintain the order of messages.

1.2 Saga Pattern

We will also dive into the Saga pattern, an architectural pattern that provides an elegant approach to implementing a transaction that spans multiple services and occurs asynchronously. In the Saga pattern, each service carries out its local transaction and broadcasts an event to notify the others. Subsequent services listen for these events and execute their own local transactions. If a transaction encounters an issue, the Saga triggers compensating transactions to reverse the effects of the previous ones. By orchestrating local transactions across microservices, the Saga pattern provides a dependable mechanism for maintaining data consistency and ensuring reliability.

1.3 Case Study

We’ll illustrate this concept through a practical scenario where the placement of an order triggers an update in product stock. A user initiates order creation through the order-service. The order-service publishes an order event to a Kafka topic called stock-update-topic. The product-service subscribes to this topic and consumes the event. Upon receiving the event, the product-service takes action by updating the stock levels of the products based on the quantity ordered.

2. Setting Up the Development Environment

Let’s begin by setting up Apache Kafka. We can refer to the official Apache Kafka documentation for installation and setup instructions.

2.1 Set Up Apache Kafka

2.1.1 Firstly, download the latest Kafka release and extract it:

tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

2.1.2 Next, start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

2.1.3 Finally, open another terminal session and start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

2.2 Adding Necessary Dependencies

Add the Kafka dependency to the pom.xml file of both the order and product services.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.3 Configure Kafka’s Properties

In the application.properties file of both microservices, add the Kafka bootstrap servers property:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.wynnteo.shareddto

The spring.kafka.bootstrap-servers property specifies the Kafka broker’s address. Additionally, the serializer properties specify how messages are serialized for the key and value.

spring.kafka.producer.key-serializer configures the serializer for message keys when we produce messages. In this case, it’s set to org.apache.kafka.common.serialization.StringSerializer, which means the keys of the messages we send to Kafka will be serialized as strings.

On the other hand, the spring.kafka.producer.value-serializer setting is responsible for deciding how our message values get converted into something that can be sent over Kafka. In this case, it’s set to org.springframework.kafka.support.serializer.JsonSerializer. This means that our message values will be serialized as JSON.

Furthermore, spring.json.trusted.packages is set to com.wynnteo.shareddto. This property specifies the trusted packages for deserializing JSON values, restricting deserialization to classes from the listed packages. This is an important security feature that prevents deserialization vulnerabilities.
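Note that this article assumes the topics already exist (or that the broker has topic auto-creation enabled). If auto-creation is disabled, a topic can be declared as a Spring bean and KafkaAdmin will create it on startup; a minimal sketch, with illustrative partition and replica counts:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the topic so Spring's KafkaAdmin creates it on startup if missing.
    @Bean
    public NewTopic stockUpdateTopic() {
        return TopicBuilder.name("stock-update-topic")
                .partitions(3)   // illustrative: tune to the expected consumer parallelism
                .replicas(1)     // single-broker local setup
                .build();
    }
}
```

On a local single-broker cluster the replica count must stay at 1; in production it would typically be 3.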

3. Producer – Order Service

Create a Kafka producer that sends messages to the Kafka topic stock-update-topic when the create-order API is called.

3.1 Kafka Producer Configuration

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, OrderEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, OrderEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

3.2 Create an Order Event Object Class

This object carries the order information that is serialized and sent to the Kafka broker along with the topic. It should be easily serializable, so avoid nested objects to keep the message simple. We also need to ensure that OrderEvent lives in the same package in both the producer and consumer services (here, com.wynnteo.shareddto), because during serialization the fully qualified class name (including the package) is recorded in the message headers.

public class OrderEvent {
    private Long orderId;
    private Long productId;
    private int quantity;

    // Getters and setters...
}

3.3 Order Producer Service

In this class, we use the KafkaTemplate bean to send the OrderEvent object to Kafka topic. We need to inject the KafkaTemplate bean in our service class and specify the type of the key and value as String and OrderEvent respectively:

@Service
public class OrderProducerService {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void sendOrder(@Valid OrderDto order) {
        OrderEvent orderEvent = new OrderEvent();
        orderEvent.setOrderId(order.getId());
        orderEvent.setProductId(order.getProductId());
        orderEvent.setQuantity(order.getQuantity());
        kafkaTemplate.send("stock-update-topic", orderEvent);
    }
}
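One thing to keep in mind: kafkaTemplate.send() is asynchronous, so a broker failure will not surface inside sendOrder() itself. In Spring Kafka 3.x, send() returns a CompletableFuture<SendResult<K, V>> that can be used to log the delivery outcome (2.x returned a ListenableFuture instead). A sketch of such a variant, not part of the original service:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class LoggingOrderProducerService {

    private static final Logger log =
        LoggerFactory.getLogger(LoggingOrderProducerService.class);

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public LoggingOrderProducerService(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(OrderEvent orderEvent) {
        // send() is asynchronous; the future completes once the broker acknowledges
        kafkaTemplate.send("stock-update-topic", orderEvent)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish order {}", orderEvent.getOrderId(), ex);
                } else {
                    log.info("Published order {} at offset {}",
                        orderEvent.getOrderId(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}
```

In a production saga, the failure branch would typically trigger a compensating action rather than only a log line.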

3.4 Call the Producer Service

When the order is placed, we will call this sendOrder() method to publish the order event.

@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private OrderProducerService orderProducerService;
    ...
    @PostMapping
    public ResponseEntity<Map<String, Object>> createOrder(
      @Valid @RequestBody OrderDto order
    ) {
        OrderDto createdOrder = orderService.createOrder(order);
        orderProducerService.sendOrder(order);
        Map<String, Object> response = new HashMap<>();
        response.put("status", "success");
        response.put("statusCode", HttpStatus.CREATED.value());
        response.put("message", ORDER_CREATED_SUCCESS);
        response.put("data", createdOrder);
        return new ResponseEntity<>(response, HttpStatus.CREATED);
    }
    ...
}

4. Consumer – Product Service

Create a Kafka consumer that listens to a Kafka topic stock-update-topic in a product service. Upon receiving the event, this service updates the product’s stock.

4.1 Kafka Consumer Configuration

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          StringDeserializer.class
        );
        configProps.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          JsonDeserializer.class
       );
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wynnteo.shareddto");
        return new DefaultKafkaConsumerFactory<>(
          configProps,
          new StringDeserializer(),
          new JsonDeserializer<>(OrderEvent.class)
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
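An optional hardening step, not part of the original setup: if the listener throws or a poison message cannot be processed, the container's error handler decides what happens next. Configuring it explicitly gives bounded retries before a record is skipped. A sketch of an alternative kafkaListenerContainerFactory bean:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Retry each failed record twice, one second apart, then log and skip it.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}
```

A DeadLetterPublishingRecoverer can also be passed to DefaultErrorHandler to publish exhausted records to a dead-letter topic instead of dropping them.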

4.2 Create an Order Event Object Class

This object deserializes the information received from the Kafka broker. We need to ensure that OrderEvent lives in the same package as in the producer service (com.wynnteo.shareddto), because during deserialization the fully qualified class name (including the package) recorded in the message is used to resolve the target class.

public class OrderEvent {
    private Long orderId;
    private Long productId;
    private int quantity;

    // Getters and setters...
}

4.3 Order Consumer Service

In this class, we use the @KafkaListener annotation to mark the consumeOrder() method as a listener for the stock-update-topic topic. The group ID identifies which consumers belong to the same group: consumers with the same group ID form one consumer group, and Kafka automatically distributes the partitions of a topic among the consumers in that group. The group ID therefore helps Kafka determine which consumer should consume data from which partition, balancing the workload.

@KafkaListener(topics = "stock-update-topic", groupId = "order-group")
public void consumeOrder(OrderEvent orderEvent) {
    Product product = productRepository
      .findById(orderEvent.getProductId())
      .orElseThrow(() -> new ResourceNotFoundException("Product not found"));
    product.setStock(product.getStock() - orderEvent.getQuantity());
    productRepository.save(product);
}

5. Testing with JUnit and Postman

Testing is essential to ensure the quality of our microservices. We’ll write unit tests using JUnit and Mockito to test our Kafka producer and consumer thoroughly.

@SpringBootTest
public class OrderProducerServiceTest {
    @InjectMocks
    private OrderProducerService orderProducerService;
    @Mock
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    @Captor
    private ArgumentCaptor<OrderEvent> orderEventCaptor;

    @Test
    public void testSendOrder() {
      OrderDto order = new OrderDto();
      order.setId(123L);
      order.setProductId(456L);
      order.setQuantity(10);

      orderProducerService.sendOrder(order);

      // Verify that kafkaTemplate.send was called with the expected parameters
      Mockito
        .verify(kafkaTemplate)
        .send(Mockito.eq("stock-update-topic"), orderEventCaptor.capture());

      OrderEvent capturedOrderEvent = orderEventCaptor.getValue();

      OrderEvent expectedEvent = new OrderEvent();
      expectedEvent.setOrderId(123L);
      expectedEvent.setProductId(456L);
      expectedEvent.setQuantity(10);

      assertEquals(
        expectedEvent.getProductId(),
        capturedOrderEvent.getProductId()
      );
      assertEquals(expectedEvent.getQuantity(), capturedOrderEvent.getQuantity());
    }
}
@SpringBootTest
public class ProductServiceImplTest {

    ...
    @Test
    public void testConsumeOrder() {
        // Create a mock OrderEvent object
        OrderEvent orderEvent = new OrderEvent();
        orderEvent.setProductId(1L);
        orderEvent.setQuantity(2);

        // Create a mock Product object
        Product product = new Product();
        product.setId(1L);
        product.setStock(10);
        when(productRepository.findById(1L)).thenReturn(Optional.of(product));

        productService.consumeOrder(orderEvent);

        verify(productRepository, times(1)).save(product);
        assertEquals(8, product.getStock());
    }
}
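The unit tests above mock KafkaTemplate and the repository, so they never touch a real broker. Spring Kafka also ships spring-kafka-test, whose @EmbeddedKafka annotation starts an in-memory broker for end-to-end tests. A minimal sketch (the topic name matches our producer; the class and test names are illustrative):

```java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "stock-update-topic",
    bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class OrderFlowIntegrationTest {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Test
    void publishesAndConsumesOrderEvent() throws Exception {
        OrderEvent event = new OrderEvent();
        event.setOrderId(1L);
        event.setProductId(1L);
        event.setQuantity(5);

        // get() blocks until the embedded broker acknowledges the send
        kafkaTemplate.send("stock-update-topic", event).get();

        // Assert on the side effect, e.g. await until the product stock drops
        // (Awaitility or a CountDownLatch in the listener works well here).
    }
}
```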

Next, we can try to invoke the API via Postman. To do so, we need to start both product and order microservices.

We then insert one product record with a stock count set to 50.

Followed by inserting one order record with the productId set to 1 and quantity set to 10.

Now, send a GET request to the http://localhost:8081/api/products/1 endpoint to retrieve the product record to see the updated quantity.

6. Saga Pattern for Reliable Stock Deduction

In a microservices architecture, particularly in scenarios involving distributed systems like managing orders and stock levels, ensuring data consistency is paramount. This involves gracefully handling situations to provide a seamless user experience and maintain data integrity.

6.1 Modify Order Producer Service

When a customer places an order, the order-service sends a message to a new Kafka topic called order-requested. This message includes the order details and sets the status to PENDING.

public void sendOrder(@Valid OrderDto order) {
    OrderEvent orderEvent = new OrderEvent();
    orderEvent.setOrderId(order.getId());
    orderEvent.setProductId(order.getProductId());
    orderEvent.setQuantity(order.getQuantity());
    orderEvent.setStatus("PENDING"); 

    kafkaTemplate.send("order-requested", orderEvent); 
}
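Since the producer now sets a status, the shared OrderEvent class needs an extra field alongside those shown in sections 3.2 and 4.2. With the accessors written out, it looks like this:

```java
public class OrderEvent {
    private Long orderId;
    private Long productId;
    private int quantity;
    private String status; // PENDING, CONFIRMED, or REJECTED

    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }

    public Long getProductId() { return productId; }
    public void setProductId(Long productId) { this.productId = productId; }

    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
}
```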

6.2 Create Stock Consumer Service

In the product-service, a StockConsumerService listens to the order-requested topic. Upon receiving a message, it checks the availability of the requested products in stock. Based on the stock status, it sends a message to either order-confirmed or order-rejected topics.

@Service
public class StockConsumerService {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Autowired
    ProductRepository productRepository;

    @KafkaListener(topics = "order-requested", groupId = "order-group")
    public void listenOrderRequested(ConsumerRecord<String, OrderEvent> record) {
        OrderEvent orderEvent = record.value();
        Product product = productRepository
          .findById(orderEvent.getProductId())
          .orElseThrow(() -> new ResourceNotFoundException("Product not found"));
        if (product.getStock() >= orderEvent.getQuantity()) {
            product.setStock(product.getStock() - orderEvent.getQuantity());
            productRepository.save(product);
            // Send order-confirmed message
            orderEvent.setStatus("CONFIRMED");
            kafkaTemplate.send("order-confirmed", orderEvent);
        } else {
            // Send order-rejected message
            orderEvent.setStatus("REJECTED");
            kafkaTemplate.send("order-rejected", orderEvent);
        }
    }
}
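One caveat worth flagging: the read-check-save sequence above is not atomic, so two concurrent order-requested events could both pass the stock check. A common fix is a conditional, set-based update so the database enforces the invariant. A sketch using a hypothetical Spring Data JPA method (tryDecrementStock is not part of the original repository):

```java
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;

public interface ProductRepository extends JpaRepository<Product, Long> {

    // Decrements stock only if enough remains; returns 1 on success, 0 otherwise.
    @Modifying
    @Transactional
    @Query("UPDATE Product p SET p.stock = p.stock - :qty " +
           "WHERE p.id = :id AND p.stock >= :qty")
    int tryDecrementStock(@Param("id") Long id, @Param("qty") int qty);
}
```

The listener would then publish order-confirmed when this returns 1 and order-rejected when it returns 0, with no window for a lost update between the check and the write.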

6.3 Handle Order Status

Back in the order-service, create a listener class, OrderStatusListener, with one handler for each of the order-confirmed and order-rejected topics.

@Service
public class OrderStatusListener {
    @Autowired
    OrderRepository orderRepository;

    @KafkaListener(topics = "order-confirmed", groupId = "order-group")
    public void listenOrderConfirmed(ConsumerRecord<String, OrderEvent> record) {
        OrderEvent orderEvent = record.value();
        Order order = orderRepository
          .findById(orderEvent.getOrderId())
          .orElseThrow(() -> new ResourceNotFoundException("Order not found"));
        // Update order status to CONFIRMED
        order.setStatus("CONFIRMED");
        orderRepository.save(order);
        // TODO: Notify customer about the order confirmation
    }

    @KafkaListener(topics = "order-rejected", groupId = "order-group")
    public void listenOrderRejected(ConsumerRecord<String, OrderEvent> record) {
        OrderEvent orderEvent = record.value();
        Order order = orderRepository
          .findById(orderEvent.getOrderId())
          .orElseThrow(() -> new ResourceNotFoundException("Order not found"));
        // Update order status to REJECTED
        order.setStatus("REJECTED");
        orderRepository.save(order);
        // TODO: Notify customer about the order rejection
    }
}

7. Testing using Postman

We can invoke the API via Postman.

First, start the product and order microservices. Next, we insert one product record with the stock set to 50.

Next, insert one order record with the productId set to 1 and quantity 10. Now, we retrieve the order using the API http://localhost:8082/api/orders/1

Now, send a GET request to the http://localhost:8081/api/products/1 endpoint to retrieve the product record.

Next, we insert one order record with the productId set to 1 and quantity 60. Now, we retrieve the order using the API http://localhost:8082/api/orders/2

Now, send a GET request to the http://localhost:8081/api/products/1 endpoint to retrieve the product record.

8. Conclusion

In this tutorial, we explored Apache Kafka and the Saga pattern. Integrating Apache Kafka and the Saga pattern in the context of the Order and Product microservices provides a robust and reliable solution for handling complex, asynchronous, and distributed transaction scenarios. Full source code is available on GitHub.
