Kafka Tutorial with Java Spring Boot

A starter's guide

Kafka Tutorial with Java Spring Boot

Why Kafka?

Kafka has become the mainstay of messaging in applications with high-volume requirements

According to confluent.io,

Apache Kafka is an open source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.

This basically means two systems can talk to each other, in a low-to-none overhead and high velocity manner.

This article talks about how to make an application (well, actually two) that explains, in the most basic sense, how Kafka works in distributed systems.

We have used Spring Boot + Java for this purpose

Let us say we distribute our "application" in such a way that one application takes care of serving the REST requests, and takes care of things like validation, data-elimination and the second application takes care of saving the data to the database.

Here we have made sure that one application will only be catering to REST requests and do some in-memory operations required on the request, the other will always keep listening to the first application and make sure that it saves all the data that the first application will send.

We already have separation of concern in place here. If you look at our project structures below, one application that caters REST requests does not have a repository, while the other one that saves data does not have a controller.

This is how our project structure is going to look like

The REST application is called a "producer" since it will be producing the messages to be sent to Kafka, the database application is called a "consumer", since it will be consuming the messages from Kafka

Producer application

Screenshot 2022-09-10 at 17.01.53.png

Consumer application

Screenshot 2022-09-10 at 17.00.46.png

Producer Application

In our application, we have a REST endpoint called /user/save that resides in the Producer application

The User model will look something like this:

public class User {
    private String id;
    private String name;
    private Integer age;
    //getters and setters
}

The Rest controller just has a call to service with the User model

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping("/save")
    public String saveUser(@RequestBody User user) throws JsonProcessingException {
        return userService.savingUser(user);
    }
}

The Kafkaesque magic happens inside the service

public String savingUser(User user) throws JsonProcessingException {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ObjectMapper mapper = new ObjectMapper();
            String userAsString = mapper.writeValueAsString(user);
            ProducerRecord<String, String> record = new ProducerRecord<>("MyTopic", userAsString);
            producer.send(record);
        }
        return user.getName();
    }

In the Properties calls, we are just configuring the Kafka properties to the most default and bare minimum

Then we initialize a KafkaProducer, this is a Default class inside the Kafka dependency.

A dependency is required to get this class available at classpath. We have used gradle and this is what the dependency list inside our build.gradle looks like:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.apache.kafka:kafka-clients'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

After creation of Producer, we essentially convert the User model to a json String, so it can be sent as a "message" to another system.

We then generate a ProducerRecord which mentions what the message is (userAsString) and where it wants to go (a Kafka Topic)

Then comes the main call that actually sends this data to the message pipeline,

producer.send(record);

At this point, we can say that our message has been queued in kafka, and it is ready to be consumed

Exit Producer

Enter Consumer

Consumer Application

Now we need a means to consume this message.

At this point,

  • someone has made a request to our rest endpoint to save the user,
  • the data of the user to be saved has been queued to kafka.

Now we will need to actually save this data to a database. This is where the Consumer comes in

The heart of this project is UserConsumer, a service that consumes the Kafka message that has been waiting in the queue.

The Consumer has one and only one method, whose job is to keep listening to Kafka whether or not Kafka has something to say.

The run method looks something like this

    @Override
    @PostConstruct
    public void run() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-application");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("MyTopic"));
            System.out.println("INSIDE RUN OF CONSUMER");//should've used a logger

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("VALUE:" + record.value());
                    try {
                        userRepository.save(new ObjectMapper().readValue(record.value(), User.class));
                        System.out.println(userRepository.findAll().size());
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

Here, as usual, with the Properties calls, we are bootstrapping our kafka configurations.

We then initiate a KafkaConsumer, that subscribes to the topic that the Producer sent the message to, named MyTopic.

Producer sent a message to topic MyTopic, Consumer is listening to topic MyTopic.

The message sent from our producer will reach the consumer, if everything goes fine.

Now we are converting this String message back to our Model class.

This Model class has a similar Structure to our Producer's Model class, however it is decorated with some Database specific annotations.

@Entity
@Table(name = "user_table")
public class User {
    @Id
    private String id;
    private String name;
    private Integer age;
    // getters and setters
}

Then, this message is sent to the repository's save method, where we have our database logic abstracted.

The Repository is a basic JPARepository which saves this data into a database.

Repository configuration:

server.port=8081
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.jpa.hibernate.ddl-auto=update
spring.datasource.username=<USERNAME>
spring.datasource.password=<PASSWORD>

At this point, the User is now saved to the database.

...One more thing

Though this is a very basic application, the usage of Kafka here gave us a very distinct advantage over a traditional application.

Suppose our Consumer application is down, the caller of the API can hit the endpoint, the User data will be queued, and whenever the Consumer gets up again, the message will be read and the User will be saved. Our user got saved despite the database application being down! However, this exercise is left for the user to perform.

If you want to try this (these) project(s) on your own, here are the links to the github repository(ies). I will try to keep the ReadMe updated.

Did you find this article valuable?

Support Rohan Shah's blog by becoming a sponsor. Any amount is appreciated!