In this post, I am going to show an example that uses the Cascading API to perform an inner join on two related datasets.
Assume that you have professor and college details available in separate XML files, and you want to combine the two to generate consolidated data.
For example, here is our professor.xml file
<professor>
<pid>PROF-100</pid>
<pfirstname>John</pfirstname>
<plastname>Turner</plastname>
<college>
<id>COL-100</id>
</college>
</professor>
The college.xml is given below,
<college>
<id>COL-100</id>
<name>Ohio State University</name>
<location>Ohio</location>
</college>
And you want to get the consolidated XML file as below,
<professor>
<pid>PROF-100</pid>
<pfirstname>John</pfirstname>
<plastname>Turner</plastname>
<college>
<id>COL-100</id>
<name>Ohio State University</name>
<location>Ohio</location>
</college>
</professor>
I have created three sub-assemblies here: ProfessorDataAssembly to extract the professor data, CollegeDataAssembly to extract the college data, and finally ProfessorCollegeJoinAssembly to combine both and generate the consolidated XML with ProfessorCollegeBuffer.
Please refer to the code below to see how I have done it.
ProfessorCollegeDtailsJob.java
package com.cascading;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
import com.cascading.assembly.ProfessorDataAssembly;
import com.cascading.assembly.ProfessorCollegeJoinAssembly;
import com.cascading.assembly.CollegeDataAssembly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
/**
 * Cascading job that reads professor and college details, inner-joins them and
 * writes a consolidated XML record holding both professor and college information.
 *
 * <p>Usage: {@code ProfessorCollegeDtailsJob <PROFESSOR XML FILE PATH>
 * <COLLEGE XML FILE PATH> <OUTPUT FILE PATH>}
 */
public class ProfessorCollegeDtailsJob {
    public static final Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeDtailsJob.class);

    /** Number of command-line arguments read by {@link #main(String[])}. */
    private static final int REQUIRED_ARG_COUNT = 3;

    /**
     * Builds and runs the flow.
     *
     * @param args {@code args[0]} professor XML path, {@code args[1]} college XML path,
     *             {@code args[2]} output path; when fewer than three are supplied the
     *             usage line is logged and the method returns without running anything
     */
    public static void main(String[] args) {
        // All three paths are indexed below, so anything short of three arguments
        // must be rejected here rather than failing later with an index error.
        if (args == null || args.length < REQUIRED_ARG_COUNT) {
            LOGGER.info("Usage ProfessorCollegeDtailsJob <PROFESSOR XML FILE PATH> " +
                    "<COLLEGE XML FILE PATH> <OUTPUT FILE PATH>");
            return;
        }

        //input paths & output path
        String professorDataPath = args[0];
        String collegeDataPath = args[1];
        String outputPath = args[2];
        LOGGER.info("professorDataPath:{}", professorDataPath);
        LOGGER.info("collegeDataPath:{}", collegeDataPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, ProfessorCollegeDtailsJob.class);

        //Source and Sink Tap. Use Hfs for running in hadoop. if you are testing in local, then use FileTap
        Tap professorInTap = new FileTap(new TextLine(new Fields("line")), professorDataPath);
        Tap collegeInTap = new FileTap(new TextLine(new Fields("line")), collegeDataPath);
        Tap outTap = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

        //Here goes the Pipe and assemblies
        Pipe professorPipe = new Pipe("professor");
        professorPipe = new ProfessorDataAssembly(professorPipe);
        Pipe collegePipe = new Pipe("college");
        collegePipe = new CollegeDataAssembly(collegePipe);
        Pipe profCollegePipe = new ProfessorCollegeJoinAssembly(professorPipe, collegePipe);

        //Use LocalFlowConnector if you are testing in local
        //Use HadoopFlowConnector if you are running in Hadoop
        // Hand the populated properties to the connector; otherwise the
        // application-jar setting made above is never seen by the flow.
        LocalFlowConnector flowConnector = new LocalFlowConnector(properties);

        // Source names must match the head pipe names ("professor" / "college").
        FlowDef flowDef = FlowDef.flowDef()
                .addSource("professor", professorInTap)
                .addSource("college", collegeInTap)
                .addTailSink(profCollegePipe, outTap)
                .setName("Professor College Details Job");
        flowConnector.connect(flowDef).complete();
    }
}
CollegeDataAssembly.java
package com.cascading.assembly;
import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;
/**
 * Sub-assembly that parses each incoming "line" value with XPath and declares
 * the results as the fields "collegeid", "collegename" and "collegelocation".
 */
public class CollegeDataAssembly extends SubAssembly {
    /**
     * @param input head pipe whose tuples carry the college XML in the "line" field
     */
    public CollegeDataAssembly(Pipe input) {
        super(input);

        // One declared field per XPath expression, in the same order.
        Fields collegeFields = new Fields("collegeid", "collegename", "collegelocation");
        XPathParser collegeParser =
                new XPathParser(collegeFields, "/college/id", "/college/name", "/college/location");

        Pipe parsed = new Each(input, new Fields("line"), collegeParser);
        setTails(parsed);
    }
}
ProfessorDataAssembly.java
package com.cascading.assembly;
import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;
/**
 * Sub-assembly that parses each incoming "line" value with XPath and declares
 * the results as the fields "pid", "pfirstname", "plastname" and "profcollegeid".
 */
