Spring Boot Kafka Tutorial

Introduction-

Kafka is an open-source distributed event streaming platform that allows you to publish and subscribe to streams of records. In this tutorial, we'll cover how to use Java with Kafka to send and receive messages.
First we will learn how to download and install Kafka in different operating systems, then we will write code in Java (Spring Boot framework) for producer and consumer. 

One of the core concepts in Kafka is the idea of a stream, which is a continuous flow of records. These records can be anything - log entries, sensor readings, user events, etc. - and are published by producers to one or more Kafka topics. Consumers can then subscribe to those topics and consume the records in real-time.

To get started with Kafka, you first need to download and install it on your machine. Kafka is available for different operating systems, including Windows, macOS, and Linux. Once you have Kafka installed, you can start using it to build real-time data pipelines.

To write code in Java for producing and consuming messages in Kafka, you can use the Spring Boot framework. Spring Boot provides a simple and easy-to-use interface for working with Kafka, making it a popular choice among developers.

For the producer, you will need to create a KafkaTemplate bean that allows you to send messages to a specific topic. You can then use this template to send messages to Kafka by calling the send() method.

For the consumer, you will need to create a KafkaListener bean that listens for messages on a specific topic. When a message is received, the listener method is called and you can process the message accordingly.

Download and Install Kafka

Download the Kafka binary from the official website: https://kafka.apache.org/downloads

Installing Kafka on Windows

1. Extract the downloaded file to a directory on your computer, such as C:\kafka

2. Open a command prompt and navigate to the Kafka directory using the cd command

3. Start the ZooKeeper server by running the following command:


bin\windows\zookeeper-server-start.bat config\zookeeper.properties

4. Open another command prompt window and start the Kafka server by running the following command:


bin\windows\kafka-server-start.bat config\server.properties

Installing Kafka on macOS

1. Extract the downloaded file to a directory on your computer, such as /Users/your-username/kafka

2. Open a Terminal window and navigate to the Kafka directory using the cd command

3. Start the ZooKeeper server by running the following command:


bin/zookeeper-server-start.sh config/zookeeper.properties

4. Open another Terminal window and start the Kafka server by running the following command:


bin/kafka-server-start.sh config/server.properties

Installing Kafka on Linux

1. Extract the downloaded file to a directory on your computer, such as /opt/kafka

2. Open a Terminal window and navigate to the Kafka directory using the cd command

3. Start the ZooKeeper server by running the following command:


bin/zookeeper-server-start.sh config/zookeeper.properties

4. Open another Terminal window and start the Kafka server by running the following command:


bin/kafka-server-start.sh config/server.properties


Java Syntax and Steps to use kafka

Create a Topic

To create a topic in Kafka, you can use the following command:


bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my_topic

This command creates a new topic called my_topic with one partition and a replication factor of 1.

Produce Messages

To produce messages in Kafka, you can use the following code:


Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092"); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Producer<String, String> producer = new KafkaProducer<>(props); 

for (int i = 0; i < 10; i++) { 
	String key = "key-" + i; 
	String value = "value-" + i; 
	ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", key, value); producer.send(record); 	
    } 
producer.close();

This code creates a Kafka producer and sends 10 messages to the my_topic topic.

Consume Messages

To consume messages in Kafka, you can use the following code:


Properties props = new Properties(); 

props.put("bootstrap.servers", "localhost:9092"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("group.id", "my_group"); 

Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my_topic")); 

while (true) { 

	ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); 

	for (ConsumerRecord<String, String> record : records) {

 	System.out.println("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value()); 

	} 

}

This code creates a Kafka consumer that subscribes to the my_topic topic and prints out the messages it receives.

Close the Producer and Consumer

Finally, when you're finished with the producer and consumer, you should close them to free up resources:


producer.close();

consumer.close();

This code closes the Kafka producer and consumer.


Spring boot with Kafka example

Spring Boot is a popular framework for building Java applications, and it has great support for working with Kafka. In this tutorial, we'll cover how to use Spring Boot with Kafka to create a simple example.


Step 1: Set up Kafka Environment

Before we start building our Spring Boot application, we need to set up a Kafka environment. You can follow the previous topic from this tutorial to install Kafka on your operating system.


Step 2: Adding Dependency in Spring Boot Project

To create a Spring Boot project, you can use Spring Initialiser or your preferred IDE. Here's an example of the dependencies you'll need to include in your pom.xml file:


<dependency> 

    <groupId>org.springframework.boot</groupId>

     <artifactId>spring-boot-starter-web</artifactId>

</dependency>   

<dependency> 

    <groupId>org.springframework.kafka</groupId> 

    <artifactId>spring-kafka</artifactId> 

</dependency>

This configuration includes the spring-kafka dependency, which provides support for working with Kafka in Spring.

Step 3: Create Kafka Configuration

Next, we'll create a configuration file for Kafka. Here's an example:


@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
 
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

This configuration creates the necessary beans to connect to Kafka and send and receive messages.


Step 4: Create a Producer

Now, we'll create a simple REST API that produces messages to Kafka. Here's an example:


@RestController
public class KafkaController {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Value("${kafka.topic}")
    private String topic;
 
    @PostMapping("/message")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send(topic, message);
    }
}

Step 5: Create a Consumer

Finally, we'll create a simple consumer that listens for messages on the Kafka topic. Here's an example:


@Service
public class KafkaConsumer {
 
    @KafkaListener(topics = "${kafka.topic}", groupId = "group-id")
    public void listen(String message) {
        System.out.println("Received Message in group - group-id: " + message);
    }
}

This code defines a Kafka listener that listens for messages on the Kafka topic specified in the application.properties file. When a message is received, it prints the message to the console.

Step 6: Configure Application Properties

Finally, we'll configure the application properties to specify the Kafka bootstrap servers and topic. Here's an example:


    #KafkaConfiguration
  
    kafka.bootstrap-servers = localhost:9092
    
    kafka.topic = test-topic
  

Step 7: Run the Application

Now, we can run our Spring Boot application and test it out by sending a message to the /message endpoint using any REST client like Postman. You should see the message printed in the console.

This is a basic example of how to use Spring Boot with Kafka. You can extend this example to add more features like error handling, retry logic, and serialization/deserialization.

Hope this blog tutorial has been helpful to you. If you have any remaining doubts or suggestions, please feel free to share them in the comments below. We would be glad to receive your feedback.

💛 You can Click Here to connect with us. We will give our best for you.

Comments

Popular Posts on Code Katha

Java Interview Questions for 10 Years Experience

Sql Interview Questions for 10 Years Experience

Spring Boot Interview Questions for 10 Years Experience

Visual Studio Code setup for Java and Spring with GitHub Copilot

Java interview questions - Must to know concepts

Spring Data JPA

Data Structures & Algorithms Tutorial with Coding Interview Questions

Elasticsearch Java Spring Boot

Java interview questions for 5 years experience