How to check Kafka topic properties and node information in Zookeeper

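We can check a Kafka topic's properties by running the command below from the Kafka installation directory. (Newer Kafka releases have removed the --zookeeper option from kafka-topics.sh; on those, point the tool at a broker with --bootstrap-server instead.)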


  bin/kafka-topics.sh --zookeeper zookeeper.host.name:2181 --topic topic1 --describe

The output will look similar to this:


Topic:topic1         PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: topic1       Partition: 0    Leader: 100     Replicas: 100,101,102   Isr:  100,101,102

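From this output we can see that topic1 has a single partition with a replication factor of 3, that its replicas live on the brokers with IDs 100, 101 and 102, and that broker 100 is the current leader. The Isr column lists the in-sync replicas, i.e. the replicas that are fully caught up with the leader.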
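If you want to check this from a script (for example, to alert when a replica drops out of the ISR), the partition line can be split into its Key: value fields. The sketch below is a minimal Scala parser; it assumes the layout shown in the sample output above (spacing may differ between Kafka versions), and PartitionInfo and DescribeParser are hypothetical names, not part of Kafka.

```scala
// Hypothetical helper: parses one partition line of `kafka-topics.sh --describe`.
// Assumes the "Key: value" layout shown in the sample output; not a Kafka API.
final case class PartitionInfo(
    topic: String,
    partition: Int,
    leader: Int,
    replicas: Seq[Int],
    isr: Seq[Int]
) {
  // The replication factor is the number of brokers holding a replica
  def replicationFactor: Int = replicas.size

  // True when every assigned replica is also in the in-sync replica set
  def fullyInSync: Boolean = replicas.toSet == isr.toSet
}

object DescribeParser {
  // Matches "Key: value" pairs where the value contains no whitespace
  private val Field = """(\w+):\s*(\S+)""".r

  // Turns "100,101,102" into Seq(100, 101, 102)
  private def ids(csv: String): Seq[Int] =
    csv.split(",").map(_.trim).filter(_.nonEmpty).map(_.toInt).toSeq

  def parsePartitionLine(line: String): PartitionInfo = {
    val fields = Field.findAllMatchIn(line).map(m => m.group(1) -> m.group(2)).toMap
    PartitionInfo(
      topic = fields("Topic"),
      partition = fields("Partition").toInt,
      leader = fields("Leader").toInt,
      replicas = ids(fields("Replicas")),
      isr = ids(fields("Isr"))
    )
  }
}
```

For the sample line above, replicationFactor is 3 and fullyInSync is true; a line whose Isr list is shorter than its Replicas list would report false.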

To verify the same information in Zookeeper, follow these steps.

Log in to the Zookeeper node, go to the Zookeeper installation directory and change into its bin folder.

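Then start the Zookeeper command-line client. By default it connects to localhost:2181; pass -server host:port to connect to a different Zookeeper node.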


  ./zkCli.sh

Enter the following command to list the IDs of all registered brokers:


  ls /brokers/ids
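The children of /brokers/ids are ephemeral znodes, so a broker ID only appears there while that broker is connected to Zookeeper. That makes it useful to compare this list with the Replicas column from the describe output: a replica whose broker ID is missing points to a broker that is down. The sketch below assumes zkCli prints the children as a bracketed, comma-separated list such as [100, 101, 102]; BrokerCheck and its methods are hypothetical names.

```scala
// Hypothetical helper: compares the broker IDs printed by `ls /brokers/ids`
// with a topic's replica list to find replicas whose broker is not registered.
object BrokerCheck {
  // Parses zkCli output such as "[100, 101, 102]" into a set of broker IDs
  def parseIdList(output: String): Set[Int] =
    output.trim
      .stripPrefix("[")
      .stripSuffix("]")
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.toInt)
      .toSet

  // Replicas assigned to brokers that have no entry under /brokers/ids
  def missingBrokers(replicas: Seq[Int], registered: Set[Int]): Seq[Int] =
    replicas.filterNot(registered.contains)
}
```

For example, missingBrokers(Seq(100, 101, 102), parseIdList("[100, 102]")) returns Seq(101).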

Then enter the following command to see the topic's partition-to-replica assignment:


 get /brokers/topics/topic1
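The data stored in this znode is a small JSON document mapping each partition number to its replica broker IDs, for example {"version":1,"partitions":{"0":[100,101,102]}}; newer Kafka versions add further fields, so treat the exact shape as an assumption. The regex-based sketch below extracts that assignment without a JSON dependency; TopicZnode is a hypothetical name, and real code would be better served by a proper JSON library.

```scala
// Hypothetical helper: extracts the partition -> replicas assignment from the
// JSON stored in /brokers/topics/<topic>. Regex sketch that assumes the
// {"version":1,"partitions":{"0":[100,101,102]}} layout.
object TopicZnode {
  // Captures the body of the "partitions" object (it holds no nested braces)
  private val PartitionsObj = """"partitions"\s*:\s*\{([^}]*)\}""".r

  // Captures one "partition":[replica,...] entry
  private val Assignment = """"(\d+)"\s*:\s*\[([^\]]*)\]""".r

  def assignments(json: String): Map[Int, Seq[Int]] =
    PartitionsObj.findFirstMatchIn(json) match {
      case Some(obj) =>
        Assignment.findAllMatchIn(obj.group(1)).map { m =>
          val replicas = m.group(2).split(",").map(_.trim).filter(_.nonEmpty).map(_.toInt).toSeq
          m.group(1).toInt -> replicas
        }.toMap
      case None => Map.empty
    }
}
```

Restricting the search to the "partitions" object keeps other fields, such as replica reassignment lists in newer versions, from being mistaken for partition entries.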

For the full list of Zookeeper paths that Kafka uses and the format of the data stored in them, see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper


Spark Job for Removing Empty XML tags

The Spark program below reads a set of XML files, removes a fixed list of empty tags (<sub/>, <sup/>, <i/> and <b/>) from each file's content using plain string replacement, and writes the result as a sequence file keyed by file name.

package com.spark

import org.apache.spark.{SparkConf, SparkContext}

// Find empty tags, remove them and then write the result as sequence files
object TagHandler {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      // bin/spark-submit --class com.spark.TagHandler --master local tagHandler-assembly-1.0.jar /data/xml /data/output
      System.err.println("Usage: TagHandler <Source File or Directory> <Destination File or Directory>")
      sys.exit(1)
    }

    val inFile = args(0)
    val outFile = args(1)

    // Empty (self-closing) tags to strip from every file
    val htmlTags = List("<sub/>", "<sup/>", "<i/>", "<b/>")

    val conf = new SparkConf().setAppName("TagHandler")
    val sc = new SparkContext(conf)

    // One (fileName, content) pair per input file
    val wholeFiles = sc.wholeTextFiles(inFile)

    // Remove each tag from the content on the executors, instead of
    // collecting every file to the driver
    val cleaned = wholeFiles.mapValues { content =>
      htmlTags.foldLeft(content)((acc, tag) => acc.replace(tag, ""))
    }

    // Save once: calling saveAsSequenceFile per file would fail on the
    // second file because the output directory would already exist
    cleaned.saveAsSequenceFile(outFile)

    sc.stop()
  }
}
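Because the tag removal is plain string processing, it can be pulled out into a pure function and checked locally without starting a SparkContext. In the sketch below, EmptyTags, stripEmptyTags and stripEmptyTagsRegex are hypothetical names. stripEmptyTags applies the same fixed-list replacement as the job above; stripEmptyTagsRegex is an extension the original job does not perform, which also removes the paired empty form (for example <i></i>) and self-closing tags written with a space (<sup />).

```scala
import java.util.regex.Pattern

// Hypothetical helpers mirroring the tag-removal step of TagHandler,
// so the logic can be unit-tested without Spark.
object EmptyTags {
  val htmlTags: List[String] = List("<sub/>", "<sup/>", "<i/>", "<b/>")

  // Same approach as the job: remove each literal self-closing tag
  def stripEmptyTags(content: String, tags: Seq[String] = htmlTags): String =
    tags.foldLeft(content)((acc, tag) => acc.replace(tag, ""))

  // Extension (not in the original job): for the given tag names, removes
  // <name/>, <name />, and paired tags with only whitespace inside, e.g. <i></i>
  def stripEmptyTagsRegex(content: String, names: Seq[String] = Seq("sub", "sup", "i", "b")): String = {
    val alternatives = names.map(n => Pattern.quote(n)).mkString("|")
    // Group 1: self-closing form; group 2 is back-referenced by the closing tag
    val pattern = "<(" + alternatives + ")\\s*/>|<(" + alternatives + ")>\\s*</\\2>"
    content.replaceAll(pattern, "")
  }
}
```

For example, stripEmptyTags("<p>H<sub/>2O<b/></p>") returns "<p>H2O</p>", while tags that contain text, such as <b>bold</b>, are left untouched by both functions.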