Cascading job to count the empty tags in an XML file

The Cascading job below counts the empty tags in an XML file and exports the result to a text file.



package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.commons.lang3.StringUtils;
import org.jdom2.JDOMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

//import cascading.scheme.local.TextDelimited;
//import cascading.scheme.local.TextLine;

/**
 * Cascading job that counts the empty tags found in an XML file.
 *
 * <p>Every input line is scanned for a fixed set of empty tags
 * ({@code <sub />}, {@code <sup />}, {@code <b />}, {@code <i />}) and one
 * output value per input line is written, made of {@code tag::count} entries
 * separated by newlines.
 *
 * <p>Usage: {@code CascadingEmptyTagCounter <INPUT> <OUTPUT>}
 */
public class CascadingEmptyTagCounter {

    public static final Logger LOGGER = LoggerFactory.getLogger(CascadingEmptyTagCounter.class);

    /**
     * Builds and runs the flow that reads {@code args[0]} and writes to {@code args[1]}.
     *
     * @param args command line arguments: input path followed by output path
     */
    public static void main(String[] args) {

        // Both the input and the output path are required; args[1] is read below.
        if (args == null || args.length < 2) {
            LOGGER.info("Usage CascadingEmptyTagCounter <INPUT> <OUTPUT> ");
            return;
        }

        //input path & output path
        String inputPath = args[0];
        String outputPath = args[1];

        LOGGER.info("inputPath:{}", inputPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingEmptyTagCounter.class);

        //Source and Sink Tap. Use Hfs. if you are testing in local, then use FileTap
        Tap inTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap outTap = new Hfs(new TextDelimited(new Fields("line")), outputPath, SinkMode.REPLACE);

        Pipe input = new Pipe("input");
        Pipe dataPipe = new Each(input, new EmptyTagCounter(Fields.ALL));

        //Use LocalFlowConnector if you are testing in local
        // Hand the properties to the connector; otherwise the application JAR
        // class configured above is never seen by the flow.
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef()
                .addSource(dataPipe, inTap)
                .addTailSink(dataPipe, outTap)
                .setName("Cascading extractor Job");
        flowConnector.connect(flowDef).complete();
    }

    /**
     * Cascading function that counts the empty tags in its first argument and
     * emits a single-value tuple holding the {@code tag::count} listing.
     */
    public static class EmptyTagCounter extends BaseOperation implements Function {

        private static final long serialVersionUID = 1L;

        /** Empty tags that are counted, in the order they are reported. */
        private static final List<String> TAGS = Arrays.asList("<sub />", "<sup />", "<b />", "<i />");

        /**
         * @param fields field declaration passed on to {@link BaseOperation}
         */
        public EmptyTagCounter(Fields fields) {
            super(1, fields);
        }

        /**
         * Counts the empty tags in the first argument value and adds one tuple
         * with the formatted counts to the output collector. Nothing is emitted
         * when the argument is {@code null} or when counting fails.
         */
        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }

            final String formattedCounts;
            try {
                Map<String, String> tagCountMap = getEmptyTagCounts(arguments.getString(0));
                StringBuilder tagCounter = new StringBuilder();
                for (Map.Entry<String, String> tagEntry : tagCountMap.entrySet()) {
                    tagCounter.append(tagEntry.getKey())
                            .append("::")
                            .append(tagEntry.getValue())
                            .append("\n");
                }
                formattedCounts = tagCounter.toString();
            } catch (JDOMException | IOException e) {
                // Skip the record: emitting an empty tuple would not match the
                // single value this operation produces for every other record.
                LOGGER.error("XML parsing error", e);
                return;
            }

            Tuple tuple = new Tuple();
            tuple.add(formattedCounts);
            functionCall.getOutputCollector().add(tuple);
        }

        /**
         * Counts how often each known empty tag occurs in the given text.
         *
         * @param xmlData text to scan; {@code null} or empty yields a count of 0 for every tag
         * @return map from tag literal to its occurrence count (as a decimal string),
         *         iterated in the order the tags are declared
         * @throws JDOMException not thrown by this implementation; kept in the
         *                       signature so existing callers keep compiling
         * @throws IOException   not thrown by this implementation; kept in the
         *                       signature so existing callers keep compiling
         */
        public static Map<String, String> getEmptyTagCounts(String xmlData) throws JDOMException, IOException {
            // LinkedHashMap keeps the reported order stable and equal to TAGS.
            Map<String, String> tagsCountMap = new LinkedHashMap<String, String>();
            for (String tag : TAGS) {
                tagsCountMap.put(tag, String.valueOf(StringUtils.countMatches(xmlData, tag)));
            }
            return tagsCountMap;
        }
    }
}


Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s