How to create a Kafka topic in Java – Kafka version 0.8.2.1

The code below creates a Kafka topic from Java. Please note that KAFKA_TOPIC, KAFKA_ZOOKEEPER_HOSTS and KAFKA_BROKER_HOSTS must be supplied as environment variables, because the program reads them from the process environment.

Sample values are given below
KAFKA_TOPIC=kafka_topic
KAFKA_ZOOKEEPER_HOSTS=MACHINE1_DOMAIN_NAME:2181,MACHINE2_DOMAIN_NAME:2181
KAFKA_BROKER_HOSTS=MACHINE1_DOMAIN_NAME:9092,MACHINE2_DOMAIN_NAME:9092

Replace MACHINE1_DOMAIN_NAME and MACHINE2_DOMAIN_NAME with the appropriate domain names of your machines or of the ZooKeeper server host machines.

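If you have only one Kafka server, you can remove MACHINE2_DOMAIN_NAME from the KAFKA_ZOOKEEPER_HOSTS and KAFKA_BROKER_HOSTS values (the number of entries in KAFKA_BROKER_HOSTS is used as the topic's replication factor). You can list as many hosts as you need, separated by commas.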

Maven


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>



import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Command-line utility that creates a Kafka topic through ZooKeeper if it does not exist yet.
 *
 * <p>Three settings are required. Each is looked up first as a JVM system property
 * ({@code -DNAME=value}) and then as an environment variable of the same name:
 * <ul>
 *   <li>{@code KAFKA_TOPIC} - name of the topic to create</li>
 *   <li>{@code KAFKA_ZOOKEEPER_HOSTS} - comma-separated {@code host:port} list passed to
 *       {@link ZkClient}</li>
 *   <li>{@code KAFKA_BROKER_HOSTS} - comma-separated broker list; only the number of entries
 *       is used, as the replication factor of a newly created topic</li>
 * </ul>
 */
public class KafkaTopicCreator {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTopicCreator.class);

    /** ZooKeeper session timeout, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 10000;

    /** ZooKeeper connection timeout, in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 10000;

    /** A newly created topic always gets a single partition. */
    private static final int PARTITIONS = 1;

    /**
     * Creates the configured topic when it is missing and logs the topic metadata.
     *
     * @param args ignored; all configuration comes from system properties / environment variables
     * @throws IllegalStateException if any of the required settings is missing or blank
     */
    public static void main(String[] args) {

        // Resolve every setting up front so a missing value fails fast with a clear message
        // instead of surfacing later as a NullPointerException.
        String topicName = readSetting("KAFKA_TOPIC");
        String zookeeperHosts = readSetting("KAFKA_ZOOKEEPER_HOSTS");
        String kafkaBrokerHosts = readSetting("KAFKA_BROKER_HOSTS");

        LOGGER.info("zookeeperHosts:{}", zookeeperHosts);

        // ZKStringSerializer$.MODULE$ is the Scala singleton instance of Kafka's ZooKeeper
        // string serializer, accessed from Java.
        ZkClient zkClient = new ZkClient(zookeeperHosts, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS,
                ZKStringSerializer$.MODULE$);
        try {
            if (!AdminUtils.topicExists(zkClient, topicName)) {
                // One replica per listed broker.
                int replicationFactor = kafkaBrokerHosts.split(",").length;
                AdminUtils.createTopic(zkClient, topicName, PARTITIONS, replicationFactor, new Properties());
            } else {
                LOGGER.info("{} is available hence no changes are done", topicName);
            }
            LOGGER.info("Topic Details:{}", AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient));
        } finally {
            // Always release the ZooKeeper connection, even when topic creation fails.
            zkClient.close();
        }
    }

    /**
     * Looks up a required setting, preferring a JVM system property over an environment variable.
     *
     * @param name the property / environment variable name
     * @return the trimmed, non-empty value
     * @throws IllegalStateException if neither source provides a non-blank value
     */
    private static String readSetting(String name) {
        String value = System.getProperty(name);
        if (value == null || value.trim().isEmpty()) {
            value = System.getenv(name);
        }
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting '" + name
                    + "': supply it as an environment variable or as -D" + name + "=<value>");
        }
        return value.trim();
    }
}

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s