Why Kafka?
Kafka has become the mainstay of messaging in applications with high-volume requirements.
According to confluent.io,
Apache Kafka is an open source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.
In other words, two systems can talk to each other with little to no overhead and at high speed.
This article walks through building an application (well, actually two) that demonstrates, in the most basic sense, how Kafka works in a distributed system.
We used Spring Boot and Java for this purpose.
Let us say we split our "application" in two: the first serves REST requests and handles things like validation and data elimination, and the second saves the data to the database.
The first application only caters to REST requests and performs whatever in-memory operations each request needs. The second keeps listening, through Kafka, for what the first one sends and makes sure all of it gets saved.
This gives us separation of concerns. As the project structures below show, the application that caters to REST requests has no repository, while the one that saves data has no controller.
This is what our project structure is going to look like.
The REST application is called a "producer", since it produces the messages sent to Kafka. The database application is called a "consumer", since it consumes those messages from Kafka.
[Figure: Producer application project structure]
[Figure: Consumer application project structure]
Producer Application
Our REST endpoint, /user/save, resides in the Producer application.
The User model will look something like this:
public class User {
    private String id;
    private String name;
    private Integer age;
    //getters and setters
}
The REST controller simply hands the User model over to the service:
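import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private UserService userService;

    // POST /user/save: passes the request body to the service and returns the user's name
    @PostMapping("/save")
    public String saveUser(@RequestBody User user) throws JsonProcessingException {
        return userService.savingUser(user);
    }
}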
The Kafkaesque magic happens inside the service:
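import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public String savingUser(User user) throws JsonProcessingException {
    // Bare-minimum producer configuration: broker address plus key/value serializers
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // try-with-resources closes the producer, which waits for pending sends to complete
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        // Serialize the User to JSON so it can travel as the message value
        ObjectMapper mapper = new ObjectMapper();
        String userAsString = mapper.writeValueAsString(user);
        // Address the record to topic "MyTopic"; no key is set
        ProducerRecord<String, String> record = new ProducerRecord<>("MyTopic", userAsString);
        producer.send(record);
    }
    return user.getName();
}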
In the Properties calls, we configure only the bare minimum: the broker address (BOOTSTRAP_SERVERS_CONFIG) and the serializers that turn the record's key and value into bytes.
Then we initialize a KafkaProducer, the producer class that ships with Kafka's Java client library, kafka-clients.
That library has to be on the classpath. We used Gradle, and this is what the dependency list inside our build.gradle looks like:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.apache.kafka:kafka-clients'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
After creating the producer, we convert the User model to a JSON string so it can be sent as a "message" to another system.
We then create a ProducerRecord, which states what the message is (userAsString) and where it should go (the Kafka topic MyTopic).
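Then comes the call that actually sends this data down the message pipeline:
producer.send(record);
send() is asynchronous: it returns a Future right away and the record is delivered in the background. Closing the producer at the end of the try-with-resources block waits for pending sends to complete, so once savingUser returns (and assuming the send succeeded), our message has been written to Kafka and is ready to be consumed.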
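To see why closing the producer matters, here is a toy model built only from the JDK, not the Kafka client. The hypothetical ToyAsyncProducer returns a CompletableFuture from send() and delivers records on a background thread, and its close() waits for that thread to drain, roughly mirroring how KafkaProducer.close() waits for outstanding requests. With the real client, you can block on a single record with send(record).get(), or pass a Callback to send(record, callback) to react to the result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy stand-in for an asynchronous producer (hypothetical; NOT the Kafka client API).
class ToyAsyncProducer implements AutoCloseable {
    // A single background "I/O" thread delivers records in the order they were sent
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    // Stand-in for the broker: the records delivered so far
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    // Returns immediately; the future completes with the record's toy "offset" once delivered
    CompletableFuture<Integer> send(String value) {
        return CompletableFuture.supplyAsync(() -> {
            delivered.add(value);
            return delivered.size() - 1;
        }, io);
    }

    // Waits for in-flight sends to finish, like closing a real producer
    @Override
    public void close() throws InterruptedException {
        io.shutdown();
        io.awaitTermination(10, TimeUnit.SECONDS);
    }
}

class AsyncSendDemo {
    // Sends two JSON users and returns what the toy "broker" holds after close()
    static List<String> run() throws Exception {
        ToyAsyncProducer producer = new ToyAsyncProducer();
        try (ToyAsyncProducer p = producer) {
            // Blocking on one future is the analogue of send(record).get()
            int firstOffset = p.send("{\"id\":\"1\",\"name\":\"Alice\",\"age\":30}").get();
            System.out.println("first record at offset " + firstOffset);
            // Fire-and-forget, like the article's producer.send(record)
            p.send("{\"id\":\"2\",\"name\":\"Bob\",\"age\":25}");
        }
        // close() has returned, so the fire-and-forget record was delivered too
        return producer.delivered;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run().size() + " records delivered");
    }
}
```

Without the close() (or an explicit flush), the second, fire-and-forget record could still be in flight when the method returns.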
Exit Producer
Enter Consumer
Consumer Application
Now we need a means to consume this message.
At this point:
- someone has made a request to our REST endpoint to save the user, and
- the data of the user to be saved has been sent to Kafka.
Now we need to actually save this data to a database. This is where the Consumer comes in.
The heart of this project is UserConsumer, a service that consumes the Kafka messages waiting in the topic.
The Consumer has one and only one method, whose job is to keep polling Kafka for anything new.
The run method looks something like this:
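import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
// @PostConstruct: jakarta.annotation.PostConstruct (Spring Boot 3) or javax.annotation.PostConstruct (Spring Boot 2)

@Override
@PostConstruct
public void run() {
    // Note: this loop never returns, so running it from @PostConstruct blocks Spring's startup thread
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Consumers sharing a group id split the topic's partitions and share committed offsets
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-application");
    // With no committed offset yet, start from the oldest retained message
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ObjectMapper mapper = new ObjectMapper(); // one mapper reused for every record
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Collections.singletonList("MyTopic"));
        System.out.println("INSIDE RUN OF CONSUMER");//should've used a logger
        while (true) {
            // Wait up to 100 ms for new records; an empty batch just loops again
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("VALUE:" + record.value());
                try {
                    // Deserialize the JSON message into the consumer's User entity and save it
                    userRepository.save(mapper.readValue(record.value(), User.class));
                    System.out.println(userRepository.findAll().size());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}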
Here, as with the producer, the Properties calls bootstrap our Kafka configuration; the consumer additionally needs a group id and an offset-reset policy.
We then create a KafkaConsumer that subscribes to the topic the producer sent the message to, MyTopic.
The producer sent a message to topic MyTopic, and the consumer is listening to topic MyTopic.
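Two of the consumer settings deserve a closer look. GROUP_ID_CONFIG names the consumer group whose committed offsets Kafka tracks, and AUTO_OFFSET_RESET_CONFIG only matters when that group has no valid committed offset: "earliest" starts from the oldest message still retained, "latest" starts from the end of the log, and "none" makes the consumer throw an error. The JDK-only sketch below models that decision for a single partition; ToyGroupOffsets and its methods are hypothetical and not part of the Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of where a consumer group starts reading one partition.
// It mirrors the meaning of auto.offset.reset but is NOT the Kafka client API.
class ToyGroupOffsets {
    // group id -> next offset that group will read (Kafka keeps this on the broker side)
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String groupId, long nextOffset) {
        committed.put(groupId, nextOffset);
    }

    // logStart: oldest retained offset; logEnd: offset the next produced record will get
    long startingOffset(String groupId, long logStart, long logEnd, String autoOffsetReset) {
        Long saved = committed.get(groupId);
        if (saved != null && saved >= logStart && saved <= logEnd) {
            return saved; // a valid committed offset always wins
        }
        if ("earliest".equals(autoOffsetReset)) {
            return logStart; // replay everything still retained
        }
        if ("latest".equals(autoOffsetReset)) {
            return logEnd; // only messages produced from now on
        }
        // "none": refuse to guess, as Kafka does
        throw new IllegalStateException("No committed offset for group " + groupId);
    }

    public static void main(String[] args) {
        ToyGroupOffsets offsets = new ToyGroupOffsets();
        // Fresh group; the partition currently holds offsets 0..4
        System.out.println(offsets.startingOffset("my-application", 0, 5, "earliest")); // 0
        System.out.println(offsets.startingOffset("my-application", 0, 5, "latest"));   // 5
        // After processing offsets 0..2, the group resumes at 3 whatever the policy
        offsets.commit("my-application", 3);
        System.out.println(offsets.startingOffset("my-application", 0, 5, "latest"));   // 3
    }
}
```

This is why the article's "earliest" setting lets a brand-new consumer pick up messages that were produced before it ever started.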
If everything goes fine, the message sent by our producer reaches the consumer.
Next, we convert this String message back into our model class.
This model class has the same structure as the producer's, but it is decorated with some database-specific annotations:
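// JPA annotations: jakarta.persistence.* on Spring Boot 3, javax.persistence.* on Spring Boot 2
@Entity
@Table(name = "user_table") // avoids "user", a reserved word in PostgreSQL
public class User {
    @Id
    private String id;
    private String name;
    private Integer age;
    // getters and setters
}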
Then the deserialized User is passed to the repository's save method, where our database logic is abstracted away.
The repository is a basic JpaRepository, which saves this data into the database.
Repository configuration for the Consumer application, which runs on port 8081 and connects to a local PostgreSQL database:
server.port=8081
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.jpa.hibernate.ddl-auto=update
spring.datasource.username=<USERNAME>
spring.datasource.password=<PASSWORD>
At this point, the User has been saved to the database.
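One caveat about the run method shown earlier: its while (true) loop never returns, so calling it from @PostConstruct keeps Spring's startup thread busy forever and the application context never finishes starting. A common remedy is to run the loop on its own thread and stop it on shutdown. The sketch below shows that shape with the JDK only: a BlockingQueue stands in for Kafka, a list stands in for userRepository, and BackgroundPoller is a hypothetical name. With the real KafkaConsumer, which is not thread-safe, the poll loop should stay on that one thread, and consumer.wakeup() is the call meant for breaking out of a blocking poll from another thread. Alternatively, the spring-kafka dependency already in build.gradle provides @KafkaListener, which manages the listener thread for you.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a stoppable polling loop on its own thread.
// The BlockingQueue stands in for Kafka; this is NOT the KafkaConsumer API.
class BackgroundPoller implements Runnable {
    private final BlockingQueue<String> topic;
    // Stand-in for userRepository
    final List<String> saved = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;
    private Thread worker;

    BackgroundPoller(BlockingQueue<String> topic) {
        this.topic = topic;
    }

    // What a @PostConstruct method could call: returns immediately
    void start() {
        worker = new Thread(this, "user-consumer");
        worker.start();
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Like consumer.poll(Duration.ofMillis(100)): wait briefly, then re-check the flag
                String value = topic.poll(100, TimeUnit.MILLISECONDS);
                if (value != null) {
                    saved.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // What a @PreDestroy method could call: ends the loop and waits for the thread
    void stop() throws InterruptedException {
        running = false;
        worker.join();
    }

    // Starts the poller, feeds it three messages, and returns how many were "saved"
    static int demo() throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        BackgroundPoller poller = new BackgroundPoller(topic);
        poller.start(); // the calling thread is free immediately
        topic.add("user-1");
        topic.add("user-2");
        topic.add("user-3");
        long deadline = System.currentTimeMillis() + 5_000;
        while (poller.saved.size() < 3 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
        poller.stop();
        return poller.saved.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo() + " messages saved");
    }
}
```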
...One more thing
Though this is a very basic application, using Kafka here gives us a distinct advantage over an application whose REST layer writes straight to the database.
Suppose our Consumer application is down. Callers can still hit the endpoint, the User data will wait in Kafka, and whenever the Consumer comes back up (within the topic's retention period), it will read the message and save the User. Our user gets saved despite the database application having been down! Trying this out, however, is left as an exercise for the reader.
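This works because a Kafka topic is a log, not a queue that deletes messages once they are read: records stay on the broker for the topic's retention period (seven days by default), and each consumer group resumes from its last committed offset. The KafkaConsumer used above commits offsets automatically by default (enable.auto.commit is true unless you change it), so after a crash it may re-read a few messages it had not yet committed. The JDK-only sketch below replays the scenario with a hypothetical single-partition ToyTopic, not the Kafka API: the consumer reads one user, goes down while two more are produced, and picks up exactly those two when it returns with the same group id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-partition topic with per-group committed offsets (NOT the Kafka API).
class ToyTopic {
    // Append-only log: reading never removes records
    private final List<String> log = new ArrayList<>();
    // group id -> next offset to read
    private final Map<String, Integer> committed = new HashMap<>();

    void append(String value) {
        log.add(value);
    }

    // Returns all records after the group's committed offset, then commits the new position.
    // A group with no committed offset starts at 0, like auto.offset.reset=earliest.
    List<String> pollAndCommit(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic myTopic = new ToyTopic();
        myTopic.append("alice");
        System.out.println(myTopic.pollAndCommit("my-application")); // prints [alice]

        // Consumer is down; the producer keeps accepting requests
        myTopic.append("bob");
        myTopic.append("carol");

        // Consumer restarts with the same group id and catches up
        System.out.println(myTopic.pollAndCommit("my-application")); // prints [bob, carol]
        // A brand-new group still sees the whole log
        System.out.println(myTopic.pollAndCommit("another-group")); // prints [alice, bob, carol]
    }
}
```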
If you want to try these projects on your own, here are the links to the GitHub repositories. I will try to keep the README updated.