要编写一个mapreduce程序就需要编写一个map程序和一个reduce程序,一个WordCount示例:
/* 编写map程序需要继承Mapper类,Mapper类有四个泛型参数,第一个表示的是输入数据的key的类型,第二参数为输入value的类型,第三个参数为输出数据的key的类型,第四个参数为输出数据的value的类型; 通常是在map程序中读取文件,第一个参数默认情况下都是LongWritable类型,表示的是待处理的文本文件一行的起始偏移量,每读取一行的数据就会调用一次map方法,LongWritable是对Long的封装,Text是对String的封装 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text,LongWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] words = org.apache.commons.lang.StringUtils.split(line,"\t"); for(String word : words){ context.write(new Text(word), new LongWritable(1)) } } } /* reduce程序需要继承Reducer类,重写reduce方法,针对每一组数据调用一次reduce,比如这里框架会在所有的map处理完成之后,将所有的key-value缓存起来进行分组,类似<key,{1,1,1,1,1}>这种数据,每一次调用reduce将处理完成一种key的统计 */ public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{ long count = 0; for(LongWritable value : values){ count += value.get(); } context.write(key, new LongWritable(count)); } } /* 定义一个Runner,用来描述一个特定的作业,比如,该作业使用哪一个类作为逻辑处理的map, 哪个作为reduce,指定该作业要处理的数据所在的路径,输出的结果放在哪个路径下等信息; 如果需要将job提交到集群运行,则首先需要启动yarn,其次需要将mapred-site.xml和yarn-site.xml拷贝到src路径下;另外还需要将当前工程打成jar包,然后对conf设置conf.set("mapreduce.job.jar","打的jar包路径"); */ public class WordCountRunner{ public static void main(String[] args){ Configuration conf = new Configuration(); //直接在eclipse中运行需要添加下面的配置 //conf.set("mapreduce.job.jar","wordCount.jar") Job job = Job.getInstance(conf); job.setJarByClass(WordCountRunner.class); //指定该job需要的Mapper类和Reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //设置reduce程序输出的key和value的类型,如果map和reduce输出的key和value的类型相同的的话,就可以不用单独设置map的输出的key和value的类型,只设置这个参数就可以了; job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //设置map函数的输出key和value的类型 job.setMapOutputKeyValue(Text.class); job.setMapOutputValueValue(LongWritable.class); //设置输入数据的路径地址 FileInputFormat.setInputPaths(job, new Path("/input")); //设置输出数据的存放地址,这里只需要指定目录就可以, 并且这个目录不能是已经存在的,否则会报错,因为每次运行都是覆盖目录下的所有的数据,为了避免这种情况的出现,因此这里指定的目录必须是不存在的; FileOutputFormat.setOutputPaths(job, new Path("/output")); //将job提交给集群运行,参数为ture表示将运行的进度信息在屏幕上显示出来,false则不显示 job.waitForCompletin(true); } }集群模式下运行: 1.将工程打成jar包,上传到服务器,然后用hadoop命令提交 hadoop jar wordCount.jar com.soft.mapreduce.WordCountRunner