Building a real-time big data pipeline 1 : Kafka, RESTful, Java

Updated on September 20, 2021

Apache Kafka is used for building real-time data pipelines and streaming apps. Kafka is a message broker, which helps transmit messages from one system to another. Zookeeper is required to run a Kafka Cluster. Apache ZooKeeper is primarily used to track the status of nodes in the Kafka Cluster and maintain a list of Kafka topics and messages. Starting with v2.8, Kafka can be run without ZooKeeper. However, this update isn’t ready for use in production.

Kafka for local development of applications:.

There are multiple ways of running Kafka locally for development of apps but the easiest method is by docker-compose. First download Docker Desktop,Docker Huband SignIn with Docker ID.

Docker Compose is included as part of this desktop.docker-composefacilitates installing Kafka and Zookeeper with the help ofdocker-compose.yml

Create a file called docker-compose.yml in your project directory and paste the following lines of code:

The above compose file defines two services: zookeeper and kafka. The zookeeper service uses a public zookeeper image and the kafka service uses a public kafka image pulled from the Docker Hub registry.

1. Start the Kafka service.
Open a terminal, go to the directory where you have the docker-compose.yml file, and execute the following command. This command starts the docker-compose engine, and downloads the images and runs them.

$docker compose up -d

To list all running docker containers, run the following command

$docker compose ps

You can shut down docker-compose by executing the following command in another terminal.

$docker compose down

Start the kafka service and check the ZooKeeper logs to verify that ZooKeeper is working and healthy.

$docker compose logs zookeeper | grep -i binding

Next, check the Kafka logs to verify that broker is working and healthy.

$docker compose logs kafka | grep -i started

Two fundamental concepts in Kafka are Topics and Partitions. Kafka topics are divided into a number of partitions. While the topic is a logical concept in Kafka, a partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion.

2. Create a Kafka topic
The Kafka cluster stores streams of records in categories called topics. Each record in a topic consists of a key, a value, and a timestamp. A topic can have zero, one, or many consumers that subscribe to the data written to it.

Use docker compose exec to execute a command in a running container. For example, docker compose exec command by default allocates a TTY, so that you can use such a command to get an interactive prompt. Go into directory where docker-compose.yml file present, and execute it as

$docker compose exec kafka bash

(for zookeeper $docker compose exec zookeeper bash)

Change the directory to /opt/kafka/bin where you find scripts such as

cd /opt/kafka/bin

Kafka topics are divided into a number of partitions. Each partition in a topic is an ordered, immutable sequence of records that continually appended.

Create a new topic or, list or delete existing topics:

bash-4.4# ./ --list --bootstrap-server localhost:9092

If necessary, delete a topic using the following command.

bash-4.4# ./ --delete --topic mytopic --bootstrap-server localhost:9092

3. How to produce and consume messages from kafka Topic using the command line?
Kafka producers are those client applications that publish (write) events to Kafka, and Kafka consumers are those that subscribe (read and process) to these events. A producer can use a partition key to direct messages to a specific partition. If a producer doesn’t specify a partition key when producing a record, Kafka will use a round-robin partition assignment.

Kafka broker (a.k.a Kafka server/node) is the server node in the cluster, mainly responsible for hosting partitions of Kafka topics, transferring messages from Kafka producer to Kafka consumer and, providing data replication and partitioning within a Kafka Cluster.

The following is a producer command line to read data from standard input and write it to a Kafka topic.

The following is a command line to read data from a Kafka topic and write it to standard output.

4. How to consume messages from kafka Topic using java web application?

Another way of reading data from a Kafka topic is by simply using a Java Spring Boot.

The following demonstrates how to receive messages from Kafka topic. First in this blog I create a Spring Kafka Consumer, which is able to listen the messages sent to a Kafka topic using the commandline. Then, I create a Spring Kafka Producer, which is able to send messages to a Kafka topic.

Download Spring Tool Suite4 and install it.
At Eclipse IDE’s Package Explorer click “Create new Spring Starter Project” and
Name: SpringBootKafka
Type: Maven project
Spring Boot Version: 2.5.4
Java Version: 8 Search “kafka” at New Spring Starter Project Dependencies and select “Spring for Apache Kafka“
Click Finish.

The Spring Initializr creates the following simple application class for you.

@SpringBootApplication annotation is equivalent to using @Configuration, @EnableAutoConfiguration, and @ComponentScan. As a result, when we run this Spring Boot application, it will automatically scan the components in the current package and its sub-packages. Thus it will register them in Spring’s Application Context, and allow us to inject beans using @Autowired.

Configure Kafka through application.yml configuration file
In Spring Boot, properties are kept in the file under the classpath. The file is located in the src/main/resources directory. Change file to application.yml, then add the following content.