Deploying MongoDB with Hadoop

Download mongo-hadoop-core-2.0.1.jar:
http://central.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-core/2.0.1/mongo-hadoop-core-2.0.1.jar

Download mongo-java-driver-3.3.0.jar:
http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.3.0/mongo-java-driver-3.3.0.jar

Distribute mongo-hadoop-core-2.0.1.jar and mongo-java-driver-3.3.0.jar to the Hadoop cluster nodes. There are three options:
Option 1: copy the two jars into $HADOOP_PREFIX/share/hadoop/common on every node.
Option 2: ship them to the cluster nodes with the DistributedCache.
Option 3: pass them through the -libjars generic option of the hadoop jar command (the job driver must go through ToolRunner so that the option is parsed):

$ hadoop jar MyJob.jar com.mycompany.HadoopJob -libjars mongo-hadoop-core.jar,mongo-java-driver.jar

Writing the MapReduce program

The MongoDB Hadoop Connector provides input/output formats that read from and write to a MongoDB cluster or BSON files.
Package for MapReduce 1.x: com.mongodb.hadoop.mapred
Package for MapReduce 2.x: com.mongodb.hadoop

Enron E-mails example

Download the Enron data set: http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/

Extract the downloaded archive enron_mongo.tar.bz2:
$ tar jxf enron_mongo.tar.bz2

Restore the data into MongoDB:
$ bin/mongorestore -d testdb --drop /opt/datas/mongodb/backup/dump/enron_mail/ -h mongodb01 --port 27051

Create an index on the restored messages collection:
mongos> db.messages.ensureIndex({filename: 1});

Enable sharding on the messages collection:
mongos> sh.shardCollection("testdb.messages", {filename: 1});

For how to build the MongoDB cluster, see my post "MongoDB实现分片复制集" (MongoDB sharded replica set): http://blog.csdn.net/liuguangrong/article/details/52383953

Requirement analysis

Each e-mail is one document in the messages collection. Its headers field holds the sender (From) and the recipients (To); the job counts how many times each (sender, recipient) pair occurs across the whole collection.
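Before writing the MapReduce job, here is a minimal in-memory sketch of the computation itself (plain Java, with a few made-up pairs instead of real data) just to make the target result concrete:

// Count (sender, recipient) pairs in memory; the MapReduce job below does
// the same thing, only distributed over the whole messages collection.
import java.util.HashMap;
import java.util.Map;

public class PairCountSketch {
    public static void main(String[] args) {
        String[][] mails = {
            {"phillip.allen@enron.com", "tim.belden@enron.com"},
            {"phillip.allen@enron.com", "tim.belden@enron.com"},
            {"phillip.allen@enron.com", "john.arnold@enron.com"},
        };
        Map<String, Integer> counts = new HashMap<>();
        for (String[] mail : mails) {
            counts.merge(mail[0] + " -> " + mail[1], 1, Integer::sum);
        }
        counts.forEach((pair, n) -> System.out.println(pair + " : " + n));
    }
}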
{ "_id" : ObjectId("4f2ad4c4d1e2d3f15a000000"), "body" : "Here is our forecast\n\n ", "subFolder" : "allen-p/_sent_mail", "mailbox" : "maildir", "filename" : "1.", "headers" : { "X-cc" : "", "From" : "phillip.allen@enron.com", "Subject" : "", "X-Folder" : "\\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail", "Content-Transfer-Encoding" : "7bit", "X-bcc" : "", "To" : "tim.belden@enron.com", "X-Origin" : "Allen-P", "X-FileName" : "pallen (Non-Privileged).pst", "X-From" : "Phillip K Allen", "Date" : "Mon, 14 May 2001 16:39:00 -0700 (PDT)", "X-To" : "Tim Belden ", "Message-ID" : "<18782981.1075855378110.JavaMail.evans@thyme>", "Content-Type" : "text/plain; charset=us-ascii", "Mime-Version" : "1.0" } }编程实现 MailPair.java
Implementation

MailPair.java

package mongo.enron.emails;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key (from, to) emitted by the mapper. It must be a
 * WritableComparable so Hadoop can serialize it and sort/group by it.
 */
public class MailPair implements WritableComparable {

    private String from;
    private String to;

    public MailPair() {
    }

    public MailPair(final String from, final String to) {
        this.set(from, to);
    }

    public void set(final String from, final String to) {
        this.setFrom(from);
        this.setTo(to);
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    // Deserialize the fields in the same order they were written.
    public void readFields(final DataInput in) throws IOException {
        this.setFrom(in.readUTF());
        this.setTo(in.readUTF());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.from);
        out.writeUTF(this.to);
    }

    @Override
    public int compareTo(Object o) {
        if (!(o instanceof MailPair)) {
            return -1;
        }
        MailPair mp = (MailPair) o;
        int first = from.compareTo(mp.from);
        if (first != 0) {
            return first;
        }
        int second = to.compareTo(mp.to);
        if (second != 0) {
            return second;
        }
        return 0;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((from == null) ? 0 : from.hashCode());
        result = prime * result + ((to == null) ? 0 : to.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MailPair other = (MailPair) obj;
        if (from == null) {
            if (other.from != null)
                return false;
        } else if (!from.equals(other.from))
            return false;
        if (to == null) {
            if (other.to != null)
                return false;
        } else if (!to.equals(other.to))
            return false;
        return true;
    }
}
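A quick, standalone way to sanity-check the Writable contract (not part of the job; the class name is mine, and it only uses Hadoop's in-memory buffers) is to round-trip a pair through write()/readFields():

import mongo.enron.emails.MailPair;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Round-trip a MailPair through Writable serialization and confirm the copy
// compares equal to the original.
public class MailPairRoundTrip {
    public static void main(String[] args) throws Exception {
        MailPair original = new MailPair("phillip.allen@enron.com", "tim.belden@enron.com");

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                        // serialize: from, then to

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        MailPair copy = new MailPair();
        copy.readFields(in);                        // must read back in the same order

        System.out.println(copy.getFrom() + " -> " + copy.getTo());
        System.out.println("equal: " + original.equals(copy));
    }
}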
EnronMailMapper.java

package mongo.enron.emails;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

/**
 * Emits ((from, recipient), 1) once per recipient in the "To" header. The
 * input key is the value of headers.From because the driver calls
 * config.setInputKey("headers.From"); the value is the whole document as a
 * BSONObject.
 */
public class EnronMailMapper extends Mapper<Object, BSONObject, MailPair, IntWritable> {

    private final IntWritable outputValue;
    private final MailPair outputKey;

    public EnronMailMapper() {
        super();
        outputValue = new IntWritable(1);
        outputKey = new MailPair();
    }

    @Override
    protected void map(Object key, BSONObject value, Context context)
            throws IOException, InterruptedException {
        BSONObject headers = (BSONObject) value.get("headers");
        String to = (String) headers.get("To");
        if (null != to) {
            String[] recipients = to.split(",");
            for (final String recip : recipients) {
                String recipient = recip.trim();
                if (recipient.length() > 0) {
                    outputKey.set((String) key, recipient);
                    context.write(outputKey, outputValue);
                }
            }
        }
    }
}
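The loop above is the only subtle part: a mail with several recipients contributes one pair per recipient. A standalone illustration of the same split/trim logic, using a made-up multi-recipient header:

// Plain Java illustration of the mapper's split/trim logic (made-up data).
public class RecipientSplitDemo {
    public static void main(String[] args) {
        String from = "phillip.allen@enron.com";                     // stands in for the headers.From key
        String to = "tim.belden@enron.com, john.arnold@enron.com";   // hypothetical multi-recipient "To" header
        for (String recip : to.split(",")) {
            String recipient = recip.trim();
            if (recipient.length() > 0) {
                System.out.println(from + " -> " + recipient);       // each line becomes one map output key
            }
        }
    }
}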
EnronMailReducer.java

package mongo.enron.emails;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

/**
 * Sums the counts for each (from, to) pair and writes the pair back to
 * MongoDB as a BSON document {f: from, t: to} with the total as the value.
 */
public class EnronMailReducer extends Reducer<MailPair, IntWritable, BSONWritable, IntWritable> {

    private BSONWritable outputKey;
    private IntWritable outputValue;

    public EnronMailReducer() {
        outputKey = new BSONWritable();
        outputValue = new IntWritable();
    }

    @Override
    protected void reduce(MailPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (final IntWritable value : values) {
            sum += value.get();
        }
        BSONObject bson = BasicDBObjectBuilder.start()
                .add("f", key.getFrom())
                .add("t", key.getTo()).get();
        outputKey.setDoc(bson);
        outputValue.set(sum);
        context.write(outputKey, outputValue);
    }
}

EnronMail.java
package mongo.enron.emails;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.MongoConfig;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;
import com.mongodb.hadoop.util.MongoTool;

/**
 * Job driver: wires the mapper/reducer to the MongoDB input and output
 * formats and points them at the cluster's mongos instances.
 */
public class EnronMail extends MongoTool {

    public EnronMail() {
        JobConf conf = new JobConf(new Configuration());
        if (MongoTool.isMapRedV1()) {
            // Not used here: this example only runs on YARN (MapReduce v2).
        } else {
            MongoConfigUtil.setInputFormat(conf, MongoInputFormat.class);
            MongoConfigUtil.setOutputFormat(conf, MongoOutputFormat.class);
        }
        //FileInputFormat.addInputPath(conf, new Path("/messages"));
        MongoConfig config = new MongoConfig(conf);
        config.setInputKey("headers.From");   // map() receives the From header as its key
        config.setMapper(EnronMailMapper.class);
        config.setReducer(EnronMailReducer.class);
        config.setMapperOutputKey(MailPair.class);
        config.setMapperOutputValue(IntWritable.class);
        config.setOutputKey(MailPair.class);
        config.setOutputValue(IntWritable.class);
        config.setInputURI("mongodb://mongodb01:27051,mongodb02:27051,mongodb03:27051/testdb.messages");
        config.setOutputURI("mongodb://mongodb01:27051,mongodb02:27051,mongodb03:27051/testdb.mailpairs");
        setConf(conf);
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new EnronMail(), args));
    }
}

Package the program into a jar (steps omitted) and run the MapReduce example in the Hadoop YARN environment:
$ bin/yarn jar enronmail.jar

References

https://github.com/mongodb/mongo-hadoop/wiki/MapReduce-Usage