Cascading Job to Remove Empty Tags from an XML File

The Cascading job below removes the empty tags found in an XML file.

package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

//import cascading.flow.local.LocalFlowConnector;
//import cascading.scheme.local.TextDelimited;
//import cascading.scheme.local.TextLine;
//import cascading.tap.local.FileTap;

/**
 * Cascading job to replace the empty HTML tags found in an XML file.
 */

public class CascadingEmptyTagReplacer {

    public static Logger LOGGER = LoggerFactory.getLogger(CascadingEmptyTagReplacer.class);

    public static void main(String[] args) {

        if (args.length <= 0) {
  "Usage CascadingEmptyTagReplacer <INPUT> <OUTPUT>");
        //input path & output path
        String inputPath = args[0];
        String outputPath = args[1];"inputPath:{}", inputPath);"outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingEmptyTagReplacer.class);

        //Source and Sink Tap. Use Hfs. if you are testing in local, then use FileTap
        Tap inTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap outTap = new Hfs(new TextDelimited(new Fields("line")), outputPath, SinkMode.REPLACE);

        Pipe input = new Pipe("input");
        Pipe dataPipe = new Each(input, new EmptyTagReplacer(Fields.ALL));

        //Use LocalFlowConnector if you are testing local
        HadoopFlowConnector flowConnector = new HadoopFlowConnector();
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).
                setName("Cascading Empty Tag Replacer Job");

     * Custom Function to replace Empty tags in the XML content
    public static class EmptyTagReplacer extends BaseOperation implements Function {

        private static final long serialVersionUID = -5108505951262118306L;

        private static List tags = Arrays.asList("<sub/>", "<sup/>", "<b/>", "<i/>");

        public EmptyTagReplacer(Fields fields) {
            super(1, fields);

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
            Tuple tuple = new Tuple();
            String xmlData = arguments.getTuple().getString(0);
            for (String tag : tags) {
                xmlData = xmlData.replace(tag, "");

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s