Merge large part files in Hadoop with the ‘getmerge’ command

I recently faced an issue while trying to merge a large number of part files into a single file. I had used the ‘cat’ command for the merge; my command looked like the one below:


   hadoop fs -cat ${INPUT_PATH}/part* > ${OUTPUT_FOLDER}/output.xml

Assume that INPUT_PATH is the HDFS location of the part files and OUTPUT_FOLDER is the local output location of the merged file. Note that the part files contain XML data and are very large.

When I ran the above command, it failed in the middle of the merge with a “cat unable to write output stream” error.

I decided to use the getmerge command to get rid of the above error. It concatenates the matching files from HDFS directly into a single file on the local filesystem, and it worked without any issues. Check the command below.


   hadoop fs -getmerge ${INPUT_PATH}/part* ${OUTPUT_FOLDER}/output.xml


Cascading example [Inner Join, Sub Assembly, Operation and Buffer]

In this post, I am going to show an example that uses the Cascading API to perform an inner join on two related datasets.

Assume that you have professor and college details available in separate XML files, and you want to combine them to generate consolidated data.

For example, here is our professor.xml file


<professor>
	<pid>PROF-100</pid>
	<pfirstname>John</pfirstname>
	<plastname>Turner</plastname>
	<college>
		<id>COL-100</id>
	</college>
</professor>

The college.xml is given below,


<college>
	<id>COL-100</id>
	<name>Ohio State University</name>
	<location>Ohio</location>
</college>

And you want to get the consolidated XML file as below,


<professor>
	<pid>PROF-100</pid>
	<pfirstname>John</pfirstname>
	<plastname>Turner</plastname>
	<college>
		<id>COL-100</id>
		<name>Ohio State University</name>
		<location>Ohio</location>
	</college>
</professor>

I have created three sub-assemblies here: ProfessorDataAssembly to extract the professor data, CollegeDataAssembly to extract the college data, and finally ProfessorCollegeJoinAssembly to join the two and generate the consolidated XML using the ProfessorCollegeBuffer.

Please refer to the code below to see how I have done it.

ProfessorCollegeDtailsJob.java


package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
import com.cascading.assembly.ProfessorDataAssembly;
import com.cascading.assembly.ProfessorCollegeJoinAssembly;
import com.cascading.assembly.CollegeDataAssembly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * This cascading job is used to read Professor and College details and do inner join and
 * create a consolidated XML which has professor and college information
 */
public class ProfessorCollegeDtailsJob {

    public static Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeDtailsJob.class);

    public static void main(String[] args) {

        if (args.length < 3) {
            LOGGER.info("Usage ProfessorCollegeDtailsJob <PROFESSOR XML FILE PATH> " +
                    "<COLLEGE XML FILE PATH> <OUTPUT FILE PATH>");
            return;
        }
        //input paths & output path
        String professorDataPath = args[0];
        String collegeDataPath = args[1];
        String outputPath = args[2];

        LOGGER.info("professorDataPath:{}", professorDataPath);
        LOGGER.info("studentDataPath:{}", collegeDataPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, ProfessorCollegeDtailsJob.class);

        //Source and sink taps. Use Hfs when running on Hadoop; FileTap is used here for local testing
        Tap professorInTap = new FileTap(new TextLine(new Fields("line")), professorDataPath);
        Tap collegeInTap = new FileTap(new TextLine(new Fields("line")), collegeDataPath);
        Tap outTap = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

        //Here goes the Pipe and assemblies
        Pipe professorPipe = new Pipe("professor");
        professorPipe = new ProfessorDataAssembly(professorPipe);
        Pipe collegePipe = new Pipe("college");
        collegePipe = new CollegeDataAssembly(collegePipe);

        Pipe profCollegePipe = new ProfessorCollegeJoinAssembly(professorPipe, collegePipe);
        //Use LocalFlowConnector for local testing
        //Use HadoopFlowConnector if you are running on Hadoop
        LocalFlowConnector flowConnector = new LocalFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef().addSource("professor", professorInTap).
                addSource("college", collegeInTap).addTailSink(profCollegePipe, outTap).
                setName("Professor College Details Job");

        flowConnector.connect(flowDef).complete();
    }
}

CollegeDataAssembly.java


package com.cascading.assembly;

import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * This class is used to extract out the college data information
 */

public class CollegeDataAssembly extends SubAssembly {

    public CollegeDataAssembly(Pipe input) {
        super(input);
        input = new Each(input, new Fields("line"),
                new XPathParser(new Fields("collegeid", "collegename", "collegelocation"),
                        "/college/id", "/college/name", "/college/location"));
        setTails(input);
    }
}

ProfessorDataAssembly.java


package com.cascading.assembly;

import cascading.operation.xml.XPathParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;

/**
 * This class is used to extract out the professor data information
 */

public class ProfessorDataAssembly extends SubAssembly {

    private static Fields fields = new Fields("pid", "pfirstname", "plastname", "profcollegeid");

    public ProfessorDataAssembly(Pipe input) {
        super(input);
        input = new Each(input, new Fields("line"),
                new XPathParser(fields,
                        "/professor/pid",
                        "/professor/pfirstname",
                        "/professor/plastname",
                        "/professor/college/id"));
        setTails(input);
    }
}


ProfessorCollegeJoinAssembly.java


package com.cascading.assembly;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

/**
 * This will do the inner join on the professor and college data and create a consolidated XML
 *
 */
public class ProfessorCollegeJoinAssembly extends SubAssembly {

    public static Logger LOGGER = LoggerFactory.getLogger(ProfessorCollegeJoinAssembly.class);

    Fields fields = new Fields("professor");

    public ProfessorCollegeJoinAssembly(Pipe professorPipe, Pipe collegePipe) {
        super(professorPipe, collegePipe);
        professorPipe = new Each(professorPipe, new PrintDataFunction(Fields.ALL));
        collegePipe = new Each(collegePipe, new PrintDataFunction(Fields.ALL));
        Pipe profCollegePipe = new CoGroup("profCollegePipe",
                professorPipe,
                new Fields("profcollegeid"), collegePipe,
                new Fields("collegeid"),
                new InnerJoin());
        profCollegePipe = new Every(profCollegePipe, new ProfessorCollegeBuffer(new Fields("professor")), fields);
        setTails(profCollegePipe);
    }

    public static class PrintDataFunction extends BaseOperation implements Function {

        private static final long serialVersionUID = -5108505951262118306L;

        public PrintDataFunction(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            Iterator itr = arguments.getTuple().iterator();
            while (itr.hasNext()) {
                LOGGER.info((String) itr.next());
            }
            functionCall.getOutputCollector().add(arguments.getTuple());
        }
    }
}

ProfessorCollegeBuffer.java


package com.cascading.assembly;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import java.util.Iterator;

/**
 * Create the consolidated output xml
 */

public class ProfessorCollegeBuffer extends BaseOperation implements Buffer {

    public ProfessorCollegeBuffer(Fields outputFieldName) {
        super(outputFieldName);
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        BufferCall bufferCall = (BufferCall) operationCall;
        bufferCall.setRetainValues(true);
    }

    @Override
    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        Iterator<TupleEntry> iter = bufferCall.getArgumentsIterator();
        Tuple innerTuple = new Tuple();

        while (iter.hasNext()) {
            TupleEntry entry = iter.next();
            Tuple output = new Tuple();
            output.add("<professor>");
            output.add(entry.getString("pid"));
            output.add(entry.getString("pfirstname"));
            output.add(entry.getString("plastname"));
            output.add("<college>");
            output.add(entry.getString("collegeid"));
            output.add(entry.getString("collegename"));
            output.add(entry.getString("collegelocation"));
            output.add("</college>");
            output.add("</professor>");
            innerTuple.add(output);
        }
        Tuple outputTuple = new Tuple();
        outputTuple.add(innerTuple);
        bufferCall.getOutputCollector().add(outputTuple);
    }
}

Cascading Job to remove Empty Tags from an XML file

The below Cascading job is used to remove the empty tags found in an XML file.


package com.cascading;

import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

//import cascading.flow.local.LocalFlowConnector;
//import cascading.scheme.local.TextDelimited;
//import cascading.scheme.local.TextLine;
//import cascading.tap.local.FileTap;

/**
 * Cascading job to remove the empty HTML tags found in an XML file
 */

public class CascadingEmptyTagReplacer {

    public static Logger LOGGER = LoggerFactory.getLogger(CascadingEmptyTagReplacer.class);

    public static void main(String[] args) {

        if (args.length < 2) {
            LOGGER.info("Usage CascadingEmptyTagReplacer <INPUT> <OUTPUT>");
            return;
        }
        //input path & output path
        String inputPath = args[0];
        String outputPath = args[1];

        LOGGER.info("inputPath:{}", inputPath);
        LOGGER.info("outputPath:{}", outputPath);

        //Set the application JAR class
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, CascadingEmptyTagReplacer.class);

        //Source and sink taps. Use Hfs when running on Hadoop; use FileTap if you are testing locally
        Tap inTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap outTap = new Hfs(new TextDelimited(new Fields("line")), outputPath, SinkMode.REPLACE);

        Pipe input = new Pipe("input");
        Pipe dataPipe = new Each(input, new EmptyTagReplacer(Fields.ALL));

        //Use LocalFlowConnector if you are testing locally
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef().addSource(dataPipe, inTap).addTailSink(dataPipe, outTap).
                setName("Cascading Empty Tag Replacer Job");
        flowConnector.connect(flowDef).complete();
    }

    /**
     * Custom Function to replace Empty tags in the XML content
     */
    public static class EmptyTagReplacer extends BaseOperation implements Function {

        private static final long serialVersionUID = -5108505951262118306L;

        private static final List<String> tags = Arrays.asList("<sub/>", "<sup/>", "<b/>", "<i/>");

        public EmptyTagReplacer(Fields fields) {
            super(1, fields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();
            if (arguments == null || arguments.getString(0) == null) {
                return;
            }
            Tuple tuple = new Tuple();
            String xmlData = arguments.getTuple().getString(0);
            for (String tag : tags) {
                xmlData = xmlData.replace(tag, "");
            }
            tuple.add(xmlData);
            functionCall.getOutputCollector().add(tuple);
        }
    }
}


Apache Pig vs Apache Hive

Both are very useful for anyone who is not well versed in Java. By using them, we can easily create MapReduce jobs without worrying about the internal complexities of the map and reduce tasks. But there are some differences between the two.

Apache Pig                                                        | Apache Hive
Pig was developed by Yahoo                                        | Hive was developed by Facebook
It uses a language called Pig Latin                               | It uses a query language called HiveQL
Pig Latin is a procedural data-flow language                      | HiveQL is a declarative language with SQL-like syntax
It can handle structured, semi-structured, and unstructured data | It can handle structured data alone

Hadoop Map Reduce Output

During a Map Reduce job run:

The map task output is written to the local file system of each node where the map task runs. This intermediate output is removed once the job completes. The location of this directory is defined by the property mapreduce.cluster.local.dir.

The reduce task output is persisted to HDFS, after which the necessary block replication takes place.
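As a quick sanity check, the effective value of this property can be read from the Hadoop configuration. Below is a minimal sketch (the class name is hypothetical) assuming hadoop-common is on the classpath and the usual *-site.xml files are visible:


import org.apache.hadoop.conf.Configuration;

public class LocalDirCheck {

    public static void main(String[] args) {
        //Loads core-default.xml and core-site.xml; also pull in mapred-site.xml if it is on the classpath
        Configuration conf = new Configuration();
        conf.addResource("mapred-site.xml");

        //Directory (or comma-separated list of directories) used for intermediate map output
        System.out.println(conf.get("mapreduce.cluster.local.dir"));
    }
}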

Hadoop Reducer – Configuration

If we don't set the number of reducer tasks in our driver class, it defaults to 1 and the job runs with a single reducer.

If we don't set the reducer class in our driver class, the identity reducer is used by default; it simply passes through the shuffled and sorted map output and produces the results in a single output file.

If we set the number of reducer tasks to 0, no reducer tasks are run; the map output becomes the final output and is written directly into HDFS.
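These three cases correspond to a couple of calls on the Job object in the driver. Below is a minimal driver sketch (the class name and the commented-out MyReducer are hypothetical) showing where those settings go:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReducerConfigDriver {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "reducer config demo");
        job.setJarByClass(ReducerConfigDriver.class);

        //Case 1: if setNumReduceTasks() is never called, the job runs with a single reducer by default

        //Case 2: if setReducerClass() is never called, the identity reducer is used,
        //so the map output is only shuffled, sorted and written out
        //job.setReducerClass(MyReducer.class);

        //Case 3: with zero reducers, the map output itself is written to HDFS as the final output
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}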