Monthly Archives: November 2013

Implementing Custom Writables in Hadoop – BigramCount

Hello everyone,

Apologies for the delay in coming up with this post; I was caught up with my studies. Anyway, today we are going to see how to implement a custom Writable in Hadoop. But before we get into that, let us cover some basics and understand the motivation behind implementing a custom Writable.

We will discuss the following in this post:

  1. What is a Writable in Hadoop?
  2. Why does Hadoop use Writable(s)?
  3. Limitation of primitive Hadoop Writable classes
  4. Custom Writable
  5. BigramCount Example
    • Code the custom Writable class
    • Code the Mapper class
    • Code the Reducer class
    • Code the Driver class
  6. Setup the input directory in HDFS
  7. Run the job

What is a Writable in Hadoop?

If you have gone through the “Hello World” of MapReduce post, or any other Hadoop program, you must have seen data types different from the regular Java data types. In the wordCount post, you must have seen LongWritable, IntWritable and Text. It is fairly easy to understand the relation between them and Java’s primitive types: LongWritable is equivalent to long, IntWritable to int and Text to String.

Any value in Hadoop must be a Writable. Writable is an interface in Hadoop, and the types used in Hadoop must implement this interface. Hadoop provides these Writable wrappers for almost all Java primitive types and some other types.
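
For instance, here is how a few of these wrappers relate to the Java types they box. This is a small illustrative snippet, not part of the BigramCount code that follows:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableWrappers {

	public static void main(String[] args) {
		// Writable wrappers box Java primitives and Strings
		IntWritable count = new IntWritable(42);      // wraps an int
		LongWritable offset = new LongWritable(100L); // wraps a long
		Text word = new Text("hadoop");               // wraps a String (stored as UTF-8)

		// get()/set() and toString() convert back and forth
		int c = count.get();
		offset.set(200L);
		String w = word.toString();

		System.out.println(c + " " + offset.get() + " " + w);
	}
}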

Now the obvious question is why does Hadoop use these types instead of Java types?

Why does Hadoop use Writable(s)?

As we already know, data needs to be transmitted between different nodes in a distributed computing environment. This requires serialization and deserialization of the data, i.e. converting data from its structured form into a byte stream and back. Hadoop therefore uses a simple and efficient serialization protocol to serialize data between the map and reduce phases, and the types that support it are called Writable(s). Some examples of Writables, as already mentioned, are IntWritable, LongWritable, BooleanWritable and FloatWritable. The entire list is in the org.apache.hadoop.io package of the Hadoop API (http://hadoop.apache.org/docs/current/api/index.html).
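
To make this concrete, the snippet below serializes a Writable to a byte stream and reads it back, which is essentially what Hadoop does when it ships data between nodes. Again, this is just an illustrative snippet, not part of the BigramCount code:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

public class SerializationDemo {

	public static void main(String[] args) throws IOException {
		// serialize: Writable -> byte stream
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bytes);
		new IntWritable(163).write(out);
		out.close();

		// deserialize: byte stream -> Writable
		IntWritable copy = new IntWritable();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(copy.get()); // prints 163
	}
}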

Limitation of primitive Hadoop Writable classes

In the wordCount example we emit Text as the key and IntWritable as the value from the Mappers and Reducers. Although Hadoop provides many primitive Writables that can be used in simple applications like wordCount, these clearly cannot serve our purpose all the time.

Consider a scenario where we would like to transmit a 3-D point as a value from the Mappers/Reducers. The structure of the 3-D point would be something like this:

class Point3D
{
    public float x;
    public float y;
    public float z;
}

Now, if you still wanted to use the primitive Hadoop Writables, you would have to convert the value into a string and transmit it. However, it gets very messy when you have to deal with string manipulation.

Also, what if you want to transmit this as a key? As we already know, Hadoop does the sorting and shuffling automatically, so these points would get sorted based on their string values, which would not be correct. So clearly we need to write custom data types that can be used in Hadoop.
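
As a tiny illustration of the sorting problem, assume we encoded each point as a comma-separated string and reused Text as the key. The comparison is then lexicographic, not numeric:

public class StringKeyPitfall {

	public static void main(String[] args) {
		// the points encoded as strings, as we would if we reused Text as the key
		String p1 = "10.0,2.0,3.0"; // x = 10.0
		String p2 = "2.0,5.0,1.0";  // x = 2.0

		// lexicographically "10.0,..." < "2.0,..." because '1' < '2',
		// so the point with x = 10.0 would sort before the point with x = 2.0
		System.out.println(p1.compareTo(p2) < 0); // prints true
	}
}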

Custom Writable

Any user-defined class that implements the Writable interface is a custom Writable. So let us first look at the structure of the Writable interface.

public interface Writable
{
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
}

So the class implementing this interface must provide implementations of these two methods at the very least. Let us now look at the two methods in detail.

write(DataOutput out) – It is used to serialize the fields of the object to ‘out’.
readFields(DataInput in) – It is used to deserialize the fields of the object from ‘in’.
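
Putting the two methods together, here is a minimal sketch of how the Point3D class from earlier could be turned into a Writable. This Point3DWritable is a hypothetical class used only for illustration; it is not part of the BigramCount code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Point3DWritable implements Writable {

	public float x;
	public float y;
	public float z;

	@Override
	public void write(DataOutput out) throws IOException {
		// serialize the three coordinates to 'out'
		out.writeFloat(x);
		out.writeFloat(y);
		out.writeFloat(z);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// deserialize the coordinates from 'in', in the same order they were written
		x = in.readFloat();
		y = in.readFloat();
		z = in.readFloat();
	}
}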

However, we need a custom WritableComparable if our custom data type is going to be used as the key rather than the value. We then need the class to implement the WritableComparable interface, which extends both the Writable interface and the Comparable interface. Its structure is as given below:

public interface WritableComparable<T> extends Writable, Comparable<T>
{
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
    int compareTo(T o);
}

compareTo(T o) – It is inherited from the Comparable interface, and it allows Hadoop to sort the keys in the sort and shuffle phase.
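
For example, if the hypothetical Point3DWritable sketched above were to be used as a key, it would implement WritableComparable<Point3DWritable> and could define an ordering by x, then y, then z:

	@Override
	public int compareTo(Point3DWritable other) {
		// order by x first, then y, then z
		int cmp = Float.compare(x, other.x);
		if (cmp != 0) {
			return cmp;
		}
		cmp = Float.compare(y, other.y);
		if (cmp != 0) {
			return cmp;
		}
		return Float.compare(z, other.z);
	}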

BigramCount Example

Let us now look at the BigramCount example, which will solidify the concepts that we have learnt so far in this post. It is a good extension of the wordCount example, and it will also show us how to write a custom Writable in practice.

Code the custom Writable class

In BigramCount we need to count how often each pair of adjacent words occurs in the text. So we are going to define a custom class that holds the two words together.

The code for that is as given below:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {

	private Text first;
	private Text second;

	public TextPair(Text first, Text second) {
		set(first, second);
	}

	public TextPair() {
		set(new Text(), new Text());
	}

	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}

	public Text getFirst() {
		return first;
	}

	public Text getSecond() {
		return second;
	}

	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	@Override
	public String toString() {
		return first + " " + second;
	}

	@Override
	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);

		if (cmp != 0) {
			return cmp;
		}

		return second.compareTo(tp.second);
	}

	@Override
	public int hashCode(){
		return first.hashCode()*163 + second.hashCode();
	}

	@Override
	public boolean equals(Object o)
	{
		if(o instanceof TextPair)
		{
			TextPair tp = (TextPair) o;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}

}

We have already seen the explanation of readFields(), write() and compareTo(). And just as you would for any value object you write in Java, you should override the hashCode(), equals(), and toString() methods from java.lang.Object. The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) to choose a reduce partition, so you should make sure that you write a good hash function that mixes well, to ensure that the reduce partitions are of a similar size.
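
To see why the hash function matters, this is essentially what the default HashPartitioner does when assigning a key to one of the reduce partitions:

// essentially what HashPartitioner.getPartition() returns for a key:
int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

The & Integer.MAX_VALUE masks off the sign bit so the partition index is never negative. If hashCode() mixes poorly, many keys land in the same partition and one reducer ends up doing most of the work.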

Code the Mapper

The Mapper, just as in the wordCount example, takes each combination of two adjacent words and emits the corresponding TextPair as the key with a value of ‘1’.

The code for the Mapper is as given below:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BigramCountMapper extends Mapper<LongWritable, Text, TextPair, IntWritable>{

	// lastWord holds the previous word across calls to map(), so bigrams can span line boundaries
	private static Text lastWord = null;
	private static TextPair textPair = new TextPair();
	private static Text wordText = new Text();
	private static IntWritable one = new IntWritable(1);

	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
	{
		String line = value.toString();
		line = line.replace(",", "");
		line = line.replace(".", "");

		for(String word: line.split("\\W+"))
		{
			// skip the empty token produced when a line starts with a non-word character
			if(word.isEmpty())
			{
				continue;
			}

			if(lastWord == null)
			{
				lastWord = new Text(word);
			}
			else
			{
				wordText.set(word);
				textPair.set(lastWord, wordText);
				context.write(textPair, one);
				lastWord.set(wordText.toString());
			}
		}
	}
}

Code the Reducer

Hadoop takes all the key-value pairs emitted from the Mapper and does the sorting and shuffling. After that, all the values that have the same TextPair key are put into an iterable list, which is then provided to the Reducer. In the Reducer we just add up the values in the list, just as we did in the wordCount example.

The code for the Reducer is as given below:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class BigramCountReducer extends Reducer<TextPair, IntWritable, Text, IntWritable>{
	private static Text textPairText = new Text();
	@Override
	public void reduce(TextPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
	{
        int count=0;
        for(IntWritable value: values)
        {
            count += value.get();
        }

        textPairText.set(key.toString());
        context.write(textPairText, new IntWritable(count));
	}
}

Code the Driver

Finally, we will code the driver class that configures and controls the job. Here we need to set the map output key class to TextPair.class (via setMapOutputKeyClass), since that is our custom Writable class.

The code for the Driver is as given below:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BigramCount {
	public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {
		if (args.length != 2) {
			System.err.println("Inavlid Command!");
			System.err.println("Usage: BigramCount <input type="text" /> <output>");
			System.exit(-1);
		}

		Configuration conf = new Configuration();
		conf.set("mapreduce.jobtracker.address", "local");
		conf.set("fs.defaultFS","file:///");

		Job job = new Job(conf);

		job.setJarByClass(BigramCount.class);
		job.setJobName("Word Count");

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setMapperClass(BigramCountMapper.class);
		job.setReducerClass(BigramCountReducer.class);

		job.setMapOutputKeyClass(TextPair.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Setup the input directory in HDFS

Download an ebook from Project Gutenberg (http://www.gutenberg.org/) and save it as plain text in a directory named ‘input’.

Next, we need to copy this directory into HDFS. To do that, type the following in the terminal:

$ hadoop-1.1.2/bin/hadoop fs -put ~/Desktop/input/ .

This will copy the directory into HDFS, as seen below.

$ hadoop-1.1.2/bin/hadoop fs -ls
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2013-11-20 23:13 /user/hadoop/input

Run the job

$ hadoop-1.1.2/bin/hadoop jar ~/Desktop/bigramCount.jar BigramCount input output
13/11/20 23:13:28 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/11/20 23:13:28 INFO input.FileInputFormat: Total input paths to process : 1
13/11/20 23:13:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/20 23:13:28 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/20 23:13:28 INFO mapred.JobClient: Running job: job_201311202308_0003
13/11/20 23:13:29 INFO mapred.JobClient:  map 0% reduce 0%
13/11/20 23:13:35 INFO mapred.JobClient:  map 100% reduce 0%
13/11/20 23:13:43 INFO mapred.JobClient:  map 100% reduce 33%
13/11/20 23:13:45 INFO mapred.JobClient:  map 100% reduce 100%
13/11/20 23:13:46 INFO mapred.JobClient: Job complete: job_201311202308_0003
13/11/20 23:13:46 INFO mapred.JobClient: Counters: 26
13/11/20 23:13:46 INFO mapred.JobClient:   Job Counters
13/11/20 23:13:46 INFO mapred.JobClient:     Launched reduce tasks=1
13/11/20 23:13:46 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=5779
13/11/20 23:13:46 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/11/20 23:13:46 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/11/20 23:13:46 INFO mapred.JobClient:     Launched map tasks=1
13/11/20 23:13:46 INFO mapred.JobClient:     Data-local map tasks=1
13/11/20 23:13:46 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=9545
13/11/20 23:13:46 INFO mapred.JobClient:   File Output Format Counters
13/11/20 23:13:46 INFO mapred.JobClient:     Bytes Written=343198
13/11/20 23:13:46 INFO mapred.JobClient:   FileSystemCounters
13/11/20 23:13:46 INFO mapred.JobClient:     FILE_BYTES_READ=803716
13/11/20 23:13:46 INFO mapred.JobClient:     HDFS_BYTES_READ=274173
13/11/20 23:13:46 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1711913
13/11/20 23:13:46 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=343198
13/11/20 23:13:46 INFO mapred.JobClient:   File Input Format Counters
13/11/20 23:13:46 INFO mapred.JobClient:     Bytes Read=274059
13/11/20 23:13:46 INFO mapred.JobClient:   Map-Reduce Framework
13/11/20 23:13:46 INFO mapred.JobClient:     Map output materialized bytes=803716
13/11/20 23:13:46 INFO mapred.JobClient:     Map input records=4893
13/11/20 23:13:46 INFO mapred.JobClient:     Reduce shuffle bytes=803716
13/11/20 23:13:46 INFO mapred.JobClient:     Spilled Records=93962
13/11/20 23:13:46 INFO mapred.JobClient:     Map output bytes=709748
13/11/20 23:13:46 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
13/11/20 23:13:46 INFO mapred.JobClient:     Combine input records=0
13/11/20 23:13:46 INFO mapred.JobClient:     SPLIT_RAW_BYTES=114
13/11/20 23:13:46 INFO mapred.JobClient:     Reduce input records=46981
13/11/20 23:13:46 INFO mapred.JobClient:     Reduce input groups=24292
13/11/20 23:13:46 INFO mapred.JobClient:     Combine output records=0
13/11/20 23:13:46 INFO mapred.JobClient:     Reduce output records=24292
13/11/20 23:13:46 INFO mapred.JobClient:     Map output records=46981

You can now view the output directly from HDFS or download the directory to the local hard disk using the get command.

The output would look similar to the following:

...
command of	4
command the	1
commanded by	4
commanded the	1
commanded with	2
commander 10	1
commander Colonel	2
commander General	1
commander Prince	1
commander dated	1
commander decided	1
commander hastily	1
commander of	8
commander sent	1
...

Note – Going through the wordCount post before this post is strongly advised.

—–
References:

  1. http://developer.yahoo.com/hadoop/tutorial/module5.html
  2. Hadoop – The Definitive Guide (Tom White)