Friday, February 28, 2014

Hadoop 2.x cluster setup tutorial

Recently I had the opportunity to set up a multi-node Hadoop cluster. The Apache documentation is a little thin, and I had to spend several hours troubleshooting issues and googling for help before I got it right. The purpose of this tutorial is to save time for those setting up a Hadoop cluster for the first time. If you are new to Hadoop, you may read my tutorial on single node setup at Hadoop 2.x tutorial. If you have never set up Hadoop before, it is a good idea to do a single node setup first and then try the multi node setup.

In this tutorial we will

(1) set up a multi node hadoop cluster with 4 servers
(2) test it by copying files to hdfs and printing them with cat
(3) test it by running the simple map reduce job that we developed in the single node tutorial

Step 1: Download a stable hadoop release and untar it.

Download a stable 2.x.x release from
I downloaded hadoop-2.2.0.tar.gz.
Untar the file to a directory, say ~/hadoop-2.2.0.

Step 2: Decide on the topology

For this tutorial, we shall set up a 4 node cluster.

Host : master
Host : slave1
Host : slave2
Host : slave3

On the master we will run the namenode, resourcemanager, datanode, nodemanager and historyserver. On the slaves, we run the datanode and nodemanager.

To make it more real world, we will bring up and test the cluster first with just master, slave1 and slave2. Typically you add more capacity as needed. So we will add slave3 after the fact.

Step 3: Ensure proper network connectivity

I am not going to cover networking details here. But it goes without saying that the master should be able to reach the slaves using their hostnames, and the slaves should be able to reach the master. So you may have to add hostname to IP address mappings in /etc/hosts.

Several startup scripts use ssh to connect and start processes on hosts. So ssh must be set up for passwordless login on all hosts.

Step 4: Set these environment variables

export HADOOP_HOME=path_to_hadoop_install_dir
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

To hadoop_install_dir/libexec/, add
export JAVA_HOME=path_to_jdk

Step 5: Update the config files

These files are located at hadoop_install_dir/etc/hadoop

This applies to all servers.
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
  <property>
    <name> </name>
    <value>file:/mnt/hadoop/data/namenode</value>
  </property>
  <property>
    <name> </name>
    <value>file:/mnt/hadoop/data/datanode</value>
  </property>
</configuration>

The namenode directory property points to the location where the namenode stores metadata. The datanode directory property points to the location where the datanode stores data. It is important to put these directories on disks that have lots of free space (terabytes). Default block sizes are large, and if there is not enough space you will encounter errors that do not clearly point out the space issue.
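
To get a feel for how much disk the datanode directories need, here is a rough back-of-the-envelope sketch in Java. It assumes the Hadoop 2.x default block size of 128 MB (dfs.blocksize) and uses the replication factor of 2 from the config above. The class and method names are made up for illustration and are not part of Hadoop.

```java
class HdfsStorageEstimate {
    // Assumption: default HDFS block size in Hadoop 2.x (dfs.blocksize) is 128 MB.
    static final long BLOCK_SIZE_BYTES = 128L * 1024 * 1024;

    /** Number of HDFS blocks a file of the given size is split into. */
    static long blockCount(long fileBytes) {
        // Ceiling division: a partly filled last block still counts as a block.
        return (fileBytes + BLOCK_SIZE_BYTES - 1) / BLOCK_SIZE_BYTES;
    }

    /** Raw bytes used across all datanodes once every block is replicated. */
    static long rawBytes(long fileBytes, int replication) {
        return fileBytes * replication;
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        int replication = 2; // matches dfs.replication in the config above

        System.out.println("blocks: " + blockCount(oneGb));
        System.out.println("raw MB: " + rawBytes(oneGb, replication) / (1024 * 1024));
    }
}
```

With these assumptions, 1 GB of data is split into 8 blocks and takes up 2048 MB of raw disk across the cluster, so plan disk space for the replicated size and not for the size of the input files.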

This applies to all servers.







The last 3 properties tell the nodemanager how to connect to the resourcemanager.

Only in the yarn-site.xml of the master, add
Without this, nodemanager will not start on the master.




This is necessary on the master only and is used by the scripts to start datanodes and nodemanagers on the servers listed.


Step 5: Start the processes

Change to the directory where hadoop is installed.
cd hadoop_install_dir

If you are running hadoop for the first time, the following command will format HDFS. Do not run this every time, as it formats HDFS and thus deletes any existing data.

hadoop_install_dir$ bin/hadoop namenode -format

Start the namenode.
hadoop_install_dir$ sbin/ start namenode

Start the datanode.
hadoop_install_dir$ sbin/ start datanode

Note that the script name is -daemons. This starts datanode on all the servers listed in the slaves file. If you use the -daemon script, it will only start the datanode on the server on which you ran the script.

In hadoop 2.x, there is no jobtracker. Instead there is a resourcemanager and a nodemanager.
Start the resourcemanager.
hadoop_install_dir$ sbin/ start resourcemanager

Start the nodemanager.
hadoop_install_dir$ sbin/ start nodemanager
As mentioned in the case of the datanode, the -daemons script will start the nodemanager on all servers listed in the slaves file, whereas the -daemon script will start it only on the server on which the script is executed.

Start the history server.
hadoop_install_dir$ sbin/ start historyserver

On the master, type jps. It lists the java processes running. Check that all the processes are started.

hadoop_install_dir$ jps

1380 DataNode
1558 Jps
1433 ResourceManager
1536 JobHistoryServer
1335 NameNode
1849 NodeManager

Do the same on each of the slaves

hadoop_install_dir$ jps

1380 DataNode
1558 Jps
1849 NodeManager

The jps command is a good check to ensure all necessary processes are started.

You can use the following urls to see the state of the cluster.

For YARN/Mapreduce

Step 6: Test HDFS

The HDFS commands are documented at

hadoop_install_dir$ bin/hdfs dfs -ls /

will list the root directory

hadoop_install_dir$ bin/hdfs dfs -mkdir /input
creates a directory input

In the local filesystem create a file app.log with the data


Let us pretend this is a log file from a web application where for each request we have logged userid and some additional data. We will later use this as input for a MapReduce program.
You can move it to hdfs using the command

hadoop_install_dir$ bin/hdfs dfs -moveFromLocal ~/projects/app.log /input/

To print the file just moved to hdfs
hadoop_install_dir$ bin/hdfs dfs -cat /input/app.log

Step 7: Run a map reduce program

Writing and running an MR program is no different from what we did in the single node tutorial. See Step 6 and Step 7 of the single node tutorial at Hadoop 2.x tutorial.

Step 8: Add additional capacity

Your hadoop cluster is working well. But you are running out of space and your MR jobs are backed up because not enough mappers or reducers are available. Let us increase capacity by adding an additional server slave3.

On slave3, do
Step 1 -- untar the hadoop binaries
Step 3 -- ensure network connectivity
Step 4 -- set the environment variables
Step 5 -- set up the config files

Start the data node
hadoop_install_dir$ sbin/ start datanode

Start the nodemanager
hadoop_install_dir$ sbin/ start nodemanager

Use jps to check processes started. Use the web urls to check that the node got added to the cluster. 

I hope these steps help jumpstart your hadoop cluster setup and save you time.

Friday, February 15, 2013

Hadoop Secondary Sort: Sorting values

Sorting is a core strength of the Hadoop MapReduce framework. But by default it sorts only the keys. The values for each key are not sorted. There are many use cases where the values for a key are required in sorted order. For example, let us say a web application logs user interactions: user id, time and other log data. The log data is distributed across log files from different servers. The requirement is to get each user's log records in chronological order.

The input to the map is the set of log records. Let us say each record is (userid, time), and the records are not in any order.

The output from the reducer needs to be sorted by user and then by time.

user1, t1
user1, t2
user1, tn
usern, t1

where for each user, t1 < t2 ..... < tn.

We know Hadoop sorts the keys. We could make the map output key a composite key that is a combination of userid and time. We need to write a comparator that hadoop can use to sort using the composite key. However, a side effect of this is that hadoop may send records for the same user to different reducers, because by default the partition is chosen from the hash code of the whole key. This means you will not be able to reduce all of a user's records as a group.

Remember, the hadoop framework sorts the output of a Map by key. It then partitions the output. Each partition is intended for a Reducer. All values for a key from a Map are in the same partition. We want all the records for a user to go to the same Reducer. This implies they need to be in the same partition. Fortunately there is a way to influence partitioning. We tell hadoop to put all records for a user in the same partition by implementing a partitioner that partitions based only on the user id and not on the composite key.

A Reducer receives partitions from several Mappers. Remember that the reducer is called with a key and a list of values for that key. Before the framework can call the reducer, it has to group the values for that key from all the partitions. We want the grouping to happen based on userid (not userid + time).
So we need to implement a grouping comparator that hadoop uses for grouping, and this will compare only userids. Since the records in each partition are sorted by userid and time, grouping, which is a merge process, preserves the sort order, like a merge sort.
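
The merge idea can be sketched in plain Java, with no Hadoop dependency. This is only an illustration of why merging inputs that are already sorted keeps each user's records in time order; it is not how the framework is implemented. The Rec class, the sample records and the method names are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortedMergeSketch {
    /** Hypothetical log record: a userid plus a timestamp. */
    static final class Rec {
        final String user;
        final long time;
        Rec(String user, long time) { this.user = user; this.time = time; }
        @Override public String toString() { return user + "@" + time; }
    }

    /** Composite ordering: userid first, then time. */
    static int compare(Rec a, Rec b) {
        int byUser = a.user.compareTo(b.user);
        return byUser != 0 ? byUser : Long.compare(a.time, b.time);
    }

    /** Standard two-way merge of two inputs that are already sorted. */
    static List<Rec> merge(List<Rec> left, List<Rec> right) {
        List<Rec> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            // Always take the smaller head element, so the output stays sorted.
            out.add(compare(left.get(i), right.get(j)) <= 0 ? left.get(i++) : right.get(j++));
        }
        while (i < left.size()) out.add(left.get(i++));
        while (j < right.size()) out.add(right.get(j++));
        return out;
    }

    public static void main(String[] args) {
        // Each list stands for one sorted partition coming from a different Map.
        List<Rec> fromMap1 = Arrays.asList(new Rec("user1", 1), new Rec("user1", 4), new Rec("user2", 2));
        List<Rec> fromMap2 = Arrays.asList(new Rec("user1", 2), new Rec("user1", 3), new Rec("user2", 1));
        System.out.println(merge(fromMap1, fromMap2));
        // prints [user1@1, user1@2, user1@3, user1@4, user2@1, user2@2]
    }
}
```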

In summary you need to

Step 1: Make the map output key a composite of the natural key and value
Make the Map output key a composite of the natural key (userid) and the value (time). The composite key class implements WritableComparable. You need to implement the compareTo method so that it uses both userid and time. This method is used to order the keys using the composite key. You also need to implement the write and readFields methods, which are called for serialization and deserialization.
You tell Hadoop to use this composite key by calling the method
job.setOutputKeyClass(UserTime.class);

Map output would be like

(user1,t1) , t1
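
As a rough idea of what such a composite key might look like, here is a plain-Java stand-in that compiles without Hadoop on the classpath. In a real job the class would declare that it implements org.apache.hadoop.io.WritableComparable instead of Comparable; the write and readFields signatures below match the ones that interface expects. The field names and types (a String userid and a long time) are assumptions, and this is not the code from the sample jar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in for a composite key; a real one would implement
// org.apache.hadoop.io.WritableComparable<UserTime>.
class UserTime implements Comparable<UserTime> {
    String userId = "";  // natural key (assumed to be a string)
    long time;           // value folded into the key (assumed to be a long)

    UserTime() { }       // no-arg constructor, so an empty key can be filled by readFields
    UserTime(String userId, long time) { this.userId = userId; this.time = time; }

    // Sort by userid first, then by time.
    @Override
    public int compareTo(UserTime other) {
        int byUser = userId.compareTo(other.userId);
        return byUser != 0 ? byUser : Long.compare(time, other.time);
    }

    // Serialization: fields are written in a fixed order...
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeLong(time);
    }

    // ...and read back in exactly the same order.
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();
        time = in.readLong();
    }

    public static void main(String[] args) throws IOException {
        UserTime original = new UserTime("user1", 42L);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        UserTime copy = new UserTime();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.userId + " " + copy.time);                       // user1 42
        System.out.println(original.compareTo(new UserTime("user1", 43L)) < 0);  // true
    }
}
```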
Step 2: Partition Map output using only the natural key
To ensure all values for the key go to the same reducer, you need to implement a partitioner. This is a class that extends org.apache.hadoop.mapreduce.Partitioner. Override the getPartition method to return a partition based on the natural key user id. You tell hadoop to partition using this partitioner with the call
job.setPartitionerClass(NaturalKeyPartitioner.class);

Partition from Map 1:

Partition from Map 2:
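
The core of such a partitioner is a single line, sketched below in plain Java so that it runs without Hadoop. The method mirrors the shape of Partitioner.getPartition(key, value, numPartitions), but takes the two parts of the composite key as separate arguments. The class name and the hashing choice are assumptions for illustration, not the code from the sample jar.

```java
class NaturalKeyPartitionerSketch {
    /**
     * Picks a partition from the userid alone. The time part of the
     * composite key is passed in but deliberately ignored.
     */
    static int getPartition(String userId, long time, int numPartitions) {
        // The mask clears the sign bit so the result is never negative.
        return (userId.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int reducers = 3;
        // Same user, different times: both records land in the same partition.
        System.out.println(getPartition("user1", 100L, reducers) == getPartition("user1", 999L, reducers)); // true
    }
}
```

Because only the userid feeds the hash, every record for a given user maps to the same partition number, whatever its timestamp.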

Step 3: Group values using only the natural key
To ensure the reducer gets called with all the values for the key, you need to implement a WritableComparator based on the natural key, user id. Override the compare method to compare based on the userid only. You tell hadoop to use this comparator for grouping with the call
job.setGroupingComparatorClass(NaturalKeyGroupComparator.class);

Input to Reducer 1:
key = (user1,t1), values = t1,t2,t3,t4

Input to Reducer 2:
key = (user2,t1), values = t1,t2,t3,t4

Output from Reducer1:
user1, t1
user1, t2
user1, t3
user1, t4
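
To see the sort comparator and the grouping comparator working together, here is a small plain-Java simulation with no Hadoop dependency. It sorts keys by (userid, time), then starts a new group whenever the grouping comparator, which looks only at userid, reports a different key. The Key class, the sample data and the reduceInput method are hypothetical; this mimics the behaviour described above, not the framework's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSimulation {
    /** Hypothetical composite key: userid plus time. */
    static final class Key {
        final String user;
        final long time;
        Key(String user, long time) { this.user = user; this.time = time; }
    }

    // Sort comparator: full composite key (userid, then time).
    static final Comparator<Key> SORT = (a, b) -> {
        int byUser = a.user.compareTo(b.user);
        return byUser != 0 ? byUser : Long.compare(a.time, b.time);
    };

    // Grouping comparator: natural key only, so keys that differ
    // only in time count as the same group.
    static final Comparator<Key> GROUP = (a, b) -> a.user.compareTo(b.user);

    /** Sorts the keys, then builds one value list per run of GROUP-equal keys. */
    static Map<String, List<Long>> reduceInput(List<Key> mapOutput) {
        List<Key> sorted = new ArrayList<>(mapOutput);
        sorted.sort(SORT);

        Map<String, List<Long>> groups = new LinkedHashMap<>();
        Key previous = null;
        List<Long> current = null;
        for (Key k : sorted) {
            if (previous == null || GROUP.compare(previous, k) != 0) {
                current = new ArrayList<>();   // a new user starts a new reduce call
                groups.put(k.user, current);
            }
            current.add(k.time);
            previous = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Key> mapOutput = Arrays.asList(
            new Key("user2", 3), new Key("user1", 4), new Key("user1", 1),
            new Key("user2", 1), new Key("user1", 3), new Key("user1", 2));
        System.out.println(reduceInput(mapOutput));
        // prints {user1=[1, 2, 3, 4], user2=[1, 3]}
    }
}
```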

The complete sample source code is at SecondarySort.jar.