
What Is The Use Of Multiple Input Files In MapReduce Hadoop Development?

Here Hadoop development experts explain the concept of multiple input files in Hadoop MapReduce. A mapper extracts its input from a single input source, so when there are multiple input files in different formats, developers need a separate mapper class for each file. In this article, professionals use two input files with two mapper classes and one reducer. Read the complete story to know more.

Consider a simple hypothetical case where you want to load data from several different data sources, each having a different format/separator.

For this simple case, assume we have two different files of users with different records. The files also use different field separators.

Now, to solve this type of problem, we can use the MultipleInputs class from Hadoop.


How to solve this problem?

To solve this type of problem, we can take the following approach: each input file gets its own mapper class, which tags every record with a file identifier ("F1" or "F2") and emits the user id as the key; a single reducer then receives all records for a given id and stitches the fields from both files together.

This is also an example of a reducer-side join in Hadoop MapReduce.

Though the solution shown here behaves as a left outer join, you can tweak the code to perform any type of join you wish.


Environment:

Java : 1.7.0_75
Hadoop : 1.0.4

Sample Input Files:

Sample input file 1 (TAB separated file)
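The original sample is not reproduced here, so purely for illustration, a TAB separated users file in the shape these mappers expect (the join key in the first column, followed by the remaining fields; the names and cities are hypothetical) might look like:

1	Anil	Delhi
2	Sunil	Mumbai
7	Rajesh	Pune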


Sample input file 2 (";" separated file)
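Again for illustration only, a ";" separated file keyed on the same user id (the ages and email addresses are hypothetical) might look like:

1;28;anil@example.com
2;32;sunil@example.com

Note that there is deliberately no record for user id 7 here; that is what makes the left outer join visible in the output later.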


I used the below code to solve this problem.

MultipleInputMapper1.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultipleInputMapper1 extends Mapper<LongWritable, Text, LongWritable, Text> {

    private static String separator;
    private static String commonSeparator;
    private static final String FILE_TAG = "F1";

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        // Retrieving the field separator for file 1 from the configuration.
        separator = configuration.get("Separator.File1");
        // Retrieving the common separator used when writing the data to the reducer.
        commonSeparator = configuration.get("Separator.Common");
    }

    @Override
    public void map(LongWritable rowKey, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(separator);
        StringBuilder stringBuilder = new StringBuilder();
        // values[0] is the join key; append the remaining fields.
        for (int index = 1; index < values.length; index++) {
            stringBuilder.append(values[index] + commonSeparator);
        }
        if (values[0] != null && !"NULL".equalsIgnoreCase(values[0])) {
            // Tag the record with "F1" so the reducer knows which file it came from.
            context.write(new LongWritable(Long.parseLong(values[0])),
                    new Text(FILE_TAG + commonSeparator + stringBuilder.toString()));
        }
    }
}

MultipleInputMapper2.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultipleInputMapper2 extends Mapper<LongWritable, Text, LongWritable, Text> {

    private static String separator;
    private static String commonSeparator;
    private static final String FILE_TAG = "F2";

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        // Retrieving the field separator for file 2 from the configuration.
        separator = configuration.get("Separator.File2");
        // Retrieving the common separator used when writing the data to the reducer.
        commonSeparator = configuration.get("Separator.Common");
    }

    @Override
    public void map(LongWritable rowKey, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(separator);
        StringBuilder stringBuilder = new StringBuilder();
        // values[0] is the join key; append the remaining fields.
        for (int index = 1; index < values.length; index++) {
            stringBuilder.append(values[index] + commonSeparator);
        }
        if (values[0] != null && !"NULL".equalsIgnoreCase(values[0])) {
            // Tag the record with "F2" so the reducer knows which file it came from.
            context.write(new LongWritable(Long.parseLong(values[0])),
                    new Text(FILE_TAG + commonSeparator + stringBuilder.toString()));
        }
    }
}

MultipleInputDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleInputDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        int status = -1;
        Configuration configuration = getConf();
        // Separator for parsing the TAB separated input file 1 (MultipleInputMapper1).
        configuration.set("Separator.File1", "\t");
        // Separator for parsing the ";" separated input file 2 (MultipleInputMapper2).
        configuration.set("Separator.File2", ";");
        // Separator used by the reducer when writing the TAB separated output file.
        configuration.set("Separator.Common", "\t");

        Job job = new Job(configuration, "Multiple Input Example");

        // TAB separated input file 1
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MultipleInputMapper1.class);
        // ";" separated input file 2
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MultipleInputMapper2.class);

        job.setJarByClass(MultipleInputDriver.class);
        job.setReducerClass(MultipleInputReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.waitForCompletion(true);
        status = job.isSuccessful() ? 0 : -1;
        return status;
    }

    public static void main(String[] args) {
        int result;
        try {
            result = ToolRunner.run(new Configuration(), new MultipleInputDriver(), args);
            if (0 == result) {
                System.out.println("Job completed Successfully...");
            } else {
                System.out.println("Job Failed...");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
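Assuming the four classes are packaged into a jar (the jar name and HDFS paths below are placeholders, not from the original article), the job can be submitted like this, with the first two arguments being the two input files and the third the output directory, which must not already exist:

hadoop jar multipleinput.jar MultipleInputDriver /input/users_file1 /input/users_file2 /output/joined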

MultipleInputReducer.java

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultipleInputReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

    private static String commonSeparator;

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        // Retrieving the common separator used for the output file.
        commonSeparator = configuration.get("Separator.Common");
    }

    @Override
    public void reduce(LongWritable key, Iterable<Text> textValues, Context context)
            throws IOException, InterruptedException {
        StringBuilder stringBuilder = new StringBuilder();
        String[] firstFileValues = null, secondFileValues = null;
        String[] stringValues;
        for (Text textValue : textValues) {
            stringValues = textValue.toString().split(commonSeparator);
            // The first token is the file tag ("F1" or "F2") added by the mappers.
            if ("F1".equalsIgnoreCase(stringValues[0])) {
                firstFileValues = Arrays.copyOf(stringValues, stringValues.length);
            }
            if ("F2".equalsIgnoreCase(stringValues[0])) {
                secondFileValues = Arrays.copyOf(stringValues, stringValues.length);
            }
        }
        // Append the fields from file 1 (if present), then the fields from file 2 (if present).
        if (firstFileValues != null) {
            for (int index = 1; index < firstFileValues.length; index++) {
                stringBuilder.append(firstFileValues[index] + commonSeparator);
            }
        }
        if (secondFileValues != null) {
            for (int index = 1; index < secondFileValues.length; index++) {
                stringBuilder.append(secondFileValues[index] + commonSeparator);
            }
        }
        context.write(key, new Text(stringBuilder.toString()));
    }
}

After running the code, the output file looks like this:
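The original output listing is not shown here; with the hypothetical sample inputs sketched above, the TAB separated output of the reducer would look roughly like this (each line also carries a trailing separator, because the reducer appends the separator after every field):

1	Anil	Delhi	28	anil@example.com
2	Sunil	Mumbai	32	sunil@example.com
7	Rajesh	Pune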


In this case, we can see that record 7 has values in file 1 only, with no matching record in file 2, yet it still appears in the output; this is the left outer join behaviour.


Tweaks & Tricks:

You can also use this code as a template for performing any kind of join in Hadoop MapReduce by altering a small amount of code.
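For example, a minimal sketch of an inner join, assuming you only change the final write in MultipleInputReducer.reduce() (the guard condition below is my illustration, not part of the original code):

    // Inner join: emit the key only when both input files contributed a record.
    if (firstFileValues != null && secondFileValues != null) {
        context.write(key, new Text(stringBuilder.toString()));
    }

Similarly, requiring only firstFileValues != null keeps just the keys present in file 1 (a strict left outer join), while the unguarded write shown earlier emits every key found in either file.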


Code Walk Through:

The driver stores the three separators in the job Configuration and registers each input path with its own mapper class through MultipleInputs.addInputPath(). MultipleInputMapper1 and MultipleInputMapper2 read their respective files, split each line on the configured separator, tag the record with "F1" or "F2", and emit the user id as the key. The reducer receives all tagged records for a given id, copies the fields from each file into separate arrays, and writes them out side by side as one TAB separated line.

And it's done. You can start using multiple input files in MapReduce as per your needs.

Hadoop development experts are here to help you out. Just mention the problem you are facing with multiple inputs in the comments and the experts will answer it soon.

I hope this program will help you understand MultipleInputs and the left outer join in Hadoop MapReduce.


For further information, mail us at info@aegissofttech.com

