MapReduce 中读取文件并缓存

    xiaoxiao2021-04-17  38

    //输入维表(type=0时直接输入该文件(文件地址是全路径加文件名), type=1时输入该文件夹下的文件,地址只到文件夹,文件夹下面包含多个文件) public static void InputDimensionFile(String path_file, String type, Configuration conf, FileSystem fs) throws Exception {//读入hdfs上的维表数据 if(type.equals("0")){ DistributedCache.addCacheFile(new Path(path_file).toUri(), conf); System.out.println("CORRECT!\t"+path_file ); } else if(type.equals("1")){ FileStatus[] fileStatuses = fs.listStatus(new Path(path_file)); if(fileStatuses.length ==1 ){ DistributedCache.addCacheFile(fileStatuses[0].getPath().toUri(), conf); System.out.println("CORRECT!\t"+path_file ); } else { System.out.println("Error!\t"+path_file ); return; } } }

    /** Maps column 0 of the media table (media code) to "column 2;column 4" (media type code;media channel code). */
    public static Map<String, String> cde_media = new HashMap<String, String>();

    /** Maps column 0 of the media table (media code) to column 15 (media area). */
    public static Map<String, String> cde_media_area = new HashMap<String, String>();

    /** Phone-title blacklist words, one per line of the blacklist file. */
    public static Set<String> phone_del_str = new HashSet<String>();

    /**
     * Loads one dimension table from the local distributed-cache files into the static
     * lookup collections of this class.
     *
     * <ul>
     *   <li>{@code index == 0}: media dimension table (cde_media_theme_channel_hsen).
     *       Tab-separated rows with at least 16 columns fill {@link #cde_media} and
     *       {@link #cde_media_area}; shorter rows are skipped.</li>
     *   <li>{@code index == 1}: e-commerce blacklist words (cde_phone_title_del_hw).
     *       Single-column file; every line is added to {@link #phone_del_str}.</li>
     *   <li>any other index: the file is opened and closed without being read.</li>
     * </ul>
     *
     * <p>An {@link IOException} raised while locating or reading the file is printed and
     * otherwise ignored, so the collections may be left partially filled.
     *
     * @param context mapper context whose configuration lists the cached files
     * @param index   position of the wanted file in the local cache file array, starting at 0
     * @throws IOException           declared for callers; I/O failures are handled internally
     * @throws InterruptedException  declared for callers; not thrown by this implementation
     * @throws IllegalStateException if there are no cached files or {@code index} is outside
     *                               the cached file array
     */
    public void LoadInfo(Mapper.Context context, int index) throws IOException, InterruptedException {
        BufferedReader reader = null;
        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            // Fail with a clear message instead of the NullPointerException /
            // ArrayIndexOutOfBoundsException the unchecked access used to produce.
            if (cacheFiles == null || index < 0 || index >= cacheFiles.length) {
                throw new IllegalStateException(
                        "No distributed-cache file at index " + index
                                + " (cached files: " + (cacheFiles == null ? 0 : cacheFiles.length) + ")");
            }
            reader = new BufferedReader(
                    new InputStreamReader(
                            new FileInputStream(cacheFiles[index].toString()), "UTF-8"));
            String line;
            if (index == 0) {
                while ((line = reader.readLine()) != null) {
                    String[] arr_media = line.split("\t");
                    if (arr_media.length >= 16) {
                        String media_cd = arr_media[0];
                        String media_type_cd = arr_media[2];
                        String media_channel_cd = arr_media[4];
                        String media_area = arr_media[15];
                        String media_channel = media_type_cd + ";" + media_channel_cd;
                        // Keyed by media code so later lookups behave like a join "on" that code.
                        cde_media.put(media_cd, media_channel);
                        cde_media_area.put(media_cd, media_area);
                    }
                }
            } else if (index == 1) {
                // Single-column file of phone filter words (e.g. phone film, phone case).
                while ((line = reader.readLine()) != null) {
                    phone_del_str.add(line);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    /** * 在setup中读入维表信息 * 此过程只执行一次。用于读取配置、维表以及设置之类的 */ public void setup(Mapper.Context context) throws IOException, InterruptedException { LoadInfo(context,0);//读入媒体类型维表 LoadInfo(context,1);//读入标题关键词黑名单 }

    在Process中的job.set之前,做如下设置:
    /**
     * Register the dimension-table locations; LoadInfo() later reads the data found there.
     * The registration order below matches the order of the index values used by LoadInfo();
     * index values start at 0.
     */
    String[] dimensionTableKeys = {"cde_media_theme_channel_hsen", "cde_phone_title_del_hw"};
    for (String dimensionTableKey : dimensionTableKeys) {
        InputDimensionFile(properties.getProperty(dimensionTableKey), "1", conf, fs);
    }

    转载请注明原文地址: https://ju.6miu.com/read-674037.html

    最新回复(0)