程序包 org.apache.hadoop.mapred.lib.aggregate

Classes for performing various counting and aggregations.

请参阅: 说明

程序包org.apache.hadoop.mapred.lib.aggregate的说明

Classes for performing various counting and aggregations.

Aggregate framework

Generally speaking, in order to implement an application using Map/Reduce model, the developer needs to implement Map and Reduce functions (and possibly Combine function). However, for a lot of applications related to counting and statistics computing, these functions have very similar characteristics. This provides a package implementing those patterns. In particular, the package provides a generic mapper class, a reducer class and a combiner class, and a set of built-in value aggregators. It also provides a generic utility class, ValueAggregatorJob, that offers a static function that creates map/reduce jobs:

public static JobConf createValueAggregatorJob(String args[]) throws IOException;
To call this function, the user needs to pass in arguments specifying the input directories, the output directory, the number of reducers, the input data format (textinputformat or sequencefileinputformat), and a file specifying user plugin class(es) to load by the mapper. A user plugin class is responsible for specifying what aggregators to use and what values are for which aggregators. A plugin class must implement the following interface:
 public interface ValueAggregatorDescriptor { 
     public ArrayList<Entry> generateKeyValPairs(Object key, Object value); 
     public void configure(JobConfjob); 
} 
Function generateKeyValPairs will generate aggregation key/value pairs for the input key/value pair. Each aggregation key encodes two pieces of information: the aggregation type and aggregation ID. The value is the value to be aggregated onto the aggregation ID according to the aggregation type. Here is a simple example user plugin class for counting the words in the input texts:
public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor { 
    public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
        String words [] = val.toString().split(" |\t");
        ArrayList<Entry> retv = new ArrayList<Entry>();
        for (int i = 0; i < words.length; i++) {
            retv.add(generateEntry(LONG_VALUE_SUM, words[i], ONE))
        }
        return retv;
    }
    public void configure(JobConf job) {}
} 
In the above code, LONG_VALUE_SUM is a string denoting the aggregation type LongValueSum, which sums over long values. ONE denotes a string "1". Function generateEntry(LONG_VALUE_SUM, words[i], ONE) will inperpret the first argument as an aggregation type, the second as an aggregation ID, and the the third argumnent as the value to be aggregated. The output will look like: "LongValueSum:xxxx", where XXXX is the string value of words[i]. The value will be "1". The mapper will call generateKeyValPairs(Object key, Object val) for each input key/value pair to generate the desired aggregation id/value pairs. The down stream combiner/reducer will interpret these pairs as adding one to the aggregator XXXX.

Class ValueAggregatorBaseDescriptor is a base class that user plugin classes can extend. Here is the XML fragment specifying the user plugin class:

<property>
    <name>aggregator.descriptor.num</name>
    <value>1</value>
</property>
<property>
   <name>aggregator.descriptor.0</name>
   <value>UserDefined,org.apache.hadoop.mapred.lib.aggregate.examples.WordCountAggregatorDescriptor</value>
</property> 
Class ValueAggregatorBaseDescriptor itself provides a default implementation for generateKeyValPairs:
public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
   ArrayList<Entry> retv = new ArrayList<Entry>();     
   String countType = LONG_VALUE_SUM;
   String id = "record_count";
   retv.add(generateEntry(countType, id, ONE));
   return retv;
}
Thus, if no user plugin class is specified, the default behavior of the map/reduce job is to count the number of records (lines) in the imput files.

During runtime, the mapper will invoke the generateKeyValPairs function for each input key/value pair, and emit the generated key/value pairs:

public void map(WritableComparable key, Writable value,
            OutputCollector output, Reporter reporter) throws IOException {
   Iterator iter = this.aggregatorDescriptorList.iterator();
   while (iter.hasNext()) {
       ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
       Iterator<Entry> ens = ad.generateKeyValPairs(key, value).iterator();
       while (ens.hasNext()) {
           Entry en = ens.next();
           output.collect((WritableComparable)en.getKey(), (Writable)en.getValue());
       }
   }
}
The reducer will create an aggregator object for each key/value list pair, and perform the appropriate aggregation. At the end, it will emit the aggregator's results:
public void reduce(WritableComparable key, Iterator values,
            OutputCollector output, Reporter reporter) throws IOException {
   String keyStr = key.toString();
   int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
   String type = keyStr.substring(0,pos);
   keyStr = keyStr.substring(pos+ValueAggregatorDescriptor.TYPE_SEPARATOR.length());       
   ValueAggregator aggregator = 
       ValueAggregatorBaseDescriptor.generateValueAggregator(type);
   while (values.hasNext()) {
       aggregator.addNextValue(values.next());
   }         
   String val = aggregator.getReport();
   key = new Text(keyStr);
   output.collect(key, new Text(val)); 
}
In order to be able to use combiner, all the aggregation type be aggregators must be associative and communitive. The following are the types supported:

Create and run an application

To create an application, the user needs to do the following things:

1. Implement a user plugin:

import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor;
import org.apache.hadoop.mapred.JobConf;

public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
   public void map(WritableComparable key, Writable value,
            OutputCollector output, Reporter reporter) throws IOException {
   }
   public void configure(JobConf job) {
    
   } 
}
2. Create an xml file specifying the user plugin.

3. Compile your java class and create a jar file, say wc.jar.

Finally, run the job:

        hadoop jar wc.jar org.apache.hadoop.mapred.lib.aggregate..ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file

Copyright © 2009 The Apache Software Foundation