Map-Side Join

In a map-side join, the smaller table is shipped to every mapper through the distributed cache and loaded into an in-memory HashMap in setup(); each record of the larger table is then joined against that map inside map(), so the job needs no reduce phase at all.
package MapJoin;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MapJoin {

    static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        // In-memory copy of the small table, keyed by the join column.
        private final Map<String, String> table1Map = new HashMap<String, String>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Read the small table from the distributed cache and load it
            // into the HashMap before any map() calls run.
            URI[] paths = context.getCacheFiles();
            FileSystem fs = FileSystem.get(context.getConfiguration());
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(paths[0].toString()))));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] vals = line.split("\\t");
                    if (vals.length == 2) {
                        table1Map.put(vals[0], vals[1]);
                    }
                }
            } finally {
                reader.close();
            }
        }

        @Override
        protected void map(LongWritable key, Text val, Context context)
                throws IOException, InterruptedException {
            // Join each big-table record against the cached small table.
            String[] vals = val.toString().split("\\t");
            if (vals.length == 3) {
                String table1Vals = table1Map.get(vals[0]);
                table1Vals = (table1Vals == null) ? "" : table1Vals;
                context.write(new Text(vals[0]),
                        new Text(table1Vals + "\t" + vals[1] + "\t" + vals[2]));
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Wrong number of parameters; please supply three: "
                    + "<big table hdfs input> <small table hdfs input> <hdfs output>");
            System.exit(-1);
        }
        Job job = Job.getInstance(conf, "MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Ship the small table to every mapper via the distributed cache.
        job.addCacheFile(new Path(otherArgs[1]).toUri());
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
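For concreteness, here is a hypothetical run; all records below are invented for illustration, and fields are tab-separated.

Small table, passed as the cache file:

1   apple
2   banana

Big table, passed as the map input:

1   red     10
2   yellow  20

Each map() call looks the incoming key up in the in-memory table and emits the joined record:

1   apple   red     10
2   banana  yellow  20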
Reduce-Side Join

In a reduce-side join, every input table is read as ordinary map input and re-keyed on the join column; the shuffle then brings all records that share a key to the same reduce() call, where they are combined into one output record. This handles tables of any size, at the cost of shuffling both tables across the network.
package ReduceJoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ReduceJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each line into the join key and the rest of the record.
            String[] keyValue = value.toString().split(",", 2);
            if (keyValue.length == 2) {
                outKey.set(keyValue[0]);
                outValue.set(keyValue[1]);
                context.write(outKey, outValue);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        private final Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // All records sharing a join key arrive here; concatenate their
            // values into one comma-separated output record.
            StringBuilder valueStr = new StringBuilder();
            for (Text val : values) {
                valueStr.append(val);
                valueStr.append(",");
            }
            outValue.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
            context.write(key, outValue);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MyJoin");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
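One limitation of the listing above: the reducer receives the values for a key in arbitrary order and has no way to tell which input file each value came from, so the concatenated output can interleave the two tables' columns unpredictably. A common refinement is to tag every value in the mapper with the name of the file backing its input split. The sketch below shows one way to do this; the class name TaggedMapClass and the tag format are assumptions for illustration, not part of the original program.

// Requires one extra import in the listing above:
//   import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical variant of MapClass: each value is prefixed with the name of
// the file it came from, so the reducer can tell the two tables apart.
public static class TaggedMapClass extends Mapper<LongWritable, Text, Text, Text> {

    private final Text outKey = new Text();
    private final Text outValue = new Text();
    private String tag; // source-file name, e.g. "customers.csv" (assumed)

    @Override
    protected void setup(Context context) {
        // The file backing this mapper's split identifies the source table.
        tag = ((FileSplit) context.getInputSplit()).getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] keyValue = value.toString().split(",", 2);
        if (keyValue.length == 2) {
            outKey.set(keyValue[0]);
            outValue.set(tag + ":" + keyValue[1]); // tag travels with the value
            context.write(outKey, outValue);
        }
    }
}

In reduce(), each value can then be split on the first ':' to recover the table tag, so columns from the two tables can be emitted in a fixed order instead of whatever order the shuffle happens to deliver.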