Category Archives: Hadoop Daemons

Install Hadoop in Pseudo Distributed mode

Installing Hadoop in pseudo distributed mode lets you mimic a multi-server cluster on a single machine. Unlike standalone mode, this mode has all the daemons running. Also, in pseudo distributed mode the data is stored in HDFS rather than on the local hard disk.

If you have followed the last post, the first three steps of this tutorial are the same.

  1. Create Hadoop user
  2. Install Java
  3. Download and unpack Hadoop
  4. Configure SSH
  5. Configure Hadoop
  6. Format Hadoop NameNode
  7. Start Hadoop
  8. Test Hadoop installation

Create Hadoop User

It is recommended to create a dedicated Hadoop user account to separate the Hadoop installation from other services running on the same machine.

Open System Preferences > Users & Groups

Click the ‘+’ button below the list of existing users. You may need to click the lock icon first and enter an administrator name and password. After doing so, click the ‘+’ button and enter the following:-

Full Name: hadoop
Account Name: hadoop

Also set a password for the account and click on “Create User”. You can now log in to the hadoop account to install Hadoop.

Install Java

If you are running Mac OS, you will most likely already have Java installed on your system. But just to make sure, open the terminal and enter the following command.

$:~ java -version
java version "1.6.0_37"
Java(TM) SE Runtime Environment (build 1.6.0_37-b06-434-11M3909)
Java HotSpot(TM) 64-Bit Server VM (build 20.12-b01-434, mixed mode)

This shows the version of Java installed on your system (1.6.0_37 in this case). Java 6 or later is required to run Hadoop. If your version number suggests otherwise, please download the latest JDK.

Download and unpack Hadoop

Download Hadoop from the Hadoop releases page (http://hadoop.apache.org/releases.html). Make sure to download the latest stable version of Hadoop (hadoop-1.2.1.tar.gz as of this post). Save the file in /Users/hadoop (or any other location of your choice).

Unpack the file using the following command:-

$:~ tar -xzvf hadoop-1.2.1.tar.gz

Set the hadoop user as the owner of the extracted Hadoop files:

$:~ chown -R hadoop hadoop-1.2.1
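Optionally, and assuming you unpacked Hadoop into /Users/hadoop, you can add the Hadoop bin directory to your PATH so that you can type hadoop instead of the full path. The rest of this post keeps the explicit hadoop-1.2.1/bin/... paths, so this step is not required:

$:~ export PATH=$PATH:/Users/hadoop/hadoop-1.2.1/bin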

Configure SSH

SSH (secure shell) allows two networked devices to exchange data over a secure channel. As pseudo distributed mode mimics a multi-server cluster, the Hadoop control scripts need SSH to perform cluster-wide operations. For example, the start-all.sh script starts all the daemons in the cluster.

To work seamlessly with SSH, we need to set up password-less login for the hadoop user on the machines in the cluster. Since we are in pseudo distributed mode, we only need to set up password-less login to localhost.

To do this, we generate a public/private key pair and make the public key available to the machines we want to log in to (in a real cluster this is often a home directory shared over NFS; here it is simply the local ~/.ssh directory).

First generate the key pair by typing the following command in the hadoop user account:

$:~ ssh-keygen -t rsa -f ~/.ssh/id_rsa

This will generate the key pair, storing the private key in ~/.ssh/id_rsa and the public key in ~/.ssh/id_rsa.pub.

Now we would like to share the public key with all the machines on the cluster. To do this, we need to make sure that the public key is stored in the ~/.ssh/authorized_keys file on every machine in the cluster that we want to connect to.

To do that, type the following command:

$:~ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

You can now test the password-less SSH login with the following command:

$:~ ssh localhost
Last login: Mon Aug 19 10:49:42 2013

Configure Hadoop

The Hadoop configuration is more elaborate than in standalone mode, as we now need to configure the daemons.

Before we go any further, let us understand the different Hadoop configuration files and their usage. These files are stored in the $HADOOP_HOME/conf folder.

  1. hadoop-env.sh – Environment variables used by the scripts that run Hadoop.
  2. core-site.xml – Configuration settings for Hadoop core, common to HDFS and MapReduce.
  3. hdfs-site.xml – Configuration settings for the HDFS daemons.
  4. mapred-site.xml – Configuration settings for the MapReduce daemons.

To start off, we set JAVA_HOME so that Hadoop can find the version of Java you want to use. To do this, add the following line to hadoop-env.sh:

export JAVA_HOME=/Library/Java/Home
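If you are unsure of the correct Java location on your Mac, most recent versions of OS X ship the java_home utility, which reports it for you; you can use it directly in hadoop-env.sh instead of a hard-coded path:

export JAVA_HOME=$(/usr/libexec/java_home)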

Set the property fs.default.name, which specifies the location where HDFS resides. We do this by adding the following to core-site.xml, inside the <configuration> tags:

<property>
   <name>fs.default.name</name>
   <value>hdfs://localhost:9000</value>
</property>

Set the property dfs.replication, which tells HDFS how many copies of each block to keep. Do this by adding the following to hdfs-site.xml:

<property>
   <name>dfs.replication</name>
   <value>1</value>
</property>

Set the property mapred.job.tracker, which gives the host and port where the JobTracker runs. To do this, add the following lines to mapred-site.xml.

<property>
   <name>mapred.job.tracker</name>
   <value>localhost:9001</value>
</property>
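For reference, each of these XML files wraps its properties in a single <configuration> element. After the edit above, your core-site.xml should look roughly like this:

<?xml version="1.0"?>
<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000</value>
   </property>
</configuration>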

Format Hadoop NameNode

The first step is to format HDFS, the Hadoop filesystem, which is implemented on top of your local filesystem. You only need to do this the first time you set up a Hadoop installation.

You can do this by typing the following command:

$:~ hadoop-1.2.1/bin/hadoop namenode -format

You will get an output similar to the following:

13/08/19 12:08:34 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = Prashants-MacBook-Pro.local/***.***.*.**
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 1.1.2
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1440782; compiled by 'hortonfo' on Thu Jan 31 02:03:24 UTC 2013
************************************************************/
13/08/19 12:08:40 INFO util.GSet: VM type       = 64-bit
13/08/19 12:08:40 INFO util.GSet: 2% max memory = 39.83375 MB
13/08/19 12:08:40 INFO util.GSet: capacity      = 2^22 = 4194304 entries
13/08/19 12:08:40 INFO util.GSet: recommended=4194304, actual=4194304
13/08/19 12:08:41 INFO namenode.FSNamesystem: fsOwner=hadoop
13/08/19 12:08:41 INFO namenode.FSNamesystem: supergroup=supergroup
13/08/19 12:08:41 INFO namenode.FSNamesystem: isPermissionEnabled=true
13/08/19 12:08:41 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
13/08/19 12:08:41 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
13/08/19 12:08:41 INFO namenode.NameNode: Caching file names occuring more than 10 times
13/08/19 12:08:41 INFO common.Storage: Image file of size 112 saved in 0 seconds.
13/08/19 12:08:41 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/tmp/hadoop-hadoop/dfs/name/current/edits
13/08/19 12:08:41 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/tmp/hadoop-hadoop/dfs/name/current/edits
13/08/19 12:08:41 INFO common.Storage: Storage directory /tmp/hadoop-hadoop/dfs/name has been successfully formatted.
13/08/19 12:08:41 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at ******-MacBook-Pro.local/***.***.*.**
************************************************************/

Start Hadoop

Starting Hadoop essentially means running all the Hadoop daemons. To do this, execute the following command:

$:~ hadoop-1.2.1/bin/start-all.sh
starting namenode, logging to /Users/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-namenode-Prashants-MacBook-Pro.local.out
localhost: starting datanode, logging to /Users/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-datanode-Prashants-MacBook-Pro.local.out
localhost: starting secondarynamenode, logging to /Users/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-secondarynamenode-Prashants-MacBook-Pro.local.out
starting jobtracker, logging to /Users/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-jobtracker-Prashants-MacBook-Pro.local.out
localhost: starting tasktracker, logging to /Users/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-tasktracker-Prashants-MacBook-Pro.local.out
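To verify that all five daemons are actually running, you can use the jps tool that ships with the JDK. The process IDs below are just an illustration, but you should see all five daemon names listed:

$:~ jps
2341 NameNode
2411 DataNode
2485 SecondaryNameNode
2550 JobTracker
2620 TaskTracker
2705 Jps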

Test Hadoop installation

To test the Hadoop installation, execute the following command, which runs the bundled example job that estimates the value of pi:

$:~ hadoop-1.2.1/bin/hadoop jar hadoop-1.2.1/hadoop-examples-1.2.1.jar pi 10 100
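Once the job completes, you can also poke around in a couple of other ways: list the contents of HDFS with the hadoop fs shell, and open the NameNode and JobTracker web interfaces (Hadoop 1.x serves these on ports 50070 and 50030 by default). When you are done, stop all the daemons with stop-all.sh:

$:~ hadoop-1.2.1/bin/hadoop fs -ls /
$:~ open http://localhost:50070
$:~ open http://localhost:50030
$:~ hadoop-1.2.1/bin/stop-all.sh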

This concludes the Hadoop installation in pseudo distributed mode. However, if you are a beginner like me, I strongly suggest that you install the Cloudera Hadoop Demo Virtual Machine with VirtualBox. Please follow the next post to see how it can be done.


References

1. Hadoop – The Definitive Guide (Tom White)
2. http://wiki.apache.org/hadoop/Running_Hadoop_On_OS_X_10.5_64-bit_(Single-Node_Cluster)

Hadoop ≅ HDFS + MapReduce (Part – II)

In this post, we will discuss the following with regard to the MapReduce framework :-

  1. Motivation for a parallel processing framework
  2. What is MapReduce?
  3. How MapReduce solves a problem
  4. MapReduce in detail
    1. Map Phase
    2. Shuffle & Sort Phase
    3. Reduce Phase

Motivation for a parallel processing framework

As you know, we are living in the age of data. Today the web contains some 40+ billion pages. If we assume that each page holds around 20 KB of data, we are looking at roughly 800+ terabytes. Reading at about 100 MB/s, a single computer would need roughly three months (around 8 million seconds) just to read the web, and simply storing such huge data would require thousands of computers.

Clearly we need a mechanism to divide such a huge job among many computers. If we split the same job of reading 800+ terabytes across, say, 1,000 computers, it can be done in under three hours.

But our task is not limited to reading data; we also need to process it. Consolidating the data from the storage nodes and transferring it to separate compute nodes would take a lot of time. We therefore need a programming framework that distributes the computation and runs the tasks in parallel, close to the data.

What is MapReduce?

MapReduce is one such programming model, designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks. Each of these tasks is then run on an individual node in the cluster. The advantage of the MapReduce framework is that it not only divides the work but also shields the programmer from the issues that come with distributed computing, such as communication and coordination, recovery from machine failure, status reporting, debugging, optimization and data locality.

How MapReduce solves a problem

A typical problem solved by MapReduce can be broken down into the following steps :-

  1. Read a lot of data
  2. Extract something you are interested in from every record
  3. Shuffle and sort the intermediate results
  4. Aggregate (filter, sort or transform) the intermediate result
  5. Write out the result

MapReduce in detail

The figure below gives you an overview of MapReduce. In MapReduce, everything is expressed in terms of key-value pairs. The phases of a MapReduce job are Map, Sort & Shuffle and Reduce.

The two daemons associated with MapReduce are the JobTracker and the TaskTracker. The client submits the job to the master node, which runs the JobTracker. The JobTracker then assigns Map and Reduce tasks to other nodes in the cluster. These nodes run the TaskTracker daemons, which start the Map and Reduce tasks on the nodes and send progress reports to the JobTracker. So basically the JobTracker is the master and the TaskTrackers are the slaves.

[Figure: Overview of MapReduce]

Map Phase

One of the major advantages of using Hadoop is data locality. Hadoop always tries to take the computation to the data rather than transfer the data to the computation node. In other words, the node chosen to run a Map task is the one closest to the data (if possible, the very node that stores the data). This greatly reduces network traffic and the time spent transferring data from one node to another.

As you can see in the figure above, multiple Mappers run the Map tasks in parallel, each processing a portion of the input data. A Mapper receives its input as key-value pairs and emits a list of zero or more intermediate key-value pairs.

map(key, value) -> (intermediate_key, intermediate_value)

Map is essentially a function that takes a key-value pair as input and emits intermediate key-value pairs. Let us make this concrete with a “word frequency” Mapper, where the goal is to find the frequency of the words in a document. The key here is the offset of the word (where the word appears in the document) and the value is the word itself. For each input pair, the Mapper emits the word as the key and 1 as the value.

For instance, let the document be “to be or not to be”.

So the Map function would look something like,

map(k, v) = (v, 1)

map(offset1, 'to')  = ('to', 1)
map(offset2, 'be')  = ('be', 1)
map(offset3, 'or')  = ('or', 1)
map(offset4, 'not') = ('not', 1)
map(offset5, 'to')  = ('to', 1)
map(offset6, 'be')  = ('be', 1)

Sort & Shuffle phase

After all the Map tasks are complete, we have a set of intermediate key-value pairs. All the values that share the same key are now grouped together and the keys are sorted. This is the Sort & Shuffle phase of MapReduce. It is important to note that the output of the Map phase is stored on the local hard disk and not in HDFS.

Continuing with our example, the output after Sort & Shuffle phase will be,

('to', [1, 1]), ('be', [1, 1]), ('or', [1]), ('not', [1])

Reduce Phase

After the Sort & Shuffle phase, the intermediate lists of key-value pairs are fed to the Reducer, where they are subjected to the Reduce function. The Reducer emits zero or more final key-value pairs; usually, a Reducer emits one key-value pair for every intermediate key. The output of the Reducer is written to HDFS.
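In the same informal notation as the Map function above, the Reduce function for this job can be sketched as:

reduce(intermediate_key, [intermediate_value, ...]) -> (final_key, final_value)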

The Reduce function here simply adds all the intermediate values associated with a particular key. So the final output will be,

('to', 2), ('be', 2), ('or', 1), ('not', 1)

As you see, the Reducer will give us the required output (which is the word frequency in the document).
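As a rough local analogy (this is not how Hadoop actually executes the job), the same word count can be simulated with a Unix pipeline, where tr plays the role of the Mapper, sort the Sort & Shuffle phase, and uniq -c the Reducer:

$:~ echo "to be or not to be" | tr ' ' '\n' | sort | uniq -c
   2 be
   1 not
   1 or
   2 to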

To conclude, I reiterate that MapReduce is a great abstraction that allows programmers to focus on the problem rather than on all the messy details that come with large-scale distributed programming.


References

1. http://developer.yahoo.com/hadoop/tutorial/module4.html
2. Hadoop – The Definitive Guide (Tom White)

Hadoop ≅ HDFS + MapReduce (Part – I)

We discussed in the last post that Hadoop has many components in its ecosystem, such as Pig, Hive, HBase, Flume, Sqoop and Oozie. But the two core components that form the kernel of Hadoop are HDFS and MapReduce. We will discuss HDFS in more detail in this post.

HDFS is the Hadoop Distributed File System, which is responsible for storing data on the cluster. Files in HDFS are split into blocks before they are stored on the cluster. The typical size of a block is 64 MB or 128 MB. The blocks belonging to one file are then stored on different nodes, and each block is also replicated to ensure high reliability. To delve deeper into how HDFS achieves all this, we first need to understand the Hadoop daemons.

Hadoop Daemons

[Figure: Hadoop Daemons]

A daemon, in computing terms, is a process that runs in the background. Hadoop has five such daemons: NameNode, Secondary NameNode, DataNode, JobTracker and TaskTracker. Each daemon runs in its own JVM. We discuss the NameNode, Secondary NameNode and DataNode in this post, as they are the ones associated with HDFS.

  • NameNode – It is the master node, responsible for storing the metadata for all files and directories. It knows which blocks make up a file and where those blocks are located in the cluster.
  • DataNode – It is the slave node that stores the actual data. It periodically reports the blocks it holds to the NameNode.

It is highly important that the NameNode runs at all times: if the NameNode fails, the cluster becomes inaccessible, as there is no other record of where the files are located. For this very reason, we have the Secondary NameNode.

  • Secondary NameNode – It periodically merges the NameNode’s edit log into the filesystem image so that the edit log doesn’t grow too large. It also keeps a copy of the merged image, which can be used to help recover from a NameNode failure.

We will now discuss the following with respect to HDFS:-

  1. Writing file on the cluster
  2. Reading file from the cluster
  3. Fault tolerance Strategy
  4. Replication Strategy

Writing file on the cluster

  1. The user through a client, requests to write data on Hadoop cluster.
  2. The user sets the replication factor (default 3) and the block size through the configuration options (see the example configuration after this list).
  3. The client splits the file into blocks and contacts the NameNode.
  4. The NameNode returns the DataNodes (in increasing order of the distance from the client node).
  5. The Client sends the data to the first DataNode, which while receiving the data, transfers the same to the next DataNode (which does the same and this forms the replication pipeline).
  6. The DataNodes send acknowledgments back along the pipeline once the data is received, and report the new blocks to the NameNode.
  7. The Client repeats the same process for all the other blocks that constitute the file.
  8. When all the blocks are written on the cluster, the NameNode closes the file and stores the meta-data information.
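For illustration, the settings mentioned in step 2 would typically go in hdfs-site.xml. The values below are only an example (three replicas and 64 MB blocks, the usual defaults), not a recommendation:

<property>
   <name>dfs.replication</name>
   <value>3</value>
</property>
<property>
   <name>dfs.block.size</name>
   <value>67108864</value>
</property>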

Reading data from the cluster

  1. The user provides the filename to the client.
  2. The Client passes the filename to the NameNode.
  3. The NameNode sends the names of the blocks that constitute the file. It also sends the locations (DataNodes) where the blocks are available (again in increasing order of distance from the client).
  4. The Client then downloads the data from the nearest DataNode.
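In practice you rarely deal with blocks directly; the hadoop fs shell hides both the write and read paths described above behind simple commands. For example (assuming the hadoop command is on your PATH, and with purely illustrative file names):

$:~ hadoop fs -put myfile.txt /user/hadoop/myfile.txt
$:~ hadoop fs -ls /user/hadoop
$:~ hadoop fs -cat /user/hadoop/myfile.txt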

Fault tolerance strategy

There are three types of failure that can occur, namely, node failure, communication failure and data corruption.

  • In case of a NameNode failure, the Secondary NameNode comes into play: the NameNode has to be restored with the help of the merged copy of the filesystem image.
  • The DataNode sends a heartbeat message to the NameNode every 3 seconds to inform the NameNode that it is alive. If the NameNode doesn’t receive a heartbeat from a DataNode for 10 minutes (configurable), it considers the DataNode to be dead and schedules new replicas of that DataNode’s blocks on other DataNodes.
  • The Client receives an ACK from the DataNode that it has received the data. If it doesn’t after several tries, it is understood that either there is a network failure or the DataNode has failed.
  • Checksum is sent along with the data to look for data corruption.
  • Periodically, each DataNode sends a report containing the list of its uncorrupted blocks, and the NameNode updates its record of the valid blocks that DataNode contains.
  • For all such under replicated blocks, the NameNode adds other DataNodes to the replication pipeline.

Replication strategy

The replication factor is set to 3 by default (it can be configured). The cluster is organized into racks, where each rack contains DataNodes.

  • The NameNode tries to place the first replica on the client’s own node. If that is not possible, any node in the same rack as the client is chosen for the first replica.
  • Then the other two replicas are stored on two different DataNodes on a rack different from the rack of the first replica.

Before I end this post, I would like to point out that HDFS is not a fit for all types of applications and files.

HDFS is a fit when,

  • Files to be stored are large in size.
  • Your application needs to write once and read many times.
  • You want to use cheap, commonly available hardware.

And it is not a fit when,

  • You want to store a large number of small files. It is better to store millions of large files than billions of small files.
  • There are multiple writers, or writes happen at arbitrary offsets. HDFS is designed for a single writer appending to the end of a file, not for writing at a random offset.

I hope this post gives you an overview of HDFS and how it provides failure support, data recoverability and consistency. In the next post we will talk about MapReduce in detail.


References

1. Hadoop – The Definitive Guide (Tom White)