hadoop 下多个小文件的合并上传

    xiaoxiao2025-09-04  442

    package org.hadoop.hdfs.utils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.io.IOUtils; /** * * 这是注释 *<p>title:PutMargeUtils.java</p> *<p>Description:向hdfs上传文件的时候对小文件合并在上传 </p> *<p>Company: 幻方</p> *@author: wuyihuai 邮箱1049153718@qq.com *@date 下午12:02:11 */ public class PutMargeUtils { /** * * <p>Title:put </p> * <p>Description: 本地指定目录下所有文件进行上传 合并上传</p> * <p>Company: 幻方</p> * @param localDir 本地指定目录 * @param hdfsFileDir hdfs 目录 * @param Configuration hdfs 的Configuration * @author: wuyihuai 邮箱1049153718@qq.com * @throws IOException * @date: 下午12:04:40 */ public static void putList(String localDir,String hdfsFileDir ,Configuration conf) throws IOException{ //获取本地的文件系统 FileSystem localfs=FileSystem.getLocal(conf); //获取hdfs文件系统 FileSystem hdfs=FileSystem.get(conf); //格式化本地路径 Path localPath=new Path(localDir); //格式化hdfs 路径 Path hdfsPaht=new Path(hdfsFileDir); //获取当前路径下的所有文件的FileStatus FileStatus[] fileStatus=localfs.listStatus(localPath); //获取hdfs的输出流 FSDataOutputStream fsDataOutputStream=hdfs.create(hdfsPaht); for(FileStatus fileStatu:fileStatus){ Path path=fileStatu.getPath(); System.err.println("文件为"+path.getName()); //获取文件的输入流 FSDataInputStream fsDataInputStream=localfs.open(path); byte[] buffer=new byte[2049]; int len=0; while((len=fsDataInputStream.read(buffer))>0){ fsDataOutputStream.write(buffer,0,len); } fsDataInputStream.close(); } fsDataOutputStream.close(); } /** * * <p>Title:putDirectory </p> * <p>Description: 对指定的多个文件进行合并上传 </p> * <p>Company: 幻方</p> * @param localDir 文件路径数组 * @param hdfsFileDir Hdfs路径 * @param conf hdfs 的Configuration * @throws IOException void * @author: wuyihuai 邮箱1049153718@qq.com * @date: 下午12:38:43 */ public static void putDirectory(String[] localDirs,String hdfsFileDir ,Configuration conf) throws IOException{ //获取本地的文件系统 FileSystem localfs=FileSystem.getLocal(conf); //获取hdfs文件系统 FileSystem hdfs=FileSystem.get(conf); //格式化hdfs 路径 Path hdfsPaht=new Path(hdfsFileDir); //获取hdfs的输出流 FSDataOutputStream fsDataOutputStream=hdfs.create(hdfsPaht); for(String localDir:localDirs){ Path path=new Path(localDir); //获取文件的输入流 FSDataInputStream fsDataInputStream=localfs.open(path); byte[] buffer=new byte[1024]; int len=0; while((len=fsDataInputStream.read(buffer))>0){ //流的写入 fsDataOutputStream.write(buffer,0,len); } //关闭当前文件的输入流 fsDataInputStream.close(); } //关闭输出流 IOUtils.closeStream(fsDataOutputStream); } }
    转载请注明原文地址: https://ju.6miu.com/read-1302299.html
    最新回复(0)