Cascading job to count the empty tags in an XML file

The Cascading job below counts the empty tags in an XML file and writes the result to a text file.

package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.commons.lang3.StringUtils;
import org.jdom2.JDOMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

//import cascading.scheme.local.TextDelimited;
//import cascading.scheme.local.TextLine;

/** Cascading job to count the number of empty tags found in an XML file. */

public class CascadingEmptyTagCounter {

    public static Logger LOGGER = LoggerFactory.getLogger(CascadingEmptyTagCounter.class);

    public static void main(String[] args) {

        if (args.length <= 0) {
  "Usage CascadingEmptyTagCounter <INPUT> <OUTPUT> ");

        //input path & output path
        String inputPath = args[0];
        String outputPath = args[1];"inputPath:{}", inputPath);"outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingEmptyTagCounter.class);

        //Source and Sink Tap. Use Hfs. if you are testing in local, then use FileTap
        Tap inTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap outTap = new Hfs(new TextDelimited(new Fields("line")), outputPath, SinkMode.REPLACE);

        Pipe input = new Pipe("input");
        Pipe dataPipe = new Each(input, new EmptyTagCounter(Fields.ALL));

        //Use LocalFlowConnector if you are testing in local
        HadoopFlowConnector flowConnector = new HadoopFlowConnector();
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).
                setName("Cascading extractor Job");

     * Cascading function to count the Empty tags
    public static class EmptyTagCounter extends BaseOperation implements Function {

        private static final long serialVersionUID = 1L;

        private static List tags = Arrays.asList("<sub />", "<sup />", "<b />", "<i />");

        public EmptyTagCounter(Fields fields) {
            super(1, fields);

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
            Tuple tuple = new Tuple();
            try {
                Map tagCountMap = getEmptyTagCounts(arguments.getTuple().getString(0));
                Set tagsSet = tagCountMap.entrySet();
                StringBuilder tagCounter = new StringBuilder();
                for (Map.Entry tagEntry : tagsSet) {
                    tagCounter.append(tagEntry.getKey() + "::" + tagEntry.getValue()).append("\n");
            } catch (JDOMException | IOException e) {
                LOGGER.error("XML parsing error", e);

        public static Map getEmptyTagCounts(String xmlData) throws JDOMException, IOException {
            Map tagsCountMap = new HashMap();
            for (String tag : tags) {
                tagsCountMap.put(tag, String.valueOf(StringUtils.countMatches(xmlData, tag)));
            return tagsCountMap;


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s