Cascading with Custom Function

A Cascading job that uses a custom Function to filter and transform the data


package com.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import java.util.Properties;

/**
 * A Cascading example that reads a text file, converts each line to upper case,
 * and writes the result to an output file
 */
public class Main {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);
        //Create the source tap
        Tap inTap = new Hfs(new TextDelimited(new Fields("line"), true, "\t"), inputPath);
        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe that connects the source and sink taps, applying the custom function to each tuple
        Pipe wordsPipe = new Each("words", new UpperCaseFunction(new Fields("line")));
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect("Hdfs Job", inTap, outTap, wordsPipe);
        flow.complete();
    }

    /**
     * Custom Function that trims each incoming line and converts it to upper case
     */
    public static class UpperCaseFunction extends BaseOperation implements Function {
        private static final long serialVersionUID = 1L;

        public UpperCaseFunction(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            // Skip tuples that carry no value for the argument field
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            // Trim the line, upper-case it, and emit it as a new tuple
            String original = arguments.getString(0).trim();
            functionCall.getOutputCollector().add(new Tuple(original.toUpperCase()));
        }
    }
}

Please refer to the GitHub repo below for the full code:
https://github.com/dkbalachandar/cascading-examples3
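
Once packaged, the job can be submitted to a Hadoop cluster like any other MapReduce job. A minimal sketch, assuming the jar is named cascading-examples3.jar (the jar name and HDFS paths are placeholders):

hadoop jar cascading-examples3.jar com.cascading.Main /user/hadoop/input.txt /user/hadoop/output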


Cascading Job – Remove Duplicates


package com.cascading;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Unique;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.util.Properties;

/**
 * A Cascading example that removes duplicate records from a text file
 */
public class Main {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);
        Fields numberField = new Fields("number");
        //Create the source tap
        Tap inTap = new Hfs(new TextDelimited(numberField, true, "\t"), inputPath);
        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe assembly: a named head pipe followed by a Unique sub-assembly that removes duplicates
        Pipe numberPipe = new Pipe("number");
        Pipe uniquePipe = new Unique(numberPipe, new Fields("number"));
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect("Unique Job", inTap, outTap, uniquePipe);
        flow.complete();
    }
}

Please refer to the GitHub repo below for the full code:
https://github.com/dkbalachandar/cascading-examples2
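
For example, given a tab-delimited input file with a header row and duplicate values (the contents below are illustrative), the Unique sub-assembly keeps a single occurrence of each value. Note that the output ordering may vary, since the records pass through a MapReduce grouping step:

number
1
2
2
3
2

The output file would contain:

1
2
3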

Why Cascading

In the real world, if we want to solve a big data problem, we have to think of the solution in the MapReduce pattern: identify the keys and values and work out the sorting. Some problems cannot be run fully in parallel and require a sequence of flows. That is where the Cascading API comes in handy.

Please be aware that, under the hood, a Cascading program is converted into MapReduce jobs.

Source ==> Pipe ==> Sink

The Source and Sink could be almost anything: HDFS (Hfs), the local file system (Lfs), HBase, and so on.
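
For instance, taps for HDFS and the local file system are created in much the same way; a minimal sketch, where the paths and field names are illustrative (Lfs lives in cascading.tap.hadoop, alongside Hfs):

Tap hdfsTap = new Hfs(new TextDelimited(new Fields("line"), "\t"), "/data/input");
Tap localTap = new Lfs(new TextDelimited(new Fields("line"), "\t"), "/tmp/input");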

A Pipe is a set of functions and operations applied to the data as it flows from the source to the sink, as sketched below.
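
As a sketch, several operations can be chained on a single pipe before it reaches the sink; the field name and filter pattern below are illustrative (RegexFilter comes from cascading.operation.regex):

Pipe pipe = new Pipe("lines");
// Keep only the lines that start with ERROR
pipe = new Each(pipe, new Fields("line"), new RegexFilter("^ERROR.*"));
// Remove duplicate lines
pipe = new Unique(pipe, new Fields("line"));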

Cascading can be compared with Hive and Pig, but those tools are mainly used for ad-hoc queries. For extensive analysis, the Cascading API and Hadoop MapReduce are the better choice.