Cascading SubAssembly Example

Cascading SubAssemblies are reusable pipe assemblies that are linked into larger pipe assemblies. Think of them as subroutines in a programming language. They let commonly used pipe assemblies be packaged into libraries and then reused in more complex flows.
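To make the subroutine analogy concrete, here is a plain-Java sketch. It does not use the Cascading API; `java.util.function.Function` simply stands in for a pipe assembly, and the class and method names are hypothetical. A small chain of steps is packaged as one reusable unit, and a larger "flow" links it in as a single step.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SubroutineAnalogy {

    // A reusable "sub-assembly": two steps packaged as one unit.
    // Plain-Java analogy only; this is not Cascading API.
    public static Function<List<String>, List<String>> trimAndDropBlank() {
        Function<List<String>, List<String>> trim =
                rows -> rows.stream().map(String::trim).collect(Collectors.toList());
        Function<List<String>, List<String>> dropBlank =
                rows -> rows.stream().filter(r -> !r.isEmpty()).collect(Collectors.toList());
        return trim.andThen(dropBlank);
    }

    public static void main(String[] args) {
        // The larger "flow" calls the reusable unit like a subroutine, then adds its own step.
        Function<List<String>, List<String>> flow = trimAndDropBlank()
                .andThen(rows -> rows.stream().map(String::toUpperCase).collect(Collectors.toList()));
        System.out.println(flow.apply(List.of("  alice ", "", " bob"))); // prints [ALICE, BOB]
    }
}
```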

I have created a Cascading job that does the following:

Read a TSV (tab-separated values) file that contains each user's name, age and department ID. Remove the users whose age is 30 or more, then group the remaining users by deptId and write each deptId together with its user count to a TSV file.
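Before looking at the Cascading code, it may help to see the same computation done in memory. The sketch below is plain Java, not Cascading, and the sample rows are made up for illustration. It drops users aged 30 or more, then counts the rest per deptId. A `TreeMap` keeps the department keys sorted, which roughly mirrors the fact that a Cascading GroupBy sorts on its grouping fields.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class UserCountSketch {

    // Rows are "userName<TAB>age<TAB>deptId" with the header line already removed.
    public static Map<String, Integer> countByDept(List<String> rows) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by deptId
        for (String row : rows) {
            String[] cols = row.split("\t");
            int age = Integer.parseInt(cols[1].trim());
            if (age >= 30) {
                continue; // same rule as the filter: remove users aged 30 or more
            }
            counts.merge(cols[2], 1, Integer::sum); // group by deptId and count
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "alice\t25\tD1",
                "bob\t31\tD1",
                "carol\t29\tD2",
                "dave\t30\tD2",
                "erin\t22\tD1");
        // Print in the same "deptId<TAB>count" shape as the output TSV.
        countByDept(rows).forEach((dept, n) -> System.out.println(dept + "\t" + n));
    }
}
```

With these sample rows, bob and dave are filtered out, so the program prints `D1	2` followed by `D2	1`.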

This is a simple job for demonstration purposes only; real-world jobs will bring their own, more complex problems.

Refer to the SubAssembly code below. The processing logic is separated out here: the SubAssembly takes the tuples read from the source Tap, filters them, groups them, counts them and passes the result on to the sink Tap. Note that nothing is hardcoded, so any source can be used as long as it provides the expected fields in a valid format. The same applies to the sink.
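The "nothing hardcoded" idea can be illustrated outside Cascading too. In this plain-Java sketch (the class and method names are hypothetical), the processing code receives a generic `Reader` as its source and a `Writer` as its sink and only knows field names, never a path or a fixed column position. For the illustration it assumes each source has a header row naming `userName` and `age`, and resolves columns from that header; this is an assumption of the sketch, not a description of how `TextDelimited` works.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

public class DecoupledSourceSink {

    // Knows only field names: no path, no column positions. Any tab-separated
    // source whose header names "userName" and "age" can be plugged in.
    public static void keepUnder30(Reader source, Writer sink) {
        try {
            BufferedReader in = new BufferedReader(source);
            String headerLine = in.readLine();
            if (headerLine == null) {
                return; // empty source: nothing to do
            }
            List<String> header = Arrays.asList(headerLine.split("\t"));
            int nameIdx = header.indexOf("userName");
            int ageIdx = header.indexOf("age");
            if (nameIdx < 0 || ageIdx < 0) {
                throw new IllegalArgumentException("source lacks userName/age: " + header);
            }
            String line;
            while ((line = in.readLine()) != null) {
                String[] cols = line.split("\t");
                // Same rule as the SubAssembly: drop users aged 30 or more
                if (Integer.parseInt(cols[ageIdx].trim()) < 30) {
                    sink.write(cols[nameIdx] + "\n");
                }
            }
            sink.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two sources with different column orders feed the same logic and sink.
        StringWriter out = new StringWriter();
        keepUnder30(new StringReader("userName\tage\tdeptId\nalice\t25\tD1\nbob\t31\tD1\n"), out);
        keepUnder30(new StringReader("deptId\tage\tuserName\nD2\t29\tcarol\nD2\t30\tdave\n"), out);
        System.out.print(out); // prints alice and carol, one per line
    }
}
```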

Packaging the logic as a SubAssembly like this lets us reuse it wherever we need it.

package com.cascading;

import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

public class UserSubAssembly extends SubAssembly {

    private static final long serialVersionUID = 3070079050769273334L;

    public static final String INPUT_PIPE_NAME = "input";

    public static final String OUTPUT_PIPE_NAME = "output";

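    public UserSubAssembly() {
        this(new Pipe(INPUT_PIPE_NAME), OUTPUT_PIPE_NAME);
    }

    public UserSubAssembly(Pipe input, String tailName) {

        // Remove users whose age is 30 or more (ExpressionFilter removes a tuple when the expression is true)
        Pipe pipe = new Each(input, new Fields("age"),
                new ExpressionFilter("age >= 30", Integer.TYPE));
        // Group the remaining users by department
        pipe = new GroupBy(pipe, new Fields("deptId"));
        // Count the users in each group; the result carries deptId and count
        pipe = new Every(pipe, new Count());
        // Give the tail a name so callers can refer to it
        pipe = new Pipe(tailName, pipe);

        // Register the tail pipe; every SubAssembly must do this
        setTails(pipe);
    }
}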
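A point that often trips people up: a Cascading `Filter` removes a tuple when it "matches", so `ExpressionFilter("age >= 30", ...)` discards users aged 30 or more and keeps the younger ones. The plain-Java stand-in below (hypothetical names, no Cascading dependency) applies the same remove-on-true rule with an `IntPredicate`, so the predicate reads exactly like the expression string above.

```java
import java.util.List;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

public class FilterSemantics {

    // Remove-on-true: an element is dropped when removeIf returns true,
    // mirroring how ExpressionFilter treats its expression.
    public static List<Integer> applyRemoveFilter(List<Integer> ages, IntPredicate removeIf) {
        return ages.stream()
                .filter(age -> !removeIf.test(age)) // keep only what the expression rejects
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same condition as the SubAssembly's expression "age >= 30"
        System.out.println(applyRemoveFilter(List.of(22, 29, 30, 31), age -> age >= 30)); // prints [22, 29]
    }
}
```

Writing the filter as a remove condition keeps the Java predicate and the Cascading expression string identical, which makes it harder to invert the logic by accident.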

Please refer to the driver code below.

package com.cascading;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * A Cascading example that reads a TSV file containing user name, age and dept details, removes the users whose age is 30 or more,
 * groups the remaining users by deptId and writes each deptId with its count to a TSV file.
 */
public class Main {

    /**
     * This example uses a SubAssembly, an ExpressionFilter and a taps map.
     *
     * @param args args[0] is the input path, args[1] is the output path
     */
    public static void main(String[] args) {

        System.out.println("Hdfs Job Starts");
        //input and output path
        String inputPath = args[0];
        String outputPath = args[1];

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);

        //Create the source tap
        Fields fields = new Fields("userName", "age", "deptId");
        Tap inTap = new Hfs(new TextDelimited(fields, true, "\t"), inputPath);

        //Create the sink tap
        Tap outTap = new Hfs(new TextDelimited(true, "\t"), outputPath, SinkMode.REPLACE);

        //Map the SubAssembly's head pipe name to the source tap
        Map<String, Tap> tapsMap = new HashMap<String, Tap>();
        tapsMap.put(UserSubAssembly.INPUT_PIPE_NAME, inTap);

        Pipe pipe = new UserSubAssembly();

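        //Create the flow, passing in the application properties
        Flow flow = new HadoopFlowConnector(properties).connect(tapsMap, outTap, pipe);
        //Run the flow and wait for it to finish
        flow.complete();
        System.out.println("Hdfs Job Ends");
    }
}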
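The taps map works because its key matches the name of the SubAssembly's head pipe (`UserSubAssembly.INPUT_PIPE_NAME`, i.e. `"input"`). As I understand it, Cascading's planner refuses to connect a flow when a head pipe has no matching source tap. The sketch below imitates that binding check in plain Java; `checkBound` is a hypothetical helper, and a `String` stands in for a real `Tap`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TapsMapBinding {

    // Hypothetical helper: every head pipe name must have a source in the map,
    // roughly the check the flow planner has to make before building a flow.
    public static void checkBound(List<String> headNames, Map<String, String> sources) {
        for (String head : headNames) {
            if (!sources.containsKey(head)) {
                throw new IllegalStateException("no source tap bound to head pipe: " + head);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> sources = new HashMap<>();
        // The key must equal the head pipe name, here "input" as in UserSubAssembly
        sources.put("input", "stand-in for the Hfs source tap");
        checkBound(List.of("input"), sources);
        System.out.println("all head pipes bound");
    }
}
```

Using the public constant `UserSubAssembly.INPUT_PIPE_NAME` as the key, as the driver does, avoids this kind of mismatch.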

Please refer to the GitHub repo below.

