Cascading example[Inner Join, Sub Assembly, Operation and Buffer]

In this post, I am going to show an example with Cascading API to perform the inner join on two related dataset

Assume that you have a professor and college details available in a separate XML files and you want to combine both these details and want to generate the consolidated data.

For example, here is our professor.xml file


<professor>
	<pid>PROF-100</pid>
	<pfirstname>John</pfirstname>
	<plastname>Turner</plastname>
	<college>
		<id>COL-100</id>
	</college>
</professor>

The college.xml is given below,


<college>
	<id>COL-100</id>
	<name>Ohio State University</name>
	<location>Ohio</location>
</college>

And You want to get the consolidated XML file as below,


<professor>
	<pid>PROF-100</pid>
	<pfirstname>John</pfirstname>
	<plastname>Turner</plastname>
	<college>
		<id>COL-100</id>
		<name>Ohio State University</name>
		<location>Ohio</location>
	</college>
</professor>

I have created three Sub assemblies here, ProfessorDataAssembly to extract out the Professor data, CollegeDataAssembly to extract out the College data and finally ProfessorCollegeJoinAssembly to combine both these data and generate the consolidated XML with ProfessorCollegeBuffer

Please refer the below code to know how i have done it.

ProfessorCollegeDtailsJob.java


package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
import com.cascading.assembly.ProfessorDataAssembly;
import com.cascading.assembly.ProfessorCollegeJoinAssembly;
import com.cascading.assembly.CollegeDataAssembly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * This cascading job is used to read Professor and College details and do inner join and
 * create a consolidated XML which has professor and college information
 */
public class ProfessorCollegeDtailsJob {

    public static Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeDtailsJob.class);

    public static void main(String[] args) {

        if (args.length <= 0) {
            LOGGER.info("Usage ProfessorCollegeDtailsJob <PROFESSOR XML FILE PATH> " +
                    "<COLLEGE XML FILE PATH> <OUTPUT FILE PATH>");
            return;
        }
        //input paths & output path
        String professorDataPath = args[0];
        String collegeDataPath = args[1];
        String outputPath = args[2];

        LOGGER.info("professorDataPath:{}", professorDataPath);
        LOGGER.info("studentDataPath:{}", collegeDataPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, ProfessorCollegeDtailsJob.class);

        //Source and Sink Tap. Use Hfs for running in hadoop. if you are testing in local, then use FileTap
        Tap professorInTap = new FileTap(new TextLine(new Fields("line")), professorDataPath);
        Tap collegeInTap = new FileTap(new TextLine(new Fields("line")), collegeDataPath);
        Tap outTap = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

        //Here goes the Pipe and assemblies
        Pipe professorPipe = new Pipe("professor");
        professorPipe = new ProfessorDataAssembly(professorPipe);
        Pipe collegePipe = new Pipe("college");
        collegePipe = new CollegeDataAssembly(collegePipe);

        Pipe profCollegePipe = new ProfessorCollegeJoinAssembly(professorPipe, collegePipe);
        //Use LocalFlowConnector if you are testing in local
        //Use HadoopFlowConnector if you are running in Hadoop
        LocalFlowConnector flowConnector = new LocalFlowConnector();
        FlowDef flowDef = FlowDef.flowDef().addSource("professor", professorInTap).
                addSource("college", collegeInTap).addTailSink(profCollegePipe, outTap).
                setName("Professor College Details Job");

        flowConnector.connect(flowDef).complete();
    }
}

CollegeDataAssembly.java


package com.cascading.assembly;

import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * This class is used to extract out the college data information
 */

public class CollegeDataAssembly extends SubAssembly {

    public CollegeDataAssembly(Pipe input) {
        super(input);
        input = new Each(input, new Fields("line"),
                new XPathParser(new Fields("collegeid", "collegename", "collegelocation"),
                        "/college/id", "/college/name", "/college/location"));
        setTails(input);
    }
}

ProfessorDataAssembly.java


package com.cascading.assembly;

import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * This class is used to extract out the professor data information
 */

public class ProfessorDataAssembly extends SubAssembly {

    private static Fields fields = new Fields("pid", "pfirstname", "plastname", "profcollegeid");

    public ProfessorDataAssembly(Pipe input) {
        super(input);
        input = new Each(input, new Fields("line"),
                new XPathParser(fields,
                        "/professor/pid",
                        "/professor/pfirstname",
                        "/professor/plastname",
                        "/professor/college/id"));
        setTails(input);
    }
}


ProfessorCollegeJoinAssembly.java


package com.cascading.assembly;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

/**
 * This will do the inner job on professor and college data and create a consolidated XML
 *
 */
public class ProfessorCollegeJoinAssembly extends SubAssembly {

