package cheryl.dhcc.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Count the frequency of each word in the files under /user/dhcc/medical_data
public class DhMapreduce {
// Hadoop's default encoding is UTF-8, so GBK input has to be re-decoded before processing
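// TextInputFormat copies the raw line bytes into the Text value without transcoding,
// so re-decoding those bytes with the file's real charset (GBK here) recovers the original characters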
public static Text transformTextToUTF8(Text text, String encoding) {
String value = null;
try {
value = new String(text.getBytes(), 0, text.getLength(), encoding);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new Text(value);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
// Input: the byte offset and a line of text; output: each word paired with a count of 1
private final static IntWritable One = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Hadoop assumes UTF-8, so first re-decode the GBK line as UTF-8
Text next = transformTextToUTF8(value, "GBK");
String line = next.toString();
// Replace everything except letters, digits, Chinese characters and a few special symbols (. - / : ℃) with a space
line = line.replaceAll("[^a-zA-Z0-9\\u4e00-\\u9fa5./:℃-]", " ");
// Collapse consecutive whitespace into a single space
line = line.replaceAll("\\s{2,}", " ");
// Tokenize the line
StringTokenizer itr=new StringTokenizer(line);
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, One);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private final static IntWritable result=new IntWritable(0);
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(DhMapreduce.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
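// Reusing the reducer as the combiner is safe here because summing counts is associative and commutative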
job.setReducerClass(IntSumReducer.class);
job.setOutputFormatClass(GbkOutputFormat.class);
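// GbkOutputFormat (defined below) writes results in GBK instead of the default UTF-8 text output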
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
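// Example invocation (illustrative only; the jar name and output path are placeholders):
// hadoop jar dh-wordcount.jar cheryl.dhcc.mapreduce.DhMapreduce /user/dhcc/medical_data /user/dhcc/wordcount_out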
}

// GbkOutputFormat.java
package cheryl.dhcc.mapreduce;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
// The input files are GBK-encoded, so the output format is changed to write GBK as well
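// The class below is essentially Hadoop's TextOutputFormat with the hard-coded UTF-8 encoding replaced by GBK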
@InterfaceAudience.Public
@InterfaceStability.Stable
public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
private static final String gbk = "GBK";
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(gbk);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + gbk + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(gbk);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + gbk + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
/**
* Write the object to the byte stream as GBK-encoded text.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
// Unlike TextOutputFormat there is no raw-byte fast path for Text here:
// a Text holds UTF-8 bytes, so every value is transcoded via toString() and getBytes(gbk)
out.write(o.toString().getBytes(gbk));
}
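// write() emits "key<separator>value" followed by a newline; null or NullWritable keys/values are skipped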
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get(SEPERATOR, "\t");
CompressionCodec codec = null;
String extension = "";
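// When compression is enabled, resolve the configured codec (GzipCodec by default) and use its file extension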
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}