Spring Boot Kafka Tutorial
Introduction-
Download and Install Kafka
Download the Kafka binary from the official website: https://kafka.apache.org/downloads
Installing Kafka on Windows
1. Extract the downloaded file to a directory on your computer, such as C:\kafka
2. Open a command prompt and navigate to the Kafka directory using the cd command
3. Start the ZooKeeper server by running the following command:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
4. Open another command prompt window and start the Kafka server by running the following command:
bin\windows\kafka-server-start.bat config\server.properties
Installing Kafka on macOS
1. Extract the downloaded file to a directory on your computer, such as /Users/your-username/kafka
2. Open a Terminal window and navigate to the Kafka directory using the cd command
3. Start the ZooKeeper server by running the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
4. Open another Terminal window and start the Kafka server by running the following command:
bin/kafka-server-start.sh config/server.properties
Installing Kafka on Linux
1. Extract the downloaded file to a directory on your computer, such as /opt/kafka
2. Open a Terminal window and navigate to the Kafka directory using the cd command
3. Start the ZooKeeper server by running the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
4. Open another Terminal window and start the Kafka server by running the following command:
bin/kafka-server-start.sh config/server.properties
Java Syntax and Steps to use kafka
Create a Topic
To create a topic in Kafka, you can use the following command:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my_topic
This command creates a new topic called my_topic with one partition and a replication factor of 1.
Produce Messages
To produce messages in Kafka, you can use the following code:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "value-" + i;
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", key, value); producer.send(record);
}
producer.close();
This code creates a Kafka producer and sends 10 messages to the my_topic topic.
Consume Messages
To consume messages in Kafka, you can use the following code:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "my_group");
Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
}
}
This code creates a Kafka consumer that subscribes to the my_topic topic and prints out the messages it receives.
Close the Producer and Consumer
Finally, when you're finished with the producer and consumer, you should close them to free up resources:
producer.close();
consumer.close();
This code closes the Kafka producer and consumer.
Spring boot with Kafka example
Spring Boot is a popular framework for building Java applications, and it has great support for working with Kafka. In this tutorial, we'll cover how to use Spring Boot with Kafka to create a simple example.
Step 1: Set up Kafka Environment
Before we start building our Spring Boot application, we need to set up a Kafka environment. You can follow the previous topic from this tutorial to install Kafka on your operating system.
Step 2: Adding Dependency in Spring Boot Project
To create a Spring Boot project, you can use Spring Initialiser or your preferred IDE. Here's an example of the dependencies you'll need to include in your pom.xml file:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
This configuration includes the spring-kafka dependency, which provides support for working with Kafka in Spring.
Step 3: Create Kafka Configuration
Next, we'll create a configuration file for Kafka. Here's an example:
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
This configuration creates the necessary beans to connect to Kafka and send and receive messages.
Step 4: Create a Producer
Now, we'll create a simple REST API that produces messages to Kafka. Here's an example:
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.topic}")
private String topic;
@PostMapping("/message")
public void sendMessage(@RequestBody String message) {
kafkaTemplate.send(topic, message);
}
}
Step 5: Create a Consumer
Finally, we'll create a simple consumer that listens for messages on the Kafka topic. Here's an example:
@Service
public class KafkaConsumer {
@KafkaListener(topics = "${kafka.topic}", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Message in group - group-id: " + message);
}
}
This code defines a Kafka listener that listens for messages on the Kafka topic specified in the application.properties file. When a message is received, it prints the message to the console.
Step 6: Configure Application Properties
Finally, we'll configure the application properties to specify the Kafka bootstrap servers and topic. Here's an example:
#KafkaConfiguration
kafka.bootstrap-servers = localhost:9092
kafka.topic = test-topic
Step 7: Run the Application
Now, we can run our Spring Boot application and test it out by sending a message to the /message endpoint using any REST client like Postman. You should see the message printed in the console.
This is a basic example of how to use Spring Boot with Kafka. You can extend this example to add more features like error handling, retry logic, and serialization/deserialization.
Hope this blog tutorial has been helpful to you. If you have any remaining doubts or suggestions, please feel free to share them in the comments below. We would be glad to receive your feedback.
💛 You can Click Here to connect with us. We will give our best for you.
Comments
Post a Comment