Cascading with Custom Filter

A Cascading job that uses a custom filter to filter the data.

package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

import java.util.Properties;

/** A Cascading example that reads a text file containing user name and age details and removes the users whose age is greater than or equal to 30. */
public class Main {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);

        //Create the source tap
        Fields fields = new Fields("userName", "age");
        Tap inTap = new Hfs(new TextDelimited(fields, true, "\t"), inputPath);

        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe to connect Source and Sink Tap
        Pipe dataPipe = new Each("data", new CustomFilter(Fields.ALL));

        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).setName("Hdfs Job");

     * This custom filter will remove all the users whose age is more than or equals to 30
    public static class CustomFilter extends BaseOperation implements Filter {
        private static final long serialVersionUID = 1L;

        public CustomFilter(Fields fields) {
            super(1, fields);

        public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
            TupleEntry arguments = filterCall.getArguments();
            String age = arguments.getString(1).trim();
            return Integer.valueOf(age) >= 30;

Please refer to the GitHub repo below for the code.


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s