REST API to produce messages to Kafka using the Docker Maven Plugin

I have developed a simple REST API that sends incoming messages to Apache Kafka.

I have used Docker Kafka (https://github.com/spotify/docker-kafka) and the Docker Maven Plugin (https://github.com/fabric8io/docker-maven-plugin) to do this.

Before going through this post, familiarize yourself with Docker and Docker Compose.

The Docker Maven Plugin provides a nice way to specify multiple images in the pom.xml and link them as necessary. We could also use Docker Compose for this, but I have used the plugin here.
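As a rough illustration, the plugin section of the pom.xml looks something like the below. This is only a sketch: the image names, ports and links are placeholders, so check the cloned project for the real configuration.

<plugin>
    <groupId>io.fabric8</groupId>
    <artifactId>docker-maven-plugin</artifactId>
    <configuration>
        <images>
            <!-- Kafka broker (placeholder image name) -->
            <image>
                <name>spotify/kafka</name>
                <alias>kafka</alias>
                <run>
                    <ports>
                        <port>2181:2181</port>
                        <port>9092:9092</port>
                    </ports>
                </run>
            </image>
            <!-- The REST API container, linked to the Kafka container -->
            <image>
                <name>kafka-rest</name>
                <alias>kafka-rest</alias>
                <run>
                    <ports>
                        <port>8080:8080</port>
                    </ports>
                    <links>
                        <link>kafka</link>
                    </links>
                </run>
            </image>
        </images>
    </configuration>
</plugin>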

    1. Clone the project (https://github.com/dkbalachandar/kafka-message-sender)
    2. Go into the kafka-message-sender folder
    3. Run 'mvn clean install'
    4. Run 'mvn docker:start', then run 'docker ps' and make sure that two containers are running; their names are kafka and kafka-rest
    5. Access http://localhost:8080/api/kafka/send/test (POST) and confirm that you see "message has been sent" in the browser (a sketch of what the endpoint does appears after these steps)
    6. Run the below command and make sure that whatever message you sent is available in Kafka[Kafka Command Line Consumer], or consume it via a Flume agent[Kafka Flume Agent Consumer]
docker exec -it kafka /opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
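For context, the core of what the REST endpoint does is roughly the following. This is a hedged sketch, assuming the new producer API from kafka-clients 0.8.2.1, the "test" topic from step 6, and a broker on localhost:9092; the actual code in the repo may differ.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Illustrative sketch of the endpoint's core logic, not the repo's actual code
public class KafkaMessageSender {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // This is the message that step 6's console consumer will print
            producer.send(new ProducerRecord<String, String>("test", "hello from the REST API"));
        } finally {
            producer.close();
        }
    }
}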

How to check Kafka topic properties and node information in Zookeeper

We can verify the properties of a Kafka topic by running the below command from the Kafka installation directory:


  bin/kafka-topics.sh --zookeeper zookeeper.host.name:2181 --topic topic1 --describe

The output will look something like the below:


Topic:topic1         PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: topic1       Partition: 0    Leader: 100     Replicas: 100,101,102   Isr:  100,101,102

From that output we can see that the replication factor for this topic is 3, that the leader of partition 0 is broker 100, and that the replicas live on broker ids 100, 101 and 102, all of which are in sync (the Isr column).

If we want to verify the same information in Zookeeper, follow the below steps.

Log into the Zookeeper node and go to the bin folder inside the installation directory.

Then run zkCli as below:


 ./zkCli.sh

Then enter the below command to list all the broker node ids:


  ls /brokers/ids

Then enter the below command to see the details of the topic:


 get /brokers/topics/topic1

Please refer to the below link for more commands: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
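If you prefer to do the same checks programmatically, below is a minimal Java sketch using the same zkclient library that the topic-creation code in the next section depends on. The Zookeeper address and topic name are placeholders; adjust them for your environment.

import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;

// Hypothetical helper, not part of the Kafka distribution: it reads the same
// Zookeeper paths that the zkCli.sh commands above inspect
public class ZookeeperTopicInspector {

    public static void main(String[] args) {
        // Assumed Zookeeper address; replace with your own
        ZkClient zkClient = new ZkClient("zookeeper.host.name:2181", 10000, 10000,
                ZKStringSerializer$.MODULE$);
        try {
            // Equivalent of "ls /brokers/ids": one child znode per live broker
            List<String> brokerIds = zkClient.getChildren("/brokers/ids");
            System.out.println("Broker ids: " + brokerIds);

            // Equivalent of "get /brokers/topics/topic1": the partition
            // assignment of the topic as a JSON string
            String topicData = zkClient.readData("/brokers/topics/topic1");
            System.out.println("Topic details: " + topicData);
        } finally {
            zkClient.close();
        }
    }
}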

How to create a Kafka topic in Java – Kafka version 0.8.2.1

The below code is useful for creating a Kafka topic via Java. Please note that KAFKA_TOPIC, KAFKA_ZOOKEEPER_HOSTS and KAFKA_BROKER_HOSTS are to be supplied as environment variables.

Sample values are given below
KAFKA_TOPIC=kafka_topic
KAFKA_ZOOKEEPER_HOSTS=MACHINE1_DOMAIN_NAME:2181,MACHINE2_DOMAIN_NAME:2181
KAFKA_BROKER_HOSTS=MACHINE1_DOMAIN_NAME:9092,MACHINE2_DOMAIN_NAME:9092

Replace MACHINE1_DOMAIN_NAME and MACHINE2_DOMAIN_NAME with the appropriate domain names of your Kafka and Zookeeper host machines.

If you have only one server, you can remove MACHINE2_DOMAIN_NAME from those values. You can add as many hosts as you need, separated by commas.

Maven


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>



import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class KafkaTopicCreator {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTopicCreator.class);

    public static void main(String[] args) {

        // KAFKA_TOPIC, KAFKA_ZOOKEEPER_HOSTS and KAFKA_BROKER_HOSTS are read
        // from the environment (sample values are shown above)
        String topicName = System.getenv("KAFKA_TOPIC");
        String zookeeperHosts = System.getenv("KAFKA_ZOOKEEPER_HOSTS");
        String kafkaBrokerHosts = System.getenv("KAFKA_BROKER_HOSTS");
        int sessionTimeOut = 10000;
        int connectionTimeOut = 10000;
        LOGGER.info("zookeeperHosts:{}", zookeeperHosts);
        ZkClient zkClient = new ZkClient(zookeeperHosts, sessionTimeOut, connectionTimeOut, ZKStringSerializer$.MODULE$);
        try {
            if (!AdminUtils.topicExists(zkClient, topicName)) {
                // One partition; replicate to every broker in KAFKA_BROKER_HOSTS
                int replicationFactor = kafkaBrokerHosts.split(",").length;
                AdminUtils.createTopic(zkClient, topicName, 1, replicationFactor, new Properties());
            } else {
                LOGGER.info("{} is available hence no changes are done", topicName);
            }
            LOGGER.info("Topic Details:{}", AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient));
        } finally {
            zkClient.close();
        }
    }
}

Kafka – Multiple producer threads

In my application, I use Kafka for logging user events. The user events are collected in XML format and sent to Kafka, from where they are consumed by a Flume agent.

In my API, we originally created a producer for each event and closed it once the message was sent to Kafka. This worked, but while testing these changes in our load environment we hit an issue: the Kafka server could not allocate resources for the producer threads, and it also complained that there were too many open files.

To avoid this issue, we can either increase the open file limit or change the API to create a single Kafka producer instance that is responsible for producing all the messages.

Since the Kafka producer instance is thread safe, the second solution is the correct fit for this issue, as shown in the sketch below.
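A minimal sketch of that fix, assuming string messages and a single broker on localhost (the class name and property values are illustrative, not from the original API):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// One producer for the whole application instead of one per event
public final class SharedKafkaProducer {

    private static final Producer<String, String> PRODUCER = createProducer();

    private SharedKafkaProducer() {
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        // Assumed broker address; replace with your KAFKA_BROKER_HOSTS value
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    // KafkaProducer is thread safe, so every event thread can call this
    // method on the same instance instead of creating its own producer
    public static void send(String topic, String message) {
        PRODUCER.send(new ProducerRecord<String, String>(topic, message));
    }

    // Close once, e.g. from a shutdown hook, not after every message
    public static void close() {
        PRODUCER.close();
    }
}

With this, every event-handling thread calls SharedKafkaProducer.send(...) on the same instance, so only one set of connections and file handles stays open no matter how many events are produced.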

Kafka – Command Line Consumer

If you have an application deployed on Docker that sends messages to Kafka, and you want to verify the messages without running a JUnit test case or a Flume agent, you can use the below command:


docker exec -it KAFKA_CONTAINER_ID /opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper ZOOKEEPER_DOMAIN_NAME:2181 --topic TOPIC_NAME --from-beginning

Apache Flume vs. Apache Kafka

Kafka: A publish-subscribe messaging system.
Flume: A service for collecting, aggregating, and moving large amounts of data into Hadoop, or for processing and persisting the data into relational database systems.

Kafka: Messages are replicated across multiple broker nodes, so in case of a failure we can easily retrieve them.
Flume: Does not replicate events/data, so in case of a node failure the data will be lost.

Kafka: A pull-based messaging system; messages remain available for a number of days, so clients in different consumer groups can each pull the same messages.
Flume: Data is pushed to the destination, which could be a logger, Hadoop, or a custom sink; messages are not stored as they are in Kafka.

Both systems can also be used together: messages can be pushed to Kafka and consumed by a Flume agent with a KafkaSource, and data can also be pushed to Kafka through a KafkaSink.

Flume Agent for consuming Kafka messages

Follow the below steps to create a single-node Flume agent that consumes Kafka messages:

1. Download and install Flume
2. Check out the GitHub repo [https://github.com/dkbalachandar/flume-kafka-agent]
3. Copy flume-kafka-conf.properties [available in /src/main/resources] into the FLUME_INSTALLED_DIRECTORY/conf folder [update the zookeeper node and topic details; a sketch of this file appears after these steps]
4. Run mvn package, then copy /target/flume-kafka-agent-1.0-SNAPSHOT-jar-with-dependencies.jar into the FLUME_INSTALLED_DIRECTORY/lib folder
5. Run the Flume agent with the below command:

bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-conf.properties --name a1 -Dflume.root.logger=INFO,console
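For reference, a minimal flume-kafka-conf.properties for this kind of agent could look like the below, using Flume's standard KafkaSource and a logger sink. This is a sketch only; the actual file in the repo may differ, so treat the addresses and topic name as placeholders.

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Kafka source: consumes the topic via the Zookeeper quorum
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect = localhost:2181
a1.sources.r1.topic = test
a1.sources.r1.channels = c1

# In-memory channel between the source and the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Log each consumed event to the console, matching -Dflume.root.logger=INFO,console
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1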

Kafka without Zookeeper??

Zookeeper is an open-source coordination service. Apache Kafka can't work without Zookeeper, as it handles a lot of things for Kafka. A few of those are given below:

Leader election
Committing offsets
Tracking node status
Identifying when a node joins or leaves the cluster
Serving client requests

If Zookeeper is down, none of the above tasks can happen, which leads to major issues. So we have to use a Zookeeper cluster and not a single Zookeeper node.

Why we use Kafka

I did some analysis to choose a messaging system for logging user-specific actions in one of my systems. Users can perform various activities on the site, and all those actions should be logged so they can be used for various analyses.

We also want guaranteed message delivery, and each message should stay available and be replicated across the cluster so that the failure of any node does not affect it. We can't predict the number of messages created per second; it will grow gradually with the number of users.

Considering all these points, we preferred Apache Kafka over RabbitMQ and the other messaging systems.