Part 1: MapReduce Overview
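Advantages
Easy to program.
Good scalability.
High fault tolerance.
Suited to offline (batch) processing of massive, petabyte-scale data.
Disadvantages
Not good at real-time computing: it cannot return results within milliseconds.
Not good at stream computing: the input data of a MapReduce job is static.
Not good at DAG computing: every job writes its output to HDFS, so a chain of dependent jobs has to write to and read back from HDFS over and over again.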
Part 2: MapReduce Programming Model
MapReduce divides the execution of a job into two phases: the Map phase and the Reduce phase.
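To make the two phases concrete, here is a minimal in-memory sketch in plain Java (no Hadoop dependency; the class and method names and the sample lines are hypothetical) of what a word-count job does: the map phase turns each line into (word, 1) pairs, the framework groups the pairs by key in sorted order, and the reduce phase sums each group. It only models the data flow, not how Hadoop actually executes a job.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the Map -> group/sort -> Reduce flow.
class TwoPhaseSketch {

    // Map phase: one input line -> zero or more (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    // Reduce phase: one key plus all of its values -> one output value.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Between the phases the framework groups values by key;
        // a TreeMap stands in for the "sort by key" step.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {china=1, country=2, i=1, is=1, love=1, my=2}
        System.out.println(run(List.of("i love my country", "my country is china")));
    }
}
```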
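The Map phase consists of a number of map tasks:
Input format parsing: InputFormat. Splits the input files and parses each split into key/value pairs; the default is TextInputFormat (key = byte offset of the line, value = the line's contents).
Input data processing: Mapper.
Data partitioning (grouping): Partitioner. Decides which reduce task processes each record output by a map task; the default implementation is hash(key) mod R, where R is the number of reduce tasks.
Local processing: Combiner. Can be seen as a local reducer and usually has the same logic as the reducer. It reduces the amount of mapper output written to disk (disk I/O) and transferred over the network (network I/O). It can only be used when partial results can be combined, as with sum; it does not work for average.
The Reduce phase consists of a number of reduce tasks:
Remote copy of the map output data.
Sorting of the data by key.
Data processing: Reducer.
Output format: OutputFormat.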
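The partitioning and combining steps listed above can be modelled in plain Java. The `partition` method mirrors the formula of Hadoop's default HashPartitioner, `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks` (the mask clears the sign bit so the result is never negative), but it is applied to a Java `String` here; Hadoop hashes the serialized `Text` bytes, so the concrete partition numbers differ. The class name and sample values are hypothetical. The `main` method shows why a combiner is safe for sum but not for average.

```java
import java.util.List;

class PartitionCombineSketch {

    // Same formula as the default HashPartitioner, applied to a String key:
    // the result is always in [0, numReduceTasks).
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    static int sum(List<Integer> xs) {
        int s = 0;
        for (int x : xs) {
            s += x;
        }
        return s;
    }

    static double average(List<Double> xs) {
        double s = 0;
        for (double x : xs) {
            s += x;
        }
        return s / xs.size();
    }

    public static void main(String[] args) {
        System.out.println("china -> reduce task " + partition("china", 3));

        // Two map tasks produced these values for the same key.
        List<Integer> mapTask1 = List.of(1, 1, 1);
        List<Integer> mapTask2 = List.of(1);

        // Sum: combining locally first gives the same answer as not combining.
        int direct = sum(List.of(1, 1, 1, 1));
        int combined = sum(List.of(sum(mapTask1), sum(mapTask2)));
        System.out.println(direct + " " + combined); // 4 4

        // Average: the average of partial averages is wrong in general.
        double directAvg = average(List.of(1.0, 2.0, 3.0, 10.0));
        double combinedAvg = average(List.of(average(List.of(1.0, 2.0, 3.0)), average(List.of(10.0))));
        System.out.println(directAvg + " " + combinedAvg); // 4.0 6.0
    }
}
```

For averages, the usual workaround is to have the combiner emit partial (sum, count) pairs and perform the division only in the reducer.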
Part 3: MapReduce 2.0 Architecture and Core Design Mechanisms
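Client: users interact with YARN through the client to submit MapReduce jobs.
MRAppMaster: plays a role similar to the JobTracker of MapReduce 1.0. It splits the job into tasks, requests resources and allocates them again to map tasks and reduce tasks, monitors task status, and so on.
ResourceManager
NodeManager
Container
Fault tolerance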
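If the MRAppMaster fails, the ResourceManager restarts it (2 attempts by default). Map tasks and reduce tasks send periodic heartbeats to the MRAppMaster; if a task fails, the MRAppMaster requests resources again and reruns it (4 attempts by default).
Data locality
Data on the same node as the task > data on a different node in the same rack > data on a different rack.
Speculative execution
If a task is found to be running slowly, a backup attempt of it is launched, and the result of whichever attempt finishes first is used; the original and backup attempts run in parallel.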
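The fault-tolerance, locality, and speculative-execution ideas above can be illustrated in a single process. This is a hypothetical sketch, not YARN or MRAppMaster code: the class, enum, and method names are invented, `maxAttempts` merely echoes the default of 4 quoted above, and, unlike Hadoop, the sketch launches the backup attempt immediately instead of waiting until the original is detected as slow. It uses `ExecutorService.invokeAny`, which returns the first successfully completed result and cancels the unfinished tasks.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, single-process illustration of three MRv2 scheduling ideas.
class SchedulingSketch {

    // Declared from best to worst: node-local > rack-local > off-rack.
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_RACK }

    // Classify where a task would run relative to the data block it reads.
    static Locality locality(String taskNode, String taskRack,
                             String dataNode, String dataRack) {
        if (taskNode.equals(dataNode)) {
            return Locality.NODE_LOCAL;
        }
        if (taskRack.equals(dataRack)) {
            return Locality.RACK_LOCAL;
        }
        return Locality.OFF_RACK;
    }

    // Rerun a failing task until it succeeds or maxAttempts is used up.
    static <T> T runWithRetries(Callable<T> task, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    // Run the original and a backup attempt in parallel and keep the result
    // of whichever finishes successfully first.
    static <T> T speculate(Callable<T> original, Callable<T> backup)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool.invokeAny(List.of(original, backup));
        } finally {
            pool.shutdownNow(); // interrupt the losing attempt
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(locality("n1", "r1", "n2", "r1")); // RACK_LOCAL

        int[] calls = {0};
        Integer value = runWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated task failure");
            }
            return 42;
        }, 4);
        System.out.println(value + " after " + calls[0] + " attempts"); // 42 after 3 attempts

        String winner = speculate(
                () -> { Thread.sleep(500); return "slow original"; },
                () -> "fast backup");
        System.out.println(winner); // normally "fast backup"
    }
}
```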
Part 4: MapReduce Java Programming
Java programming (the most basic approach)
Hadoop Streaming programming (supports programs written in many languages)
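With Hadoop Streaming, the mapper and reducer are ordinary executables that read lines from standard input and write lines to standard output; by default the text up to the first tab is the key and the rest is the value, and the reducer receives its input sorted by key. The sketch below is a hypothetical word count written against that line protocol in Java itself (class and method names are invented). For simplicity the reducer buffers its input in memory; a production reducer would process the sorted stream line by line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical word-count mapper/reducer for the Hadoop Streaming line protocol:
// read lines from stdin, write "key<TAB>value" lines to stdout.
class StreamingWordCount {

    // Mapper: emit "word\t1" for every whitespace-separated token.
    static List<String> mapLine(String line) {
        List<String> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(token + "\t1");
            }
        }
        return out;
    }

    // Reducer: lines arrive sorted by key, so equal keys are adjacent;
    // sum the values of each run of equal keys.
    static List<String> reduceSorted(List<String> sortedLines) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        int sum = 0;
        for (String line : sortedLines) {
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue; // skip lines without a key/value separator (assumption)
            }
            String key = line.substring(0, tab);
            int value = Integer.parseInt(line.substring(tab + 1).trim());
            if (!key.equals(currentKey)) {
                if (currentKey != null) {
                    out.add(currentKey + "\t" + sum);
                }
                currentKey = key;
                sum = 0;
            }
            sum += value;
        }
        if (currentKey != null) {
            out.add(currentKey + "\t" + sum);
        }
        return out;
    }

    // Usage: java StreamingWordCount map    or    java StreamingWordCount reduce
    public static void main(String[] args) throws IOException {
        boolean mapMode = args.length > 0 && args[0].equals("map");
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        List<String> buffered = new ArrayList<>();
        for (String line; (line = in.readLine()) != null; ) {
            if (mapMode) {
                mapLine(line).forEach(System.out::println);
            } else {
                buffered.add(line);
            }
        }
        if (!mapMode) {
            reduceSorted(buffered).forEach(System.out::println);
        }
    }
}
```

Commands such as `java StreamingWordCount map` and `java StreamingWordCount reduce` would then be passed to the streaming jar's `-mapper` and `-reducer` options; shipping the compiled class to the cluster nodes is not shown here.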
Part 5: Hands-on Projects
1: WordCount
Code
package com.dev4free.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyWordCount {

    // Mapper: for each input line, emit (word, 1) for every whitespace-separated token.
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer line = new StringTokenizer(value.toString());
            while (line.hasMoreTokens()) {
                word.set(line.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sum all counts for one word.
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Strip generic Hadoop options and keep only <input path> <output path>.
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyWordCount <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "MyWordCounter");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class); // safe: partial sums can be summed again
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Package with Maven:
-bash-3.2$ ls
pom.xml  src  target
-bash-3.2$ mvn clean package
Copy the jar to the Hadoop cluster and run it:
[hadoop@hadoopa ~]$ hadoop jar hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyWordCount /me /wod
Verify the results:
[hadoop@hadoopa ~]$ hdfs dfs -cat /wod/part-r-00000
a	1
am	1
boy	1
china	1
country	2
i	2
is	1
love	1
my	2
2: Inverted Index
Code
package com.dev4free.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyInvertedIndex {

    // Mapper: emit (word, name of the file the word came from).
    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer line = new StringTokenizer(value.toString());
            // For FileInputFormat-based input the split is a FileSplit, which knows its file path.
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            while (line.hasMoreTokens()) {
                context.write(new Text(line.nextToken()), new Text(fileName));
            }
        }
    }

    // Reducer: for one word, count how many times it occurs in each file.
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (Text value : values) {
                String fileName = value.toString();
                if (counts.containsKey(fileName)) {
                    counts.put(fileName, counts.get(fileName) + 1);
                } else {
                    counts.put(fileName, 1);
                }
            }
            // Written as e.g. {text1.txt=2, text2.txt=1}; HashMap entry order is not defined.
            context.write(key, new Text(counts.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyInvertedIndex <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "MyInvertedIndex");
        job.setJarByClass(MyInvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        // No combiner: the reducer's output value (a per-file count summary)
        // is not a valid input value (a file name) for the reducer.
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
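The reducer's counting loop can be written more compactly with `Map.merge`. The plain-Java sketch below (hypothetical class name, no Hadoop types) also swaps the `HashMap` for a `TreeMap`: `HashMap.toString()` has no defined order, which is why the file names in the output further down appear in an arbitrary order, whereas a `TreeMap` prints them sorted by file name. Inside a real reducer, call `value.toString()` on each `Text` before counting, because Hadoop reuses the same `Text` instance while iterating over the values.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class InvertedIndexCounting {

    // Count how many times each file name occurs among the values of one key,
    // i.e. how often the word appears in each file.
    static Map<String, Integer> countPerFile(Iterable<String> fileNames) {
        Map<String, Integer> counts = new TreeMap<>(); // keeps file names sorted
        for (String fileName : fileNames) {
            counts.merge(fileName, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical values the reducer might receive for one word.
        List<String> values = List.of("text1.txt", "text3.txt", "text1.txt", "text2.txt");
        System.out.println(countPerFile(values)); // {text1.txt=2, text2.txt=1, text3.txt=1}
    }
}
```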
Package with Maven:
-bash-3.2$ mvn clean package
Copy the jar to the Hadoop cluster, upload the input files, and run the job:
[hadoop@hadoopa test]$ hdfs dfs -put /home/hadoop/test /inverted
[hadoop@hadoopa test]$ hadoop jar /home/hadoop/hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyInvertedIndex /inverted /out/inverted
Verify the results:
[hadoop@hadoopa test]$ hdfs dfs -cat /out/inverted/part-r-00000
china	{text1.txt=1}
country	{text1.txt=2, text3.txt=2, text2.txt=2}
i	{text1.txt=1, text2.txt=1, text3.txt=1}
is	{text1.txt=1, text3.txt=1, text2.txt=1}
japan	{text2.txt=1}
love	{text1.txt=1, text3.txt=1, text2.txt=1}
my	{text1.txt=2, text2.txt=2, text3.txt=2}
usa	{text3.txt=1}
When reposting, please cite the original address: https://ju.6miu.com/read-22735.html