Cascading Job – Remove Duplicates


package com.cascading;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Unique;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.util.Properties;

/**
 * A Cascading example to remove duplicates in a text file
 */
public class Main {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);
        // The input has a single tab-delimited column named "number"
        Fields numberField = new Fields("number");
        // Create the source tap (skips the header line of the input)
        Tap inTap = new Hfs(new TextDelimited(numberField, true, "\t"), inputPath);
        // Create the sink tap (tab-delimited output, no header)
        Tap outTap = new Hfs(new TextDelimited(false, "\t"), outputPath, SinkMode.REPLACE);

        // Pipe to connect the source and sink taps
        Pipe numberPipe = new Pipe("number");
        // Unique drops duplicate tuples based on the given fields
        Pipe uniquePipe = new Unique(numberPipe, new Fields("number"));
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect("Unique Job", inTap, outTap, uniquePipe);
        flow.complete();
    }
}

Please refer to the GitHub repo below for the code:
https://github.com/dkbalachandar/cascading-examples2
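
To run the job on a Hadoop cluster, package the classes into a jar and submit it with hadoop jar. The jar name and paths below are hypothetical and depend on how the project is built:

hadoop jar cascading-examples2.jar com.cascading.Main /data/numbers.txt /data/unique-out

If, for example, the input file holds the header "number" followed by the values 1, 2, 2, 3 and 1, the output will contain each value exactly once. Unique groups on the given fields under the hood, so the output order follows that grouping rather than the input order.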


Why Cascading

In the real world, solving a big data problem with raw MapReduce forces us to express the solution in the map/reduce pattern: we have to identify the keys and values and reason about sorting at every step. Some problems cannot be run fully in parallel and require a sequence of dependent steps. That is where the Cascading API comes in handy.

Please be aware that, under the hood, a Cascading program is translated into one or more MapReduce jobs.
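
One way to see that translation is to inspect a flow's plan. Calling writeDOT on the planned Flow returned by connect(...) writes the step graph to a Graphviz DOT file (the file name here is illustrative):

        // Write the planned flow, including its MapReduce steps, as a Graphviz DOT file
        flow.writeDOT("unique-flow.dot");

Here flow is the Flow built in the remove-duplicates example above.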

Source ==> Pipe ==> Sink

The source and the sink can be many things: HDFS (the Hfs tap), the local file system (the Lfs tap), HBase, and so on.

A pipe is a set of functions and operations applied to the tuples flowing from the source to the sink.
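
As a minimal sketch of what a pipe assembly can look like (the pipe name, field names, and regex below are illustrative; Each, Every, and GroupBy live in cascading.pipe, RegexFilter in cascading.operation.regex, and Count in cascading.operation.aggregator):

        // Chain a filter, a grouping, and an aggregation on a single pipe
        Pipe words = new Pipe("words");
        // Keep only tuples whose "line" field contains a word character
        words = new Each(words, new Fields("line"), new RegexFilter("\\w+"));
        // Group identical lines together
        words = new GroupBy(words, new Fields("line"));
        // Count the tuples in each group into a new "count" field
        words = new Every(words, new Count(new Fields("count")));

This assembly slots into a main method like the ones in this post; only the tail pipe passed to connect(...) changes.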

Cascading is often compared with Hive and Pig, but those tools are mainly used for ad hoc queries. For extensive, repeatable analysis, the Cascading API together with Hadoop MapReduce is the better fit.

Cascading File Copy Job



package com.cascading;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

import java.util.Properties;

public class HdfsFileCopy {

    public static void main(String[] args) {

        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, HdfsFileCopy.class);
        // Source tap: read lines from the input path
        Tap inTap = new Hfs(new TextLine(), inputPath);
        // Sink tap: write lines to the output path, replacing any existing output
        Tap outTap = new Hfs(new TextLine(), outputPath, SinkMode.REPLACE);
        // A bare pipe with no operations passes every tuple through unchanged,
        // so connecting the source to the sink through it copies the file
        Pipe copyPipe = new Pipe("copy");
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect("File Copy", inTap, outTap, copyPipe);
        flow.complete();
    }
}

Please refer to the GitHub repo below for the code:
https://github.com/dkbalachandar/cascading-examples1
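
For quick testing without a Hadoop cluster, the same copy job can also be expressed in Cascading's local mode. This is a sketch, assuming the cascading-local artifact is on the classpath; the class name is hypothetical:

package com.cascading;

import cascading.flow.Flow;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;

import java.util.Properties;

/**
 * Local-mode variant of the file copy job, handy for tests (hypothetical class).
 */
public class LocalFileCopy {

    public static void main(String[] args) {

        String inputPath = args[0];
        String outputPath = args[1];

        // FileTap reads and writes the local file system instead of HDFS
        Tap inTap = new FileTap(new TextLine(), inputPath);
        Tap outTap = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

        Pipe copyPipe = new Pipe("copy");
        // LocalFlowConnector runs the flow in-process; no MapReduce involved
        Flow flow = new LocalFlowConnector(new Properties()).connect("Local File Copy", inTap, outTap, copyPipe);
        flow.complete();
    }
}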