    public static Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeJoinAssembly.class);

    Fields fields = new Fields("professor");

    public ProfessorCollegeJoinAssembly(Pipe professorPipe, Pipe collegePipe) {
        super(professorPipe, collegePipe);
        professorPipe = new Each(professorPipe, new PrintDataFunction(Fields.ALL));
        collegePipe = new Each(collegePipe, new PrintDataFunction(Fields.ALL));
        Pipe profCollegePipe = new CoGroup("profCollegePipe",
                professorPipe,
                new Fields("profcollegeid"), collegePipe,
                new Fields("collegeid"),
                new InnerJoin());
        profCollegePipe = new Every(profCollegePipe, new ProfessorCollegeBuffer(new Fields("professor")), fields);
        setTails(profCollegePipe);
    }

    public static class PrintDataFunction extends BaseOperation implements Function {

        private static final long serialVersionUID = -5108505951262118306L;

        public PrintDataFunction(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            Iterator itr = arguments.getTuple().iterator();
            while (itr.hasNext()) {
                LOGGER.info((String) itr.next());
            }
            functionCall.getOutputCollector().add(arguments.getTuple());
        }
    }
}

ProfessorCollegeBuffer.java


package com.cascading.assembly;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import java.util.Iterator;

/**
 * Create the consolidated output xml
 */

public class ProfessorCollegeBuffer extends BaseOperation implements Buffer {

    public ProfessorCollegeBuffer(Fields outputFieldName) {
        super(outputFieldName);
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        BufferCall bufferCall = (BufferCall) operationCall;
        bufferCall.setRetainValues(true);
    }

    @Override
    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        Iterator iter = bufferCall.getArgumentsIterator();
        Tuple innerTuple = new Tuple();

        while (iter.hasNext()) {
            TupleEntry entry = iter.next();
            Tuple output = new Tuple();
            output.add("<professor>");
            output.add(entry.getString("pid"));
            output.add(entry.getString("pfirstname"));
            output.add(entry.getString("plastname"));
            output.add("<college>");
            output.add(entry.getString("collegeid"));
            output.add(entry.getString("collegename"));
            output.add(entry.getString("collegelocation"));
            output.add("</college>");
            output.add("</professor>");
            innerTuple.add(output);
        }
        Tuple outputTuple = new Tuple();
        outputTuple.add(innerTuple);
        bufferCall.getOutputCollector().add(outputTuple);
    }
}

Advertisements

Cascading job to count the empty tags in a XML file

The below cascading job is used to count the empty tags in a XML file and exports the output to a text file.



package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.commons.lang3.StringUtils;
import org.jdom2.JDOMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

//import cascading.scheme.local.TextDelimited;
//import cascading.scheme.local.TextLine;

/**
 * Cascading job to count the number of empty tags found in a XML file.
 */

public class CascadingEmptyTagCounter {

    public static Logger LOGGER = LoggerFactory.getLogger(CascadingEmptyTagCounter.class);

    public static void main(String[] args) {

        if (args.length <= 0) {
            LOGGER.info("Usage CascadingEmptyTagCounter <INPUT> <OUTPUT> ");
            return;
        }

        //input path & output path
        String inputPath = args[0];
        String outputPath = args[1];

        LOGGER.info("inputPath:{}", inputPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingEmptyTagCounter.class);

        //Source and Sink Tap. Use Hfs. if you are testing in local, then use FileTap
        Tap inTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap outTap = new Hfs(new TextDelimited(new Fields("line")), outputPath, SinkMode.REPLACE);

        Pipe input = new Pipe("input");
        Pipe dataPipe = new Each(input, new EmptyTagCounter(Fields.ALL));

        //Use LocalFlowConnector if you are testing in local
        HadoopFlowConnector flowConnector = new HadoopFlowConnector();
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).
                setName("Cascading extractor Job");
        flowConnector.connect(flowDef).complete();
    }

    /**
     * Cascading function to count the Empty tags
     */
    public static class EmptyTagCounter extends BaseOperation implements Function {

        private static final long serialVersionUID = 1L;

        private static List tags = Arrays.asList("<sub />", "<sup />", "<b />", "<i />");

        public EmptyTagCounter(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            Tuple tuple = new Tuple();
            try {
                Map tagCountMap = getEmptyTagCounts(arguments.getTuple().getString(0));
                Set tagsSet = tagCountMap.entrySet();
                StringBuilder tagCounter = new StringBuilder();
                for (Map.Entry tagEntry : tagsSet) {
                    tagCounter.append(tagEntry.getKey() + "::" + tagEntry.getValue()).append("\n");
                }
                tuple.add(tagCounter.toString());
            } catch (JDOMException | IOException e) {
                LOGGER.error("XML parsing error", e);
            }
            functionCall.getOutputCollector().add(tuple);
        }

        public static Map getEmptyTagCounts(String xmlData) throws JDOMException, IOException {
            Map tagsCountMap = new HashMap();
            for (String tag : tags) {
                tagsCountMap.put(tag, String.valueOf(StringUtils.countMatches(xmlData, tag)));
            }
            return tagsCountMap;
        }
    }
}


Cascading Job to remove Empty Tags from a XML file

The below cascading job is used to remove the empty tags found in a XML file.


package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

//import cascading.flow.local.LocalFlowConnector;
//import cascading.scheme.local.TextDelimited;
//import cascading.scheme.local.TextLine;
//import cascading.tap.local.FileTap;

/**
 * Cascading job to replace the empty html tags found in a XML file
 */

public class CascadingEmptyTagReplacer {

    public static Logger LOGGER = LoggerFactory.getLogger(CascadingEmptyTagReplacer.class);

    public static void main(String[] args) {

        if (args.length <= 0) {
            LOGGER.info("Usage CascadingEmptyTagReplacer <INPUT> <OUTPUT>");
            return;
        }
        //input path & output path
        String inputPath = args[0];
        String outputPath = args[1];

        LOGGER.info("inputPath:{}", inputPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingEmptyTagReplacer.class);

        //Source and Sink Tap. Use Hfs. if you are testing in local, then use FileTap
        Tap inTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap outTap = new Hfs(new TextDelimited(new Fields("line")), outputPath, SinkMode.REPLACE);

        Pipe input = new Pipe("input");
        Pipe dataPipe = new Each(input, new EmptyTagReplacer(Fields.ALL));

        //Use LocalFlowConnector if you are testing local
        HadoopFlowConnector flowConnector = new HadoopFlowConnector();
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).
                setName("Cascading Empty Tag Replacer Job");
        flowConnector.connect(flowDef).complete();
    }

    /**
     * Custom Function to replace Empty tags in the XML content
     */
    public static class EmptyTagReplacer extends BaseOperation implements Function {

        private static final long serialVersionUID = -5108505951262118306L;

        private static List tags = Arrays.asList("<sub/>", "<sup/>", "<b/>", "<i/>");

        public EmptyTagReplacer(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            Tuple tuple = new Tuple();
            String xmlData = arguments.getTuple().getString(0);
            for (String tag : tags) {
                xmlData = xmlData.replace(tag, "");
            }
            tuple.add(xmlData);
            functionCall.getOutputCollector().add(tuple);
        }
    }
}


Cascading SubAssembly Example

Cascading SubAssemblies are reusable pipe assemblies that are linked into larger pipe assemblies. Think of them as subroutines in a programming language. This helps commonly used pipe assemblies to be packaged into libraries and those can be used in the complex flow

I have created a cascading job to do the below process

Read a tsv[tab separated text file] file which contains the user name, age and dept details. Assume that we want to remove the users whose age is more than or equals to 30. Then group by deptId and output the deptId and count in a tsv file

This is a simple job for only example purpose. In the real scenario, we may end up with different problems.

Refer the below SubAssembly code. Here we have separated out the process, so this will take care of reading the content from the Tap and filter it, group by and finally write output to sink. Please make sure that we are not hardcoding anything. So we can use any kind of source as long as it satisfies valid format. same for Sink also.

Creating this kind of SubAssemblies will help us to use this wherever we want.


package com.cascading;

import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

public class UserSubAssembly extends SubAssembly {

    private static final long serialVersionUID = 3070079050769273334L;

    public static final String INPUT_PIPE_NAME = "input";

    public static final String OUTPUT_PIPE_NAME = "output";

    public UserSubAssembly() {
        this(new Pipe(INPUT_PIPE_NAME), OUTPUT_PIPE_NAME);
    }

    public UserSubAssembly(Pipe input, String tailName) {

        super();
        Pipe pipe = new Each(input, new Fields("age"),
                new ExpressionFilter("age >= 30", Integer.TYPE));
        pipe = new GroupBy(pipe, new Fields("deptId"));
        pipe = new Every(pipe, new Count());
        pipe = new Pipe(tailName, pipe);
        setTails(pipe);
    }
}

Please refer the below Driver code.



package com.cascading;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * A Cascading example to read a tsv file file which contains user name, age and dept details and remove the users whose age is more than or equals to 30
 * and do group by deptId and output the deptId and count in a tsv file
 */
public class Main {


    /**
     * This examples uses SubAssembly. ExpressFilter and TapsMap
     *
     * @param args
     */
    public static void main(String[] args) {

        System.out.println("Hdfs Job Starts");
        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);

        //Create the source tap
        Fields fields = new Fields("userName", "age", "deptId");
        Tap inTap = new Hfs(new TextDelimited(fields, true, "\t"), inputPath);

        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(true, "\t"), outputPath, SinkMode.REPLACE);

        //Create input taps map
        Map tapsMap = new HashMap();
        tapsMap.put(UserSubAssembly.INPUT_PIPE_NAME, inTap);

        //SubAssembly
        Pipe pipe = new UserSubAssembly();

        //Create the flow
        Flow flow = new HadoopFlowConnector().connect(tapsMap, outTap, pipe);
        flow.complete();
        System.out.println("Hdfs Job Ends");
    }
}

Please refer the below Github Repo
https://github.com/dkbalachandar/cascading-examples6

Cascading Job with ExpressFilter and ExpressFunctions

A Cascading Job to read a text file which contains user name and age details and remove the users whose age is more than or equals to 30 and also print the content in an output file with some predefined expression which uses expression filter and Expression function



package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.expression.ExpressionFunction;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.util.Properties;

/**
 * A Cascading example to read a text file which contains user name and age details and remove the users whose age is more than or equals to 30
 * and also print the content in an output file with some predefined expression
 */
public class Main {


    /**
     * This examples uses ExpressionFilter and  ExpressionFunction function
     *
     * @param args
     */
    public static void main(String[] args) {

        System.out.println("Hdfs Job Starts");

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);

        //Create the source tap
        Fields fields = new Fields("userName", "age");
        Tap inTap = new Hfs(new TextDelimited(fields, true, "\t"), inputPath);

        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe to connect Source and Sink Tap
        Pipe dataPipe = new Pipe("data");
        //Adding the expression filter, if the expression returns true, then that tuple will be removed
        ExpressionFilter filter = new ExpressionFilter("age >= 30", Integer.TYPE);
        dataPipe = new Each(dataPipe, new Fields("userName", "age"), filter);
        //Finally we use the expression function to add some predefined sentences
        String expression = "\"The user name is \" + userName + \" and his age is \" + age";
        ExpressionFunction function = new ExpressionFunction(new Fields(" "), expression, String.class);
        dataPipe = new Each(dataPipe, new Fields("userName", "age"), function);

        //Add the FlowDef and call the process
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).setName("Hdfs Job");
        flowConnector.connect(flowDef).complete();
        System.out.println("Hdfs Job Ends");
    }
}


Please refer the below GitHub Repo for the code,
https://github.com/dkbalachandar/cascading-examples5

Cascading with Custom Filter

A cascading job which uses Custom filter and filter the data


package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

import java.util.Properties;

/**
 * A Cascading example to read a text file which contains user name and age details and remove the users whose age is more than or equals to 30
 */
public class Main {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);

        //Create the source tap
        Fields fields = new Fields("userName", "age");
        Tap inTap = new Hfs(new TextDelimited(fields, true, "\t"), inputPath);

        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe to connect Source and Sink Tap
        Pipe dataPipe = new Each("data", new CustomFilter(Fields.ALL));

        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).setName("Hdfs Job");
        flowConnector.connect(flowDef).complete();
    }

    /**
     * This custom filter will remove all the users whose age is more than or equals to 30
     */
    public static class CustomFilter extends BaseOperation implements Filter {
        private static final long serialVersionUID = 1L;

        public CustomFilter(Fields fields) {
            super(1, fields);
        }

        @Override
        public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
            TupleEntry arguments = filterCall.getArguments();
            String age = arguments.getString(1).trim();
            return Integer.valueOf(age) >= 30;
        }
    }
}

Please refer the below GitHub Repo for the code,
https://github.com/dkbalachandar/cascading-examples4

Cascading with Custom Function

A cascading job which uses Custom Funcion and filter the data


package com.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import java.util.Properties;

/**
 * A Cascading example to read a text file and convert the string to upper case letters and write into the output file
 */
public class Main {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);
        //Create the source tap
        Tap inTap = new Hfs(new TextDelimited(new Fields("line"), true, "\t"), inputPath);
        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe to connect Source and Sink Tap
        Pipe wordsPipe = new Each("words", new UpperCaseFunction(new Fields("line")));
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect("Hdfs Job", inTap, outTap, wordsPipe);
        flow.complete();
    }

    public static class UpperCaseFunction extends BaseOperation implements Function {
        private static final long serialVersionUID = 1L;

        public UpperCaseFunction(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            Tuple tuple = new Tuple();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            String original = arguments.getString(0).trim();
            tuple.add(original.toUpperCase());
            functionCall.getOutputCollector().add(tuple);
        }
    }
}

Please refer the below GitHub Repo for the code,
https://github.com/dkbalachandar/cascading-examples3