用Python写一个 Hadoop MapReduce 程序

    xiaoxiao2021-12-12  6

    写作缘由

    尽管Hadoop的框架是用Java写的,但是基于Hadoop运行的程序并不一定要用Java来写,我们可以选择一些其他的编程语言比如Python或者C++。不过,Hadoop的文档以及Hadoop网站上给出的典型Python例子可能让人觉得必须先将Python的代码用Jython转成一个Java文件。显然,如果你需要使用一些Jython所不能提供的Python特性的话这会很不方便。使用Jython的另外一个问题是使用这种方式与Hadoop交互会带来额外的开销。看一下$HADOOP_HOME/src/examples/python/WordCount.py 这个例子你就会明白了。

    到此,本教程的目的也就很明确了:用一种更Pythonic的方式来写Hadoop 上的MapReduce(一种你更为熟悉的方式)

     

    写作目的

    本文的目的是用Python写一个简单的运行在Hadoop上的MapReduce程序,该程序不需要使用JythonPython源码转成Java文件。这个程序将模仿WordCount(读取文本文件并统计单词的词频),输出的文本文件每行包括一个单词和该单词的词频,并用tab分隔。

     

    前提

    因为需要实际动手操作,你至少有一个Hadoop集群能正常运行,如果你没有Hadoop集群的话,下面的教程可以帮你建立一个。下面的教程是基于Ubuntu系统建立的,但是你可以将他们应用到其它LinuxUnix系统上。

            Running Hadoop On Ubuntu Linux (Multi-Node Cluster)

     Running Hadoop On Ubuntu Linux (Single-Node Cluster)

    Python MapReduce 源码解析

     

    以下Python代码所使用的技巧是基于Hadoop 数据流的接口实现的。借助该接口,我们可以通过标准输入(stdin)和标准输出(stdout)来传递MapReduce过程之间的数据。这里我们简单地使用Pythonsys.stdin读取数据并通过sys.stdout输出数据,剩下的事情就全交给Hadoop~

     

    Map 

    将以下代码写入 /home/hduser/mapper.py 它会从stdin读取数据,分割其中的单词然后按行输出单词和其词频到stdout 。不过整个Map处理过程并不会统计每个单词出现总的次数,而是直接输出(<word>, 1)元组。尽管某些单词会出现不止一次,但只要单词出现一次我们就输出一个(<word>, 1)元组。在接下来的Reduce过程我们会统计单词出现的总的次数。当然,你也可以不这么做,但是出于“教程”这一目的,接下来的内容会按照这一思路来写。  :-)

    注意:确保该文件是可执行的(chmod +x /home/hduser/mapper.py),否则会出问题。另外第一行记得添加 #!/usr/bin/env python

      

    mapper.py 01  #!/usr/bin/env python 02  03  import  sys 04  05  # input comes from STDIN (standard input) 06  for  line  in  sys . stdin : 07     # remove leading and trailing whitespace 08     line  =  line . strip() 09     # split the line into words 10     words  =  line . split() 11     # increase counters 12     for  word  in  words : 13         # write the results to STDOUT (standard output); 14         # what we output here will be the input for the 15         # Reduce step, i.e. the input for reducer.py 16         # 17         # tab-delimited; the trivial word count is 1 18         print  ' %s \t %s '  % ( word ,  1)

     

    Reduce

    将以下代码保存到/home/hduser/reducer.py 它会从stdin读取mapper.py的结果(因此mapper.py的输出格式应该与reducer.py的输入格式一致),然后统计每个单词出现的总的次数并输出到stdout

    注意:确保该文件是可执行的(chmod +x /home/hduser/mapper.py),否则会出问题。另外第一行记得添加 #!/usr/bin/env python

    reducer.py 01  #!/usr/bin/env python 02  03  from  operator  import  itemgetter 04  import  sys 05  06  current_word  =  None 07  current_count  =  0 08  word  =  None 09  10  # input comes from STDIN 11  for  line  in  sys . stdin : 12     # remove leading and trailing whitespace 13     line  =  line . strip() 14  15     # parse the input we got from mapper.py 16     word ,  count  =  line . split( ' \t ' ,  1) 17  18     # convert count (currently a string) to int 19     try : 20         count  =  int( count) 21     except  ValueError : 22         # count was not a number, so silently 23         # ignore/discard this line 24         continue 25  26     # this IF-switch only works because Hadoop sorts map output 27     # by key (here: word) before it is passed to the reducer 28     if  current_word  ==  word : 29         current_count  +=  count 30     else : 31         if  current_word : 32             # write result to STDOUT 33             print  ' %s \t %s '  % ( current_word ,  current_count) 34         current_count  =  count 35         current_word  =  word 36  37  # do not forget to output the last word if needed! 38  if  current_word  ==  word : 39     print  ' %s \t %s '  % ( current_word ,  current_count)

     

     

    测试你的代码(cat data | map | sort | reduce)

    建议在Hadoop上实际运行mapreduce之前先在本地测试mapper.pyreducer.py否侧可能会出现程序能正常执行但却完全没有输出结果或者输出不是你想要的结果。如果这发生的话,多半是你自己搞砸了......

    test 01  # very basic test 02 hduser@ubuntu:~ echo  "foo foo quux labs foo bar quux" | /home/hduser/mapper.py 03 foo     1 04 foo     1 05 quux    1 06 labs    1 07 foo     1 08 bar     1 09 quux    1 10  11 hduser@ubuntu:~ echo  "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py 12 bar     1 13 foo     3 14 labs    1 15 quux    2 16  17  # using one of the ebooks as example input 18  # (see below on where to get the ebooks) 19 hduser@ubuntu:~ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py 20  The     1 21  Project 1 22  Gutenberg       1 23  EBook   1 24  of      1 25   [... ] 26  (you get the idea)

     

    Hadoop上运行 Python代码

     

    下载测试用的输入数据

    我们使用古腾堡项目中的三本电子书作为测试:

    The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson The Notebooks of Leonardo Da Vinci Ulysses by James Joyce

    下载这些电子书的 txt格式,然后将这些文件保存到一个临时文件夹比如 /tmp/gutenberg

    hduser@ubuntu:~ ls -l /tmp/gutenberg/ total 3604 -rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt -rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt -rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt hduser@ubuntu:~ $

     

    将本地的数据拷贝到HDFS

    在运行MapReduce任务之前,我们必须先将本地文件拷贝到Hadoop的文件系统上

     

      01 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg 02 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -ls 03 Found 1 items 04 drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg 05 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -ls /user/hduser/gutenberg 06 Found 3 items 07 -rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt 08 -rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt 09 -rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt 10 hduser@ubuntu:/usr/local/hadoop $

     

    运行MapReduce任务

    现在一切都准备好了,我们可以通过Hadoop的数据流API来传送MapReduce过程中间的数据。

     

    hduser@ubuntu:/usr/local/hadoop bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar  \ -file /home/hduser/mapper.py    -mapper /home/hduser/mapper.py  \ -file /home/hduser/reducer.py   -reducer /home/hduser/reducer.py  \ -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

     

    你可以通过指定 -D参数来更改一些Hadoop 设置,比如增加reducer数量

     

      hduser@ubuntu:/usr/local/hadoop bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks =16 ...

     

    注意在命令行中可以接受mapred.reducetasks参数来指定reduce的个数,但是不能仅仅通过指定mapred.reduce.tasks来指定map tasks的个数。

     

       

        整个任务会从HDFS的路径/user/huser/gutenberg 上读取所有的文件,然后处理并输出到HDFS的路径/user/huser/gutnberg-output事实上Hadoop会给每个reducer创建一个输出文件,在我们的例子中只会输出一个文件因为输入的文件很小。上面命令行的输出示例如下:

    01 hduser@ubuntu:/usr/local/hadoop bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output 02  additionalConfSpec_:null 03   null =@@@userJobConfProps_.get(stream.shipped.hadoopstreaming 04  packageJobJar:  [/app/hadoop/tmp/hadoop-unjar54543/ ] 05   [] /tmp/streamjob54544.jar  tmpDir =null 06   [... ] INFO mapred.FileInputFormat: Total input paths to process : 7 07   [... ] INFO streaming.StreamJob: getLocalDirs():  [/app/hadoop/tmp/mapred/local ] 08   [... ] INFO streaming.StreamJob: Running job: job_200803031615_0021 09   [... ] 10   [... ] INFO streaming.StreamJob:  map 0%  reduce 0% 11   [... ] INFO streaming.StreamJob:  map 43%  reduce 0% 12   [... ] INFO streaming.StreamJob:  map 86%  reduce 0% 13   [... ] INFO streaming.StreamJob:  map 100%  reduce 0% 14   [... ] INFO streaming.StreamJob:  map 100%  reduce 33% 15   [... ] INFO streaming.StreamJob:  map 100%  reduce 70% 16   [... ] INFO streaming.StreamJob:  map 100%  reduce 77% 17   [... ] INFO streaming.StreamJob:  map 100%  reduce 100% 18   [... ] INFO streaming.StreamJob: Job  complete: job_200803031615_0021 19   [... ] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output 20 hduser@ubuntu:/usr/local/hadoop $

     

    从上面的输出可以看到,Hadoop还为一些统计信息提供了一个基本的网页接口。在Hadoop集群运行时可以打开 http://localhost:50030

     

     

     

    查看HDFS路径/user/hduser/gutenberg-output上的文件

    你可以通过 dfs -cat命令查看输出文件的内容

      01 hduser@ubuntu:/usr/local/hadoop bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000 02  "(Lo)cra"       1 03  "1490   1 04  "1498, " 1 05  "35 "    1 06  "40, "   1 07  "A      2 08  "AS-IS".        2 09  "A_     1 10  "Absoluti       1 11  [... ] 12 hduser@ubuntu:/usr/local/hadoop $

    注意上面截图中单词两边的双引号并不是Hadoop自己加上去的,而是python程序将单词分割后生成的,不信的话可以查看完整的输出文件。

     

    使用python的迭代器和生成器改进mapperreducer代码

    上面的例子应该让你明白了怎样构建一个MapReduce应用,不过上面那些代码侧重于易读性,特别是针对Python程序员新手。然而,在真实的应用中,你可能需要使用Python的迭代器和生成器来优化你的代码。

    一般来说,迭代器和生成器(产生迭代的函数,比如包含yield输出语句)的优点是只有在你需要使用一个序列的元素时它才会生成该元素。这对于计算量很大或者内存开销很大的任务来说是很有用的。

    注意,下面的MapReduce脚本只有在Hadoop的环境下才能正常运行,也就是说使用本地的命令“cat Data | ./mapper.py |sort -k1,1 |./reucer.py”并不会正常运行,因为有些函数特性在不能离开Hadoop

     

    mapper.py 01  #!/usr/bin/env python 02  """A more advanced Mapper, using Python iterators and generators.""" 03  04  import  sys 05  06  def  read_input( file ): 07      for  line  in  file : 08          # split the line into words 09          yield  line . split() 10  11  def  main( separator = ' \t ' ): 12      # input comes from STDIN (standard input) 13      data  =  read_input( sys . stdin) 14      for  words  in  data : 15          # write the results to STDOUT (standard output); 16          # what we output here will be the input for the 17          # Reduce step, i.e. the input for reducer.py 18          # 19          # tab-delimited; the trivial word count is 1 20          for  word  in  words : 21              print  ' %s%s%d '  % ( word ,  separator ,  1) 22  23  if  __name__  ==  "__main__" : 24      main()

     

     

    reducer.py 01  #!/usr/bin/env python 02  """A more advanced Reducer, using Python iterators and generators.""" 03  04  from  itertools  import  groupby 05  from  operator  import  itemgetter 06  import  sys 07  08  def  read_mapper_output( file ,  separator = ' \t ' ): 09      for  line  in  file : 10          yield  line . rstrip() . split( separator ,  1) 11  12  def  main( separator = ' \t ' ): 13      # input comes from STDIN (standard input) 14      data  =  read_mapper_output( sys . stdin ,  separator = separator) 15      # groupby groups multiple word-count pairs by word, 16      # and creates an iterator that returns consecutive keys and their group: 17      #   current_word - string containing a word (the key) 18      #   group - iterator yielding all ["<current_word>", "<count>"] items 19      for  current_word ,  group  in  groupby( data ,  itemgetter( 0 )): 20          try : 21              total_count  =  sum( int( countfor  current_word ,  count  in  group) 22              print  " %s%s%d "  % ( current_word ,  separator ,  total_count) 23          except  ValueError : 24              # count was not a number, so silently discard this item 25              pass 26  27  if  __name__  ==  "__main__" : 28      main()
    转载请注明原文地址: https://ju.6miu.com/read-900140.html

    最新回复(0)