public class ProfessorDataAssembly extends SubAssembly {
    /** Fields declared by the parser, one per XPath expression and in the same order. */
    private static final Fields PROFESSOR_FIELDS =
            new Fields("pid", "pfirstname", "plastname", "profcollegeid");

    /**
     * @param input head pipe whose tuples carry the professor XML in the "line" field
     */
    public ProfessorDataAssembly(Pipe input) {
        super(input);

        XPathParser professorParser = new XPathParser(PROFESSOR_FIELDS,
                "/professor/pid",
                "/professor/pfirstname",
                "/professor/plastname",
                "/professor/college/id");

        Pipe parsed = new Each(input, new Fields("line"), professorParser);
        setTails(parsed);
    }
}
ProfessorCollegeJoinAssembly.java
package com.cascading.assembly;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
/**
 * Performs the inner join on professor and college data and creates a consolidated XML.
 *
 * <p>The professor stream is joined on "profcollegeid" and the college stream on
 * "collegeid"; each joined group is then handed to {@link ProfessorCollegeBuffer},
 * whose result is declared as the single field "professor".
 */
public class ProfessorCollegeJoinAssembly extends SubAssembly {
    public static final Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeJoinAssembly.class);

    /** Name of the single output field; used both as buffer declaration and output selector. */
    Fields fields = new Fields("professor");

    /**
     * @param professorPipe pipe carrying the parsed professor fields (must include "profcollegeid")
     * @param collegePipe   pipe carrying the parsed college fields (must include "collegeid")
     */
    public ProfessorCollegeJoinAssembly(Pipe professorPipe, Pipe collegePipe) {
        super(professorPipe, collegePipe);

        // Log every tuple of both streams before joining.
        professorPipe = new Each(professorPipe, new PrintDataFunction(Fields.ALL));
        collegePipe = new Each(collegePipe, new PrintDataFunction(Fields.ALL));

        Pipe profCollegePipe = new CoGroup("profCollegePipe",
                professorPipe,
                new Fields("profcollegeid"), collegePipe,
                new Fields("collegeid"),
                new InnerJoin());

        profCollegePipe = new Every(profCollegePipe, new ProfessorCollegeBuffer(fields), fields);
        setTails(profCollegePipe);
    }

    /**
     * Pass-through function that logs every value of the argument tuple and then
     * emits the tuple unchanged. Tuples whose first value is {@code null} are
     * dropped without being logged or emitted.
     */
    public static class PrintDataFunction extends BaseOperation implements Function {
        private static final long serialVersionUID = -5108505951262118306L;

        /**
         * @param fields field declaration passed to {@link BaseOperation}
         */
        public PrintDataFunction(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            // Log through a placeholder instead of casting each value to String,
            // so a non-String tuple value cannot raise a ClassCastException.
            for (Object value : arguments.getTuple()) {
                LOGGER.info("{}", value);
            }
            functionCall.getOutputCollector().add(arguments.getTuple());
        }
    }
}
ProfessorCollegeBuffer.java
package com.cascading.assembly;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.util.Iterator;
/**
 * Creates the consolidated output XML.
 *
 * <p>For every joined entry in a group, the professor and college values are
 * wrapped between literal {@code <professor>}/{@code <college>} open and close
 * tags. One result tuple is emitted per group.
 */
public class ProfessorCollegeBuffer extends BaseOperation implements Buffer {
    /**
     * @param outputFieldName field declaration for the single value this buffer emits
     */
    public ProfessorCollegeBuffer(Fields outputFieldName) {
        super(outputFieldName);
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        BufferCall bufferCall = (BufferCall) operationCall;
        bufferCall.setRetainValues(true);
    }

    @Override
    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        // The iterator must be typed: with a raw Iterator, next() yields Object and
        // the assignment to TupleEntry below does not compile. The class keeps using
        // the raw BufferCall type, hence the narrowly scoped suppression.
        @SuppressWarnings("unchecked")
        Iterator<TupleEntry> iter = bufferCall.getArgumentsIterator();

        Tuple innerTuple = new Tuple();
        while (iter.hasNext()) {
            TupleEntry entry = iter.next();
            Tuple output = new Tuple();
            output.add("<professor>");
            output.add(entry.getString("pid"));
            output.add(entry.getString("pfirstname"));
            output.add(entry.getString("plastname"));
            output.add("<college>");
            output.add(entry.getString("collegeid"));
            output.add(entry.getString("collegename"));
            output.add(entry.getString("collegelocation"));
            output.add("</college>");
            output.add("</professor>");
            innerTuple.add(output);
        }

        // Wrap all per-entry records in one tuple so the group yields a single value.
        Tuple outputTuple = new Tuple();
        outputTuple.add(innerTuple);
        bufferCall.getOutputCollector().add(outputTuple);
    }
}