// 1. The MapReduce computing framework
// 2. Example: WordCount
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
// This is the Mapper class
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
//define IntWritaable class object one
private final static IntWritable one = new IntWritable(
1)
private Text word = new Text()
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value
.toString())
while (itr
.hasMoreTokens()) {
word
.set(itr
.nextToken())
context
.write(word, one)
}
}
}
public static class IntSumCombiner
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable()
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum =
0
for (IntWritable val : values) {
sum += val
.get()
}
result
.set(sum)
context
.write(key, result)
}
}
// This is the Reducer class
// count of word = count
set output format
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,Text> {
private Text result_key= new Text()
private Text result_value= new Text()
private byte[] prefix
private byte[] suffix
protected void setup(Context context) {
try {
prefix= Text
.encode(
"count of ")
.array()
suffix= Text
.encode(
" =")
.array()
} catch (Exception e) {
prefix = suffix = new byte[
0]
}
}
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum =
0
for (IntWritable val : values) {
sum += val
.get()
}
// generate result key
result_key
.set(prefix)
result_key
.append(key
.getBytes(),
0, key
.getLength())
result_key
.append(suffix,
0, suffix
.length)
// generate result value
result_value
.set(Integer
.toString(sum))
context
.write(result_key, result_value)
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration()
System
.out.println(
"conf information: "+conf
.toString())
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs()
System
.out.println(
"otherArgs:")
for(int i=
0
System
.out.println(otherArgs[i])
if (otherArgs
.length <
2) {
System
.err.println(
"Usage: wordcount <in> [<in>...] <out>")
System
.exit(
2)
}
Job job = Job
.getInstance(conf,
"word count")
job
.setJarByClass(WordCount
.class)
job
.setMapperClass(TokenizerMapper
.class)
job
.setCombinerClass(IntSumCombiner
.class)
job
.setReducerClass(IntSumReducer
.class)
job
.setMapOutputKeyClass(Text
.class)
job
.setMapOutputValueClass(IntWritable
.class)
job
.setOutputKeyClass(Text
.class)
job
.setOutputValueClass(Text
.class)
//
add the input paths as given by command line
for (int i =
0
FileInputFormat
.addInputPath(job, new Path(otherArgs[i]))
System
.out.println(
"==================================")
System
.out.println(otherArgs[i])
}
System
.out.println(otherArgs[otherArgs
.length -
1])
//
add the output path as given by the command line
FileOutputFormat
.setOutputPath(job,
new Path(otherArgs[otherArgs
.length -
1]))
System
.exit(job
.waitForCompletion(true) ?
0 :
1)
}
}
// When reposting, please credit the original source: https://ju.6miu.com/read-668404.html