Friday, February 15, 2013

Hadoop Secondary Sort: Sorting values

Sorting is a core strength of the Hadoop MapReduce framework, but by default it sorts only the keys; the values for each key are not sorted. There are many use cases where the values for a key are required in sorted order. For example, suppose a web application logs user interactions: user id, time and other log data. The log data is distributed across log files from different servers. The requirement is to get each user's log records in chronological order.

The input to the map is the set of log records, say (userid, time) pairs in no particular order.

The output from the reducer needs to be sorted by user and, for each user, by time.

user1, t1
user1, t2
user1, tn
usern, t1

where, for each user, t1 < t2 < ... < tn.

We know Hadoop sorts the keys, so we could make the map output key a composite of userid and time. We then need a comparator that Hadoop can use to sort by the composite key. However, a side effect is that Hadoop may send records for the same user to different reducers, because by default partitioning is based on the whole key. This means you will not be able to reduce all of a user's records as a group.

Remember, the Hadoop framework sorts the output of a Map by key and divides it into partitions. Each partition is intended for a Reducer. All values for a key from a Map are in the same partition. We want all the records for a user to go to the same Reducer, which implies they need to be in the same partition. Fortunately, there is a way to influence partitioning. We tell Hadoop to put all records for a user in the same partition by implementing a partitioner that partitions based on the userid alone and not on the composite key.

A Reducer receives partitions from several Mappers. Remember that the reducer is called with a key and a list of values for that key. Before the framework can call the reducer, it has to group the values for that key from all the partitions. We want the grouping to happen based on userid (not userid + time), so we need to implement a grouping comparator that compares only userids. Since the records in each partition are sorted by userid and time, the grouping, which is a merge process, preserves the sort order, much like a merge sort.
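
To make the merge argument concrete, here is a minimal plain-Java sketch with no Hadoop classes involved. The names MergeDemo and Rec are hypothetical, and time is assumed to be a long. It merges two runs that are each already sorted by userid and time, the way sorted partitions arriving from two Mappers would be, and shows that the merged result is still in (userid, time) order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeDemo {
    // A log record reduced to the two fields used for ordering (illustrative only).
    static final class Rec {
        final String userId;
        final long time;
        Rec(String userId, long time) { this.userId = userId; this.time = time; }
        @Override public String toString() { return userId + "," + time; }
    }

    // Composite order: userid first, then time within the same userid.
    static int compare(Rec a, Rec b) {
        int byUser = a.userId.compareTo(b.userId);
        return byUser != 0 ? byUser : Long.compare(a.time, b.time);
    }

    // Two-way merge of runs that are each already sorted in composite order.
    static List<Rec> merge(List<Rec> left, List<Rec> right) {
        List<Rec> out = new ArrayList<Rec>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            out.add(compare(left.get(i), right.get(j)) <= 0 ? left.get(i++) : right.get(j++));
        }
        while (i < left.size()) out.add(left.get(i++));
        while (j < right.size()) out.add(right.get(j++));
        return out;
    }

    public static void main(String[] args) {
        List<Rec> fromMap1 = Arrays.asList(new Rec("user1", 1), new Rec("user1", 4), new Rec("user2", 2));
        List<Rec> fromMap2 = Arrays.asList(new Rec("user1", 2), new Rec("user1", 3), new Rec("user2", 1));
        System.out.println(merge(fromMap1, fromMap2));
        // prints [user1,1, user1,2, user1,3, user1,4, user2,1, user2,2]
    }
}
```

Because every input run is sorted by the composite order, all records of one user come out next to each other and in time order, which is what the grouping step relies on.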

In summary, you need to do the following:

Step 1: Make the map output key a composite of the natural key and value
The composite key combines the natural key (userid) and the value (time), and its class implements WritableComparable. You need to override the compareTo method to compare on both userid and time; this method is used to order the keys. You also need to override the write and readFields methods, which are called for serialization and deserialization.
You tell Hadoop to use this composite key by calling the method below. (If the reducer emits a different key type, call job.setMapOutputKeyClass(UserTime.class) instead, which sets the key class for the map output only.)
job.setOutputKeyClass(UserTime.class);
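
As a rough illustration of Step 1, the sketch below shows what such a composite key could look like. It is a stand-in written under assumptions, not the code from the sample jar: time is taken to be a long, the field and accessor names are made up, and the class implements java.lang.Comparable so that it compiles without Hadoop on the classpath. In a real job the class would instead implement org.apache.hadoop.io.WritableComparable<UserTime>, which requires the same three methods (write, readFields, compareTo).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java stand-in for the composite key (assumption: time is a long).
class UserTime implements Comparable<UserTime> {
    private String userId = "";
    private long time;

    UserTime() { }  // no-arg constructor, so an empty key can be filled by readFields

    UserTime(String userId, long time) {
        this.userId = userId;
        this.time = time;
    }

    String getUserId() { return userId; }
    long getTime() { return time; }

    // Serialization: natural key first, then the value being sorted on.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeLong(time);
    }

    // Deserialization: read the fields back in the order they were written.
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();
        time = in.readLong();
    }

    // Sort order: by userid, then by time within the same userid.
    @Override
    public int compareTo(UserTime other) {
        int byUser = userId.compareTo(other.userId);
        return byUser != 0 ? byUser : Long.compare(time, other.time);
    }

    @Override
    public String toString() { return "(" + userId + "," + time + ")"; }
}

class UserTimeDemo {
    // Round-trips a key through write/readFields using in-memory streams.
    static UserTime roundTrip(UserTime key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            UserTime copy = new UserTime();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UserTime later = new UserTime("user1", 200L);
        UserTime earlier = new UserTime("user1", 100L);
        System.out.println(earlier.compareTo(later) < 0);  // prints true
        System.out.println(roundTrip(later));              // prints (user1,200)
    }
}
```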

Map output would be like

(user1,t1), t1

Step 2: Partition Map output using only the natural key
To ensure all values for the key go to the same reducer, you need to implement a partitioner. This is a class that extends org.apache.hadoop.mapreduce.Partitioner. Override the getPartition method to return a partition based only on the natural key (userid). You tell Hadoop to partition using this partitioner with the call
job.setPartitionerClass(NaturalKeyPartitioner.class);
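
The partitioning rule of Step 2 might be sketched as follows. This is a plain-Java approximation under assumptions: the UserTime class here is a minimal stand-in with made-up accessors, and the partitioner does not extend any Hadoop class so that it runs on its own. In a real job NaturalKeyPartitioner would extend org.apache.hadoop.mapreduce.Partitioner and override getPartition(key, value, numPartitions), which takes the same three parameters.

```java
// Minimal stand-in for the composite key from Step 1; only the accessors matter here.
final class UserTime {
    private final String userId;
    private final long time;
    UserTime(String userId, long time) { this.userId = userId; this.time = time; }
    String getUserId() { return userId; }
    long getTime() { return time; }
}

class NaturalKeyPartitioner {
    // Chooses a partition from the userid alone; the time part of the key is ignored.
    public int getPartition(UserTime key, Object value, int numPartitions) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so the result is never negative.
        return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        NaturalKeyPartitioner partitioner = new NaturalKeyPartitioner();
        int first = partitioner.getPartition(new UserTime("user1", 100L), null, 4);
        int second = partitioner.getPartition(new UserTime("user1", 999L), null, 4);
        System.out.println(first == second);  // prints true: same user, same partition
    }
}
```

The mask is used instead of Math.abs because Math.abs returns a negative number when hashCode() happens to be Integer.MIN_VALUE.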

Partition from Map 1:

Partition from Map 2:

Step 3: Group values using only the natural key
To ensure the reducer gets called with all the values for the key, you need to implement a grouping comparator: a WritableComparator based only on the natural key (userid). Override the compare method to compare userids. You tell Hadoop to use this comparator for grouping with the call
job.setGroupingComparatorClass(NaturalKeyGroupComparator.class);
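
The effect of the grouping comparator in Step 3 can be approximated in plain Java as below. Everything here is illustrative: UserTime is a minimal stand-in, GroupingDemo is a hypothetical helper that imitates how consecutive sorted keys are split into reduce calls, and the comparator implements java.util.Comparator only so that the sketch runs without Hadoop. In a real job NaturalKeyGroupComparator would extend org.apache.hadoop.io.WritableComparator and override its compare method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Minimal stand-in for the composite key from Step 1.
final class UserTime {
    private final String userId;
    private final long time;
    UserTime(String userId, long time) { this.userId = userId; this.time = time; }
    String getUserId() { return userId; }
    long getTime() { return time; }
}

class NaturalKeyGroupComparator implements Comparator<UserTime> {
    @Override
    public int compare(UserTime a, UserTime b) {
        return a.getUserId().compareTo(b.getUserId());  // time is deliberately ignored
    }
}

class GroupingDemo {
    // Walks keys that are already in composite order and starts a new group whenever
    // the grouping comparator reports a change, imitating one reduce call per group.
    static List<List<Long>> group(List<UserTime> sortedKeys) {
        Comparator<UserTime> grouping = new NaturalKeyGroupComparator();
        List<List<Long>> groups = new ArrayList<List<Long>>();
        UserTime previous = null;
        for (UserTime key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                groups.add(new ArrayList<Long>());
            }
            groups.get(groups.size() - 1).add(key.getTime());
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<UserTime> keys = Arrays.asList(
            new UserTime("user1", 1), new UserTime("user1", 2),
            new UserTime("user2", 1), new UserTime("user2", 3));
        System.out.println(group(keys));  // prints [[1, 2], [1, 3]]
    }
}
```

Keys that differ only in time compare as equal here, so their values land in the same group while keeping the order the sort gave them.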

Input to Reducer 1:
key = (user1,t1), values = t1,t2,t3,t4

Input to Reducer 2:
key = (user2,t1), values = t1,t2,t3,t4

Output from Reducer1:
user1, t1
user1, t2
user1, t3
user1, t4

The complete sample source code is at SecondarySort.jar.