用Python写一个 Hadoop MapReduce 程序

    xiaoxiao2021-12-12  16


    尽管Hadoop的框架是用Java写的,但是基于Hadoop运行的程序并不一定要用Java来写,我们可以选择一些其他的编程语言比如Python或者C++。不过,Hadoop的文档以及Hadoop网站上给出的典型Python例子可能让人觉得必须先将Python的代码用Jython转成一个Java文件。显然,如果你需要使用一些Jython所不能提供的Python特性的话这会很不方便。使用Jython的另外一个问题是使用这种方式与Hadoop交互会带来额外的开销。看一下$HADOOP_HOME/src/examples/python/WordCount.py 这个例子你就会明白了。

    到此,本教程的目的也就很明确了:用一种更Pythonic的方式来写Hadoop 上的MapReduce(一种你更为熟悉的方式)







            Running Hadoop On Ubuntu Linux (Multi-Node Cluster)

     Running Hadoop On Ubuntu Linux (Single-Node Cluster)

    Python MapReduce 源码解析


    以下Python代码所使用的技巧是基于Hadoop 数据流的接口实现的。借助该接口,我们可以通过标准输入(stdin)和标准输出(stdout)来传递MapReduce过程之间的数据。这里我们简单地使用Pythonsys.stdin读取数据并通过sys.stdout输出数据,剩下的事情就全交给Hadoop~



    将以下代码写入 /home/hduser/mapper.py 它会从stdin读取数据,分割其中的单词然后按行输出单词和其词频到stdout 。不过整个Map处理过程并不会统计每个单词出现总的次数,而是直接输出(<word>, 1)元组。尽管某些单词会出现不止一次,但只要单词出现一次我们就输出一个(<word>, 1)元组。在接下来的Reduce过程我们会统计单词出现的总的次数。当然,你也可以不这么做,但是出于“教程”这一目的,接下来的内容会按照这一思路来写。  :-)

    注意:确保该文件是可执行的(chmod +x /home/hduser/mapper.py),否则会出问题。另外第一行记得添加 #!/usr/bin/env python


    mapper.py 01  #!/usr/bin/env python 02  03  import  sys 04  05  # input comes from STDIN (standard input) 06  for  line  in  sys . stdin : 07     # remove leading and trailing whitespace 08     line  =  line . strip() 09     # split the line into words 10     words  =  line . split() 11     # increase counters 12     for  word  in  words : 13         # write the results to STDOUT (standard output); 14         # what we output here will be the input for the 15         # Reduce step, i.e. the input for reducer.py 16         # 17         # tab-delimited; the trivial word count is 1 18         print  ' %s \t %s '  % ( word ,  1)



    将以下代码保存到/home/hduser/reducer.py 它会从stdin读取mapper.py的结果(因此mapper.py的输出格式应该与reducer.py的输入格式一致),然后统计每个单词出现的总的次数并输出到stdout

    注意:确保该文件是可执行的(chmod +x /home/hduser/mapper.py),否则会出问题。另外第一行记得添加 #!/usr/bin/env python

    reducer.py 01  #!/usr/bin/env python 02  03  from  operator  import  itemgetter 04  import  sys 05  06  current_word  =  None 07  current_count  =  0 08  word  =  None 09  10  # input comes from STDIN 11  for  line  in  sys . stdin : 12     # remove leading and trailing whitespace 13     line  =  line . strip() 14  15     # parse the input we got from mapper.py 16     word ,  count  =  line . split( ' \t ' ,  1) 17  18     # convert count (currently a string) to int 19     try : 20         count  =  int( count) 21     except  ValueError : 22         # count was not a number, so silently 23         # ignore/discard this line 24         continue 25  26     # this IF-switch only works because Hadoop sorts map output 27     # by key (here: word) before it is passed to the reducer 28     if  current_word  ==  word : 29         current_count  +=  count 30     else : 31         if  current_word : 32             # write result to STDOUT 33             print  ' %s \t %s '  % ( current_word ,  current_count) 34         current_count  =  count 35         current_word  =  word 36  37  # do not forget to output the last word if needed! 38  if  current_word  ==  word : 39     print  ' %s \t %s '  % ( current_word ,  current_count)



    测试你的代码(cat data | map | sort | reduce)


    test 01  # very basic test 02 hduser@ubuntu:~ echo  "foo foo quux labs foo bar quux" | /home/hduser/mapper.py 03 foo     1 04 foo     1 05 quux    1 06 labs    1 07 foo     1 08 bar     1 09 quux    1 10  11 hduser@ubuntu:~ echo  "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py 12 bar     1 13 foo     3 14 labs    1 15 quux    2 16  17  # using one of the ebooks as example input 18  # (see below on where to get the ebooks) 19 hduser@ubuntu:~ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py 20  The     1 21  Project 1 22  Gutenberg       1 23  EBook   1 24  of      1 25   [... ] 26  (you get the idea)


    Hadoop上运行 Python代码




    The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson The Notebooks of Leonardo Da Vinci Ulysses by James Joyce

    下载这些电子书的 txt格式,然后将这些文件保存到一个临时文件夹比如 /tmp/gutenberg

    hduser@ubuntu:~ ls -l /tmp/gutenberg/ total 3604 -rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt -rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt -rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt hduser@ubuntu:~ $





      01 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg 02 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -ls 03 Found 1 items 04 drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg 05 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -ls /user/hduser/gutenberg 06 Found 3 items 07 -rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt 08 -rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt 09 -rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt 10 hduser@ubuntu:/usr/local/hadoop $





    hduser@ubuntu:/usr/local/hadoop bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar  \ -file /home/hduser/mapper.py    -mapper /home/hduser/mapper.py  \ -file /home/hduser/reducer.py   -reducer /home/hduser/reducer.py  \ -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output


    你可以通过指定 -D参数来更改一些Hadoop 设置,比如增加reducer数量


      hduser@ubuntu:/usr/local/hadoop bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks =16 ...


    注意在命令行中可以接受mapred.reducetasks参数来指定reduce的个数,但是不能仅仅通过指定mapred.reduce.tasks来指定map tasks的个数。



        整个任务会从HDFS的路径/user/huser/gutenberg 上读取所有的文件,然后处理并输出到HDFS的路径/user/huser/gutnberg-output事实上Hadoop会给每个reducer创建一个输出文件,在我们的例子中只会输出一个文件因为输入的文件很小。上面命令行的输出示例如下:

    01 hduser@ubuntu:/usr/local/hadoop bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output 02  additionalConfSpec_:null 03   null =@@@userJobConfProps_.get(stream.shipped.hadoopstreaming 04  packageJobJar:  [/app/hadoop/tmp/hadoop-unjar54543/ ] 05   [] /tmp/streamjob54544.jar  tmpDir =null 06   [... ] INFO mapred.FileInputFormat: Total input paths to process : 7 07   [... ] INFO streaming.StreamJob: getLocalDirs():  [/app/hadoop/tmp/mapred/local ] 08   [... ] INFO streaming.StreamJob: Running job: job_200803031615_0021 09   [... ] 10   [... ] INFO streaming.StreamJob:  map 0%  reduce 0% 11   [... ] INFO streaming.StreamJob:  map 43%  reduce 0% 12   [... ] INFO streaming.StreamJob:  map 86%  reduce 0% 13   [... ] INFO streaming.StreamJob:  map 100%  reduce 0% 14   [... ] INFO streaming.StreamJob:  map 100%  reduce 33% 15   [... ] INFO streaming.StreamJob:  map 100%  reduce 70% 16   [... ] INFO streaming.StreamJob:  map 100%  reduce 77% 17   [... ] INFO streaming.StreamJob:  map 100%  reduce 100% 18   [... ] INFO streaming.StreamJob: Job  complete: job_200803031615_0021 19   [... ] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output 20 hduser@ubuntu:/usr/local/hadoop $


    从上面的输出可以看到,Hadoop还为一些统计信息提供了一个基本的网页接口。在Hadoop集群运行时可以打开 http://localhost:50030





    你可以通过 dfs -cat命令查看输出文件的内容

      01 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000 02  "(Lo)cra"       1 03  "1490   1 04  "1498, " 1 05  "35 "    1 06  "40, "   1 07  "A      2 08  "AS-IS".        2 09  "A_     1 10  "Absoluti       1 11  [... ] 12 hduser@ubuntu:/usr/local/hadoop $






    注意,下面的MapReduce脚本只有在Hadoop的环境下才能正常运行,也就是说使用本地的命令“cat Data | ./mapper.py |sort -k1,1 |./reucer.py”并不会正常运行,因为有些函数特性在不能离开Hadoop


    mapper.py 01  #!/usr/bin/env python 02  """A more advanced Mapper, using Python iterators and generators.""" 03  04  import  sys 05  06  def  read_input( file ): 07      for  line  in  file : 08          # split the line into words 09          yield  line . split() 10  11  def  main( separator = ' \t ' ): 12      # input comes from STDIN (standard input) 13      data  =  read_input( sys . stdin) 14      for  words  in  data : 15          # write the results to STDOUT (standard output); 16          # what we output here will be the input for the 17          # Reduce step, i.e. the input for reducer.py 18          # 19          # tab-delimited; the trivial word count is 1 20          for  word  in  words : 21              print  ' %s%s%d '  % ( word ,  separator ,  1) 22  23  if  __name__  ==  "__main__" : 24      main()



    reducer.py 01  #!/usr/bin/env python 02  """A more advanced Reducer, using Python iterators and generators.""" 03  04  from  itertools  import  groupby 05  from  operator  import  itemgetter 06  import  sys 07  08  def  read_mapper_output( file ,  separator = ' \t ' ): 09      for  line  in  file : 10          yield  line . rstrip() . split( separator ,  1) 11  12  def  main( separator = ' \t ' ): 13      # input comes from STDIN (standard input) 14      data  =  read_mapper_output( sys . stdin ,  separator = separator) 15      # groupby groups multiple word-count pairs by word, 16      # and creates an iterator that returns consecutive keys and their group: 17      #   current_word - string containing a word (the key) 18      #   group - iterator yielding all ["<current_word>", "<count>"] items 19      for  current_word ,  group  in  groupby( data ,  itemgetter( 0 )): 20          try : 21              total_count  =  sum( int( countfor  current_word ,  count  in  group) 22              print  " %s%s%d "  % ( current_word ,  separator ,  total_count) 23          except  ValueError : 24              # count was not a number, so silently discard this item 25              pass 26  27  if  __name__  ==  "__main__" : 28      main()
    转载请注明原文地址: https://ju.6miu.com/read-900140.html
