REST API to produce messages to Kafka using the Docker Maven Plugin

I have developed a simple REST API to send the incoming message to Apache Kafka.

I have used Docker Kafka (https://github.com/spotify/docker-kafka) and the Docker Maven Plugin (https://github.com/fabric8io/docker-maven-plugin) to do this.

Before going through this post, familiarize yourself with Docker and Docker Compose.

The Docker Maven Plugin gives us a nice way to specify multiple images in pom.xml and link them as necessary. We could also use Docker Compose for this, but I have used the plugin here.
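For illustration, here is a minimal sketch of what the plugin configuration in pom.xml might look like, assuming the spotify/kafka image and an application image named kafka-rest (the names and ports are illustrative, not taken from the project):

<plugin>
    <groupId>io.fabric8</groupId>
    <artifactId>docker-maven-plugin</artifactId>
    <configuration>
        <images>
            <!-- Kafka (with embedded Zookeeper) from spotify/docker-kafka -->
            <image>
                <name>spotify/kafka</name>
                <alias>kafka</alias>
                <run>
                    <ports>
                        <port>2181:2181</port>
                        <port>9092:9092</port>
                    </ports>
                </run>
            </image>
            <!-- The REST API image, linked to the Kafka container -->
            <image>
                <name>kafka-rest</name>
                <alias>kafka-rest</alias>
                <run>
                    <ports>
                        <port>8080:8080</port>
                    </ports>
                    <links>
                        <link>kafka</link>
                    </links>
                </run>
            </image>
        </images>
    </configuration>
</plugin>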

    1. Clone the project (https://github.com/dkbalachandar/kafka-message-sender)
    2. Go into the kafka-message-sender folder
    3. Run ‘mvn clean install’
    4. Run ‘mvn docker:start’, then run ‘docker ps’ and make sure that two containers are running. The names of those containers are kafka and kafka-rest
    5. Access http://localhost:8080/api/kafka/send/test (POST) and confirm that you see “message has been sent” in the browser
    6. Run the below command and make sure that the message you sent is available in Kafka (see “Kafka – Command Line Consumer” below), or consume it via a Flume agent
docker exec -it kafka /opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
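
Behind the endpoint in step 5, the service simply hands the path parameter to a Kafka producer. Here is a minimal sketch of such an endpoint, assuming JAX-RS and the Java producer from kafka-clients 0.8.2; the class name, topic, and broker address are illustrative and not taken from the project:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import java.util.Properties;

@Path("/kafka")
public class KafkaMessageResource {

    // One producer instance for the whole resource; KafkaProducer is thread safe
    private static final KafkaProducer<String, String> PRODUCER = createProducer();

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        // "kafka" resolves to the linked Kafka container
        props.put("bootstrap.servers", "kafka:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    @POST
    @Path("/send/{message}")
    public String send(@PathParam("message") String message) {
        PRODUCER.send(new ProducerRecord<>("test", message));
        return "message has been sent";
    }
}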

How to check Kafka topic properties and node information in Zookeeper

We can verify the Kafka topic properties by running the below command from the installation directory of Kafka:


  bin/kafka-topics.sh --zookeeper zookeeper.host.name:2181 --topic topic1 --describe

You will see something like the below:


Topic:topic1         PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: topic1       Partition: 0    Leader: 100     Replicas: 100,101,102   Isr:  100,101,102

With that information, we can see that the replication factor for this topic is 3 and identify the broker node IDs of the replicas.

If we want to verify the same information in Zookeeper, follow the steps below.

Log in to the Zookeeper node, go to the installation directory, and then into the bin folder.

Then run zkCli as below


 ./zkCli.sh

Then enter the below command to see the IDs of all the broker nodes:


  ls /brokers/ids
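
Given the brokers above, the output should look something like this (illustrative):

  [100, 101, 102]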

Then enter the below command to see the details of the topic:


 get /brokers/topics/topic1
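
For the topic described above (one partition, replicas 100, 101, and 102), the stored value would look something like this (illustrative):

 {"version":1,"partitions":{"0":[100,101,102]}}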

Please refer to the below link for more commands: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

How to create a Kafka topic in Java – Kafka version 0.8.2.1

The below code is useful for creating a Kafka topic via Java. Please note that KAFKA_TOPIC, KAFKA_ZOOKEEPER_HOSTS, and KAFKA_BROKER_HOSTS are to be supplied as environment variables.

Sample values are given below
KAFKA_TOPIC=kafka_topic
KAFKA_ZOOKEEPER_HOSTS=MACHINE1_DOMAIN_NAME:2181,MACHINE2_DOMAIN_NAME:2181
KAFKA_BROKER_HOSTS=MACHINE1_DOMAIN_NAME:9092,MACHINE2_DOMAIN_NAME:9092

Replace MACHINE1_DOMAIN_NAME and MACHINE2_DOMAIN_NAME with the appropriate domain names of your machines or the Zookeeper server host machines.

If you have only one Kafka server, then you can remove MACHINE2_DOMAIN_NAME from the KAFKA_ZOOKEEPER_HOSTS value. You can add as many hosts as needed, separated by commas.

Maven


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>



import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class KafkaTopicCreator {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTopicCreator.class);

    public static void main(String[] args) {

        // These values are supplied as environment variables
        String topicName = System.getenv("KAFKA_TOPIC");
        String zookeeperHosts = System.getenv("KAFKA_ZOOKEEPER_HOSTS");
        String kafkaBrokerHosts = System.getenv("KAFKA_BROKER_HOSTS");
        int sessionTimeOut = 10000;
        int connectionTimeOut = 10000;
        LOGGER.info("zookeeperHosts:{}", zookeeperHosts);
        ZkClient zkClient = new ZkClient(zookeeperHosts, sessionTimeOut, connectionTimeOut, ZKStringSerializer$.MODULE$);
        if (!AdminUtils.topicExists(zkClient, topicName)) {
            // Create the topic with one partition and one replica per broker
            int replicationFactor = kafkaBrokerHosts.split(",").length;
            AdminUtils.createTopic(zkClient, topicName, 1, replicationFactor, new Properties());
        } else {
            LOGGER.info("{} is available hence no changes are done", topicName);
        }
        LOGGER.info("Topic Details:{}", AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient));
        zkClient.close();
    }
}
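
Assuming the compiled class and its dependencies are on the classpath, it can be run like this (the hostnames and paths are illustrative):

export KAFKA_TOPIC=kafka_topic
export KAFKA_ZOOKEEPER_HOSTS=zookeeper.host.name:2181
export KAFKA_BROKER_HOSTS=kafka.host.name:9092
java -cp "target/classes:target/dependency/*" KafkaTopicCreator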

Kafka – Multiple producer threads

In my application, I use Kafka for logging user events. The user events are collected in XML format and sent to Kafka, from where they are consumed by a Flume agent.

In my API, we create a producer for each event; after the message is sent to Kafka, the producer is closed. This worked fine, but while testing these changes in our load environment, we hit an issue: the Kafka server could not allocate resources for the producer threads, and we also saw complaints that there were too many open files.

To avoid this issue, we had to either increase the open file limit or change our API to create only one Kafka producer instance responsible for producing all the messages.

As the Kafka producer instance is thread-safe, the second solution is the right fit for this issue.
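
Here is a minimal sketch of that pattern, assuming the Java producer from kafka-clients 0.8.2 (the class name, topic, and broker address are illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public final class SharedKafkaProducer {

    // One producer instance for the whole application, instead of
    // creating and closing a producer per event. KafkaProducer is
    // thread safe, so all event threads can share this instance.
    private static final KafkaProducer<String, String> INSTANCE = create();

    private SharedKafkaProducer() {
    }

    private static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.host.name:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    // Safe to call concurrently from every event thread
    public static void send(String topic, String xmlEvent) {
        INSTANCE.send(new ProducerRecord<>(topic, xmlEvent));
    }
}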

Kafka – Command Line Consumer

If you have an application deployed on Docker that sends messages to Kafka, and you want to verify the messages without running a JUnit test case or a Flume agent, you can use the below command:


docker exec -it KAFKA_CONTAINER_ID /opt/kafka_2.11-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper ZOOKEEPER_DOMAIN_NAME:2181 --topic TOPIC_NAME --from-beginning

Apache Flume vs. Apache Kafka

Kafka: A publish-subscribe messaging system.
Flume: A service for collecting, aggregating, and moving large amounts of data into Hadoop, or for processing and persisting the data into relational database systems.

Kafka: Messages are replicated across multiple broker nodes, so in case of a failure we can easily retrieve the messages.
Flume: Events/data are not replicated, so in case of a node failure the data will be lost.

Kafka: A pull-based messaging system; messages remain available for a number of days, so clients in different consumer groups can pull the same messages.
Flume: A push-based system; data is pushed to the destination, which could be a logger, Hadoop, or a custom sink, so messages are not stored as they are in Kafka.

Both systems can be used together: messages can be pushed to Kafka and consumed by a Flume agent using a Kafka source, and data can also be pushed into Kafka using a Kafka sink.
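
For example, here is a minimal sketch of a Flume agent that consumes from a Kafka topic and writes the events to the logger sink, assuming Flume 1.6+ (the agent name, topic, and Zookeeper address are illustrative):

agent.sources = kafkaSource
agent.channels = memChannel
agent.sinks = logSink

# Kafka source: pulls messages from the given topic
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = zookeeper.host.name:2181
agent.sources.kafkaSource.topic = user_events
agent.sources.kafkaSource.channels = memChannel

# In-memory channel between the source and the sink
agent.channels.memChannel.type = memory

# Logger sink: writes the consumed events to the Flume log
agent.sinks.logSink.type = logger
agent.sinks.logSink.channel = memChannel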