FileSystem
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.net.URI;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

public class HdfsUtil {

    FileSystem fs = null;

    @Before
    public void conf() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://wuke01:9000/");
        // Connect to the NameNode as user "hadoop"
        fs = FileSystem.get(new URI("hdfs://wuke01:9000/"), conf, "hadoop");
    }

    /**
     * Download a file using raw streams.
     */
    @Test
    public void download() throws Exception {
        FSDataInputStream in = fs.open(new Path("hdfs://wuke01:9000/jdk-7u65-linux-i586.tar.gz"));
        FileOutputStream out = new FileOutputStream("D:/jdk.tar.gz");
        IOUtils.copy(in, out);
        in.close();
        out.close();
    }

    /**
     * Download a file using the built-in helper.
     */
    @Test
    public void download2() throws Exception {
        fs.copyToLocalFile(new Path("hdfs://wuke01:9000/jdk-7u65-linux-i586.tar.gz"),
                new Path("D:/jdk2.txt"));
    }

    /**
     * Upload a file using raw streams.
     */
    @Test
    public void upload() throws Exception {
        // Prepare a small local file to upload
        File file = new File("D:/aa.txt");
        FileOutputStream o = new FileOutputStream(file);
        o.write("wuke\001dukun\001pang\001ding".getBytes("utf-8"));
        o.close();

        FileInputStream in = new FileInputStream("D:/aa.txt");
        FSDataOutputStream out = fs.create(new Path("hdfs://wuke01:9000/spark/up.txt"));
        IOUtils.copy(in, out);
        in.close();
        // Closing the HDFS stream flushes the data; without it the upload may be incomplete
        out.close();
    }

    /**
     * Upload a file using the simpler helper.
     */
    @Test
    public void upload2() throws Exception {
        fs.copyFromLocalFile(new Path("C:/IFRToolLog.txt"), new Path("hdfs://wuke01:9000/aa/up2.txt"));
    }
}
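The same FileSystem handle also covers directory listing and deletion. The sketch below is not part of the original notes; it assumes the /spark directory created by upload() above and needs one extra import, org.apache.hadoop.fs.FileStatus. It would slot into HdfsUtil as another test method:

    /**
     * Minimal sketch of two other common operations on the same fs handle.
     */
    @Test
    public void listAndDelete() throws Exception {
        // List the entries directly under /spark
        for (FileStatus status : fs.listStatus(new Path("/spark"))) {
            System.out.println((status.isDirectory() ? "dir  " : "file ") + status.getPath());
        }
        // Delete a path; the boolean enables recursive deletion for directories
        fs.delete(new Path("/spark/up.txt"), true);
    }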
MapReduce
Mapper:
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
// By default the framework hands the mapper the byte offset of each line as the key
// and the content of that line as the value.
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // The MapReduce framework calls map() once for every line of input it reads
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Split the line into words
        String[] words = StringUtils.split(line, " ");
        for (String word : words) {
            // Emit <word, 1> for each occurrence
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
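One common refinement, not in the original code, is to reuse the output objects instead of allocating a new Text and LongWritable for every word; context.write serializes the pair immediately, so reuse is safe. A minimal sketch of the same map() with that change (field names are illustrative):

    private final Text outKey = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : StringUtils.split(value.toString(), " ")) {
            outKey.set(word);
            // The pair is serialized inside write(), so the objects can be reused next iteration
            context.write(outKey, one);
        }
    }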
Reducer:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // After the map phase, the framework groups the buffered key-value pairs by key
    // and calls reduce() once per group, e.g. <hello, {1,1,1,1}>
    @Override
    protected void reduce(Text text, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        // Walk the list of values and sum them up
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit the total count for this word
        context.write(text, new LongWritable(count));
    }
}
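Because this reduce logic is just an associative sum, the same class can also serve as a combiner to pre-aggregate counts on the map side and shrink shuffle traffic. This is an optional extra, not part of the Runner below; it would sit next to the other wcJob.set* calls:

        // Optional: run WCReducer as a map-side combiner
        wcJob.setCombinerClass(WCReducer.class);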
Runner:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Describes one particular job: which class serves as the mapper,
 * which as the reducer, and the input/output paths.
 *
 * @author ke
 */
public class WCRunner {

    public static void main(String[] args) throws Throwable {
        Configuration conf = new Configuration();
        Job wcJob = Job.getInstance(conf);

        // Locate the jar that contains the mapper and reducer classes
        wcJob.setJarByClass(WCRunner.class);

        // Set the mapper and reducer classes
        wcJob.setMapperClass(WCMapper.class);
        wcJob.setReducerClass(WCReducer.class);

        // Output key/value types of the reducer.
        // (These also serve as the defaults for the mapper; if the mapper's output
        // types match the reducer's, these two lines alone are enough.)
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(LongWritable.class);

        // Output key/value types of the mapper
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(LongWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(wcJob, new Path("/wc/input/"));
        FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output2/"));

        // Submit the job to the cluster; true means print progress while it runs
        wcJob.waitForCompletion(true);
    }
}
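To run this, package the three classes into a jar and submit it with hadoop jar, e.g. hadoop jar wc.jar WCRunner (jar name illustrative). Note that the output directory must not already exist, otherwise FileOutputFormat rejects the job at submission time. A small variant of the tail of main(), not in the original notes, takes the paths from the command line and propagates the job result as the process exit code:

        // Illustrative variant: paths from args, job result as exit code
        FileInputFormat.setInputPaths(wcJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(wcJob, new Path(args[1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);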