分布式计算:批处理引擎 MapReduce(第一部分)

    xiaoxiao2021-03-25  91

    第一:MapReduce概述

    优点:易于编程;良好的扩展性;高容错性;适合PB级的海量数据离线处理。缺点:不擅长实时计算(无法在毫秒级返回处理结果);不擅长流式计算(MapReduce的数据源是静态的);不擅长DAG计算(每个MapReduce作业的输出都要写入hdfs,多个作业串联时需要多次读写hdfs,效率低)。

    第二:MapReduce编程模型

    MapReduce将整个作业的运行过程分为两个阶段:Map阶段和Reduce阶段。Map阶段由一定数量的map task组成:(1)输入数据格式解析:InputFormat,负责文件分片,并将分片数据解析成key/value对,默认是TextInputFormat;(2)输入数据处理:Mapper;(3)数据分组:Partitioner,决定map task输出的数据交给哪个reduce task处理,默认实现为hash(key) mod R(R为reduce task的个数);(4)本地处理:Combiner,可看做local reducer,通常与reducer的逻辑一致,用于减少mapper输出的数据量(磁盘io)和网络传输(网络io);只有结果可叠加时才能使用,例如sum可以,而对于average就不行。Reduce阶段由一定数量的reduce task组成:(1)数据远程拷贝;(2)数据按照key排序;(3)数据处理:Reducer;(4)数据输出格式:OutputFormat。

    第三:MapReduce2.0架构及核心设计机制

    Client:通过client与yarn交互,提交MapReduce作业。MRAppMaster:类似jobtracker,负责任务划分、申请资源并二次分配给map task和reduce task、任务状态监控等。ResourceManager。NodeManager。Container。容错性:MRAppMaster挂了,由ResourceManager负责重启,默认最多2次;map task、reduce task定期向MRAppMaster发送心跳,task挂掉后,MRAppMaster重新申请资源运行该task,默认最多4次。数据本地性(优先级从高到低):数据与task同节点 > 数据与task同机架不同节点 > 数据与task跨机架不同节点。推测执行机制:发现某个task运行慢时,启动一个备份任务,取最先完成的结果。任务并行执行。

    第四:MapReduceJava编程

    Java编程(最原始的方法);Hadoop Streaming编程(支持多语言编程)

    第五:项目实战

    一:WordCount

    代码 package com.dev4free.hadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class MyWordCount { public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException { StringTokenizer line = new StringTokenizer(value.toString()); while (line.hasMoreTokens()) { word.set(line.nextToken()); context.write(word, one); } } } public static class WordCountRducer extends Reducer<Text, IntWritable, Text, IntWritable> { private static final IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{ int sum = 0; for(IntWritable value:values){ sum = sum + value.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception{ Configuration configuration = new Configuration(); String[] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs(); if (otherArgs.length !=2) { System.out.println("input path error"); System.exit(2); } Job job = Job.getInstance(configuration,"MyWordCounter"); job.setJarByClass(MyWordCount.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountRducer.class); job.setCombinerClass(WordCountRducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } } 通过maven打包 -bash-3.2$ ls pom.xml src target -bash-3.2$ mvn -clean package 将jar包拷贝到hadoop集群中,运行命令 [hadoop@hadoopa ~]$ hadoop jar hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyWordCount /me /wod 结果验证 [hadoop@hadoopa ~]$ hdfs dfs -cat /wod/part-r-00000 a 1 am 1 boy 1 china 1 country 2 i 2 is 1 love 1 my 2

    二:倒排索引

    代码 package com.dev4free.hadoop; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import com.dev4free.hadoop.MyInvertedIndex.InvertedIndexMapper.InvertedIndexReducer; public class MyInvertedIndex { public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{ public void map(Object key,Text value,Context context) throws IOException,InterruptedException{ StringTokenizer line = new StringTokenizer(value.toString()); String fileName = ((FileSplit)context.getInputSplit()).getPath().getName(); while (line.hasMoreTokens()) { context.write(new Text(line.nextToken()), new Text(fileName)); } } public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{ Map<String, Integer> counts = new HashMap<String, Integer>(); for(Text value : values){ String fileName = value.toString(); if (counts.containsKey(fileName)) { counts.put(fileName, counts.get(fileName) + 1); }else { counts.put(fileName, 1); } } context.write(key, new Text(counts.toString())); } } } public static void main(String[] args)throws Exception{ Configuration configuration = new Configuration(); String[] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs(); if (otherArgs.length != 2) { System.out.println("input error"); System.exit(2); } Job job = Job.getInstance(configuration,"MyInvertedIndex"); 
job.setJarByClass(MyInvertedIndex.class); job.setMapperClass(InvertedIndexMapper.class); job.setReducerClass(InvertedIndexReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } } 通过maven打包 -bash-3.2$ mvn clean package 将jar包拷贝到hadoop上,并运行 [hadoop@hadoopa test]$ hdfs dfs -put /home/hadoop/test /inverted [hadoop@hadoopa test]$ hadoop jar /home/hadoop/hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyInvertedIndex /inverted /out/inverted 验证结果 [hadoop@hadoopa test]$ hdfs dfs -cat /out/inverted/part-r-00000 china {text1.txt=1} country {text1.txt=2, text3.txt=2, text2.txt=2} i {text1.txt=1, text2.txt=1, text3.txt=1} is {text1.txt=1, text3.txt=1, text2.txt=1} japan {text2.txt=1} love {text1.txt=1, text3.txt=1, text2.txt=1} my {text1.txt=2, text2.txt=2, text3.txt=2} usa {text3.txt=1}
    转载请注明原文地址: https://ju.6miu.com/read-22735.html

    最新回复(0)