Cascading example[Inner Join, Sub Assembly, Operation and Buffer]

In this post, I am going to show an example that uses the Cascading API to perform an inner join on two related datasets.

Assume that you have professor and college details available in separate XML files and you want to combine them to generate consolidated data.

For example, here is our professor.xml file


<professor>
	<pid>PROF-100</pid>
	<pfirstname>John</pfirstname>
	<plastname>Turner</plastname>
	<college>
		<id>COL-100</id>
	</college>
</professor>

The college.xml is given below,


<college>
	<id>COL-100</id>
	<name>Ohio State University</name>
	<location>Ohio</location>
</college>

And you want to get the consolidated XML file as below,


<professor>
	<pid>PROF-100</pid>
	<pfirstname>John</pfirstname>
	<plastname>Turner</plastname>
	<college>
		<id>COL-100</id>
		<name>Ohio State University</name>
		<location>Ohio</location>
	</college>
</professor>

I have created three sub-assemblies here: ProfessorDataAssembly to extract the professor data, CollegeDataAssembly to extract the college data, and finally ProfessorCollegeJoinAssembly to combine both and generate the consolidated XML with ProfessorCollegeBuffer.

Please refer to the code below to see how I have done it.

ProfessorCollegeDtailsJob.java


package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
import com.cascading.assembly.ProfessorDataAssembly;
import com.cascading.assembly.ProfessorCollegeJoinAssembly;
import com.cascading.assembly.CollegeDataAssembly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Cascading job that reads professor and college details from two XML inputs, inner-joins
 * them through {@link ProfessorCollegeJoinAssembly} and writes the consolidated
 * professor/college records to the output path.
 *
 * <p>Expected arguments, in order: professor XML file path, college XML file path,
 * output file path.
 */
public class ProfessorCollegeDtailsJob {

    public static final Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeDtailsJob.class);

    /** Number of command-line arguments that {@link #main(String[])} reads. */
    private static final int REQUIRED_ARG_COUNT = 3;

    /**
     * Builds and runs the flow.
     *
     * @param args professor XML path, college XML path and output path; when fewer than three
     *             are supplied the usage message is logged and the method returns without
     *             running anything
     */
    public static void main(String[] args) {

        // All three arguments are dereferenced below, so one or two arguments is also a
        // usage error (the previous "<= 0" check let those cases through to an
        // ArrayIndexOutOfBoundsException).
        if (args == null || args.length < REQUIRED_ARG_COUNT) {
            LOGGER.info("Usage ProfessorCollegeDtailsJob <PROFESSOR XML FILE PATH> " +
                    "<COLLEGE XML FILE PATH> <OUTPUT FILE PATH>");
            return;
        }
        //input paths & output path
        String professorDataPath = args[0];
        String collegeDataPath = args[1];
        String outputPath = args[2];

        LOGGER.info("professorDataPath:{}", professorDataPath);
        LOGGER.info("collegeDataPath:{}", collegeDataPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, ProfessorCollegeDtailsJob.class);

        //Source and Sink Tap. Use Hfs for running in hadoop. if you are testing in local, then use FileTap
        Tap professorInTap = new FileTap(new TextLine(new Fields("line")), professorDataPath);
        Tap collegeInTap = new FileTap(new TextLine(new Fields("line")), collegeDataPath);
        Tap outTap = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

        //Here goes the Pipe and assemblies
        Pipe professorPipe = new Pipe("professor");
        professorPipe = new ProfessorDataAssembly(professorPipe);
        Pipe collegePipe = new Pipe("college");
        collegePipe = new CollegeDataAssembly(collegePipe);

        Pipe profCollegePipe = new ProfessorCollegeJoinAssembly(professorPipe, collegePipe);
        //Use LocalFlowConnector if you are testing in local
        //Use HadoopFlowConnector if you are running in Hadoop
        // Hand the configured properties to the connector; they were previously built and
        // then never used.
        LocalFlowConnector flowConnector = new LocalFlowConnector(properties);
        // Source names must match the head pipe names ("professor" and "college") above.
        FlowDef flowDef = FlowDef.flowDef().addSource("professor", professorInTap).
                addSource("college", collegeInTap).addTailSink(profCollegePipe, outTap).
                setName("Professor College Details Job");

        flowConnector.connect(flowDef).complete();
    }
}

CollegeDataAssembly.java


package com.cascading.assembly;

import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * Sub-assembly that applies an {@link XPathParser} to the "line" field of the incoming pipe
 * and declares the results as "collegeid", "collegename" and "collegelocation", taken from
 * the /college/id, /college/name and /college/location paths respectively.
 */
public class CollegeDataAssembly extends SubAssembly {

    /**
     * @param input head pipe carrying the college XML in a field named "line"
     */
    public CollegeDataAssembly(Pipe input) {
        super(input);

        Fields xmlField = new Fields("line");
        Fields collegeFields = new Fields("collegeid", "collegename", "collegelocation");
        XPathParser collegeParser = new XPathParser(collegeFields,
                "/college/id", "/college/name", "/college/location");

        Pipe parsed = new Each(input, xmlField, collegeParser);
        setTails(parsed);
    }
}

ProfessorDataAssembly.java


package com.cascading.assembly;

import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * Sub-assembly that applies an {@link XPathParser} to the "line" field of the incoming pipe
 * and declares the results as "pid", "pfirstname", "plastname" and "profcollegeid", taken
 * from the matching paths under /professor.
 */
public class ProfessorDataAssembly extends SubAssembly {

    /** Output fields, in the same order as the XPath expressions passed to the parser. */
    private static final Fields PROFESSOR_FIELDS =
            new Fields("pid", "pfirstname", "plastname", "profcollegeid");

    /**
     * @param input head pipe carrying the professor XML in a field named "line"
     */
    public ProfessorDataAssembly(Pipe input) {
        super(input);

        XPathParser professorParser = new XPathParser(PROFESSOR_FIELDS,
                "/professor/pid",
                "/professor/pfirstname",
                "/professor/plastname",
                "/professor/college/id");
        Fields xmlField = new Fields("line");

        Pipe parsed = new Each(input, xmlField, professorParser);
        setTails(parsed);
    }
}


ProfessorCollegeJoinAssembly.java


package com.cascading.assembly;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

/**
 * Sub-assembly that inner-joins the professor and college pipes ("profcollegeid" against
 * "collegeid") and runs {@link ProfessorCollegeBuffer} over each joined group, emitting a
 * single "professor" field.
 */
public class ProfessorCollegeJoinAssembly extends SubAssembly {

    public static Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeJoinAssembly.class);

    /** Name of the single output field; used both as buffer declaration and output selector. */
    Fields fields = new Fields("professor");

    /**
     * @param professorPipe pipe providing the professor fields, including "profcollegeid"
     * @param collegePipe   pipe providing the college fields, including "collegeid"
     */
    public ProfessorCollegeJoinAssembly(Pipe professorPipe, Pipe collegePipe) {
        super(professorPipe, collegePipe);
        // Log every tuple on both sides before the join.
        professorPipe = new Each(professorPipe, new PrintDataFunction(Fields.ALL));
        collegePipe = new Each(collegePipe, new PrintDataFunction(Fields.ALL));
        Pipe profCollegePipe = new CoGroup("profCollegePipe",
                professorPipe,
                new Fields("profcollegeid"), collegePipe,
                new Fields("collegeid"),
                new InnerJoin());
        // Reuse the shared field declaration instead of constructing a second identical one.
        profCollegePipe = new Every(profCollegePipe, new ProfessorCollegeBuffer(fields), fields);
        setTails(profCollegePipe);
    }

    /**
     * Pass-through function that logs each value of the argument tuple and then re-emits the
     * tuple. Tuples whose first value is null are dropped without being logged or emitted.
     */
    public static class PrintDataFunction extends BaseOperation implements Function {

        private static final long serialVersionUID = -5108505951262118306L;

        /**
         * @param fields field declaration handed to {@link BaseOperation}
         */
        public PrintDataFunction(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            // Parameterized logging avoids the unchecked (String) cast, which threw a
            // ClassCastException for any non-String tuple value.
            for (Object value : arguments.getTuple()) {
                LOGGER.info("{}", value);
            }
            functionCall.getOutputCollector().add(arguments.getTuple());
        }
    }
}

ProfessorCollegeBuffer.java


package com.cascading.assembly;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import java.util.Iterator;

/**
 * Buffer that builds the consolidated professor/college output for one group.
 *
 * <p>For every entry in the group it builds a tuple holding the literal {@code <professor>} and
 * {@code <college>} wrapper tags around the "pid", "pfirstname", "plastname", "collegeid",
 * "collegename" and "collegelocation" values. All per-entry tuples are collected into one
 * inner tuple, which is emitted as the single value of one output tuple per group.
 */
public class ProfessorCollegeBuffer extends BaseOperation implements Buffer {

    /**
     * @param outputFieldName field declaration for the single value this buffer emits
     */
    public ProfessorCollegeBuffer(Fields outputFieldName) {
        super(outputFieldName);
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        // NOTE(review): assumes the call object handed to prepare() is a BufferCall, as the
        // original code did; see BufferCall#setRetainValues for what the flag controls.
        BufferCall bufferCall = (BufferCall) operationCall;
        bufferCall.setRetainValues(true);
    }

    @Override
    @SuppressWarnings("unchecked") // BufferCall is used raw here, so its iterator is raw too
    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        // The iterator must be typed: with a raw Iterator, next() returns Object and the
        // assignment to TupleEntry below does not compile.
        Iterator<TupleEntry> iter = bufferCall.getArgumentsIterator();
        Tuple innerTuple = new Tuple();

        while (iter.hasNext()) {
            TupleEntry entry = iter.next();
            Tuple output = new Tuple();
            output.add("<professor>");
            output.add(entry.getString("pid"));
            output.add(entry.getString("pfirstname"));
            output.add(entry.getString("plastname"));
            output.add("<college>");
            output.add(entry.getString("collegeid"));
            output.add(entry.getString("collegename"));
            output.add(entry.getString("collegelocation"));
            output.add("</college>");
            output.add("</professor>");
            innerTuple.add(output);
        }
        // One output tuple per group, whose only value is the tuple of per-entry tuples.
        Tuple outputTuple = new Tuple();
        outputTuple.add(innerTuple);
        bufferCall.getOutputCollector().add(outputTuple);
    }
}

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s