原文翻译自:http://gora.apache.org/current/tutorial.html#mapreduce-support
在构建Hadoop的Job对象时,我们要确定是否使用Gora作为job的输入、输出或者都使用。类似于FileInputFormat和FileOutputFormat,Gora自己定义了GoraInputFormat, 和GoraOutputFormat,它使用了DataStore作为Job的输入源和输出接收器。Gora的Input和Output类提供了静态的方法去正确的建立一个Job,如果你的mapper和reducer继承了Gora的mapper和reducer那么你就可以去使用这些静态方法而且这样会更加方便。 在这个例子中我们使用Gora作为输入和输出,从下面的createJob函数中我们可以看到,我们使用GoraMapper#initMapperJob()和GoraReducer#initReducerJob()进行输入参数的设置。
public Job createJob(DataStore<Long, Pageview> inStore, DataStore<String, MetricDatum> outStore, int numReducer) throws IOException { Job job = new Job(getConf()); job.setJobName("Log Analytics"); job.setNumReduceTasks(numReducer); job.setJarByClass(getClass()); /* Mappers are initialized with GoraMapper.initMapper() or GoraInputFormat.setInput()*/ GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true); /* Reducers are initialized with GoraReducer#initReducer(). * If the output is not to be persisted via Gora, any reducer * can be used instead. */ GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class); return job; }GoraMapper#initMapperJob()方法使用了一个dataStore以及一个可选的query去取得数据,在指定了query参数后query所得的结果即作为Job的输入。具体使用的Mapper以及map输出的键和值的类型也都要作为initMapperJob()的参数。GoraReducer#initReducerJob()以一个dataStore以及Reduce类作为参数,GoraReducer#initReducerJob()和GoraMapper#initMapperJob()方法也重写了datastore类而不是datastore实例的方法。
通常情况下,如果gora要作为job的输入,那么Mapper类就需要继承GoraMapper,但是gora并不强制去这样做,其他的方式也同样可以被使用。mapper接收Input query结果的键值对并且将这个接收到的数据传给map。在这里需要注意一点,map的输出结果与输入或输出datastore是不相关的,所以任何的Hadoop序列化键值类都可以被使用。不过Gora的持久化类同样是hadoop可序列化的,Hadoop序列化由PersistentSerialization类处理。Gora还定义了StringSerialization类,以便轻松地序列化字符串。 从下面的代码中可以看到LogAnalytics类中定义了继承GoraMapper的LogAnalyticsMapper类,map函数接受代表行数的Long类型的key,PageView对象从输入的data store中读取。map简单的使用时间戳的日期,并将该键作为<URL, day>的元组输出。
private TextLong tuple; protected void map(Long key, Pageview pageview, Context context) throws IOException, InterruptedException { Utf8 url = pageview.getUrl(); long day = getDay(pageview.getTimestamp()); tuple.getKey().set(url.toString()); tuple.getValue().set(day); context.write(tuple, one); } public class TextLong extends KeyValueWritable<Text, LongWritable> { public TextLong() { key = new Text(); value = new LongWritable(); } }与mapper一样,通常情况下Gora如果要作为job的输出,那么Reducer类需要继承GoraReducer,reduce得到的结果将被存入输出datastore作为job的结果。 以下面代码为例,LogAnalyticsReducer的内部类继承了GoraReducer,这个内部类被作为reducer,reducer只是简单的将所有传入的<URL,day>键值对中的值求和,之后metric dimension对象被构建和操作,即存入输出的datastore中。
protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0L; for (LongWritable value : values) { sum += value.get(); } String dimension = tuple.getKey().toString(); long timestamp = tuple.getValue().get(); metricDatum.setMetricDimension(new Utf8(dimension)); metricDatum.setTimestamp(timestamp); String key = metricDatum.getMetricDimension().toString(); metricDatum.setMetric(sum); context.write(key, metricDatum); }