Sunday, December 15, 2013

Map/Reduce Programming Paradigm Example

MapReduce is the heart of Hadoop. It is the programming paradigm that allows for massive scalability across hundreds or thousands of servers in a Hadoop cluster. The MapReduce concept is fairly simple to understand for those who are familiar with clustered scale-out data processing solutions.

The term MapReduce actually refers to two separate and distinct tasks that Hadoop programs perform. The first is the map job, which takes a set of data and converts it into another set of data, where individual elements are broken down into tuples (key/value pairs). The reduce job takes the output from a map as input and combines those data tuples into a smaller set of tuples. As the sequence of the name MapReduce implies, the reduce job is always performed after the map job.

Lets start with Word Count Example :
Assume we have 3 web documents, and each documents contains web text. In that a real application won’t be quite so simple, as it’s likely to contain millions or even billions of rows, and they might not be neatly formatted rows at all; in fact, no matter how big or small the amount of data we need to analyze, the key principles which is covering here remains the same.

Doc 1 : Sachin is playing 200th Test.
Doc 2 : Dhoni is batting with Sachin in his 50th Test.Test is going good.
Doc 3 : Sachin and Dhoni playing Test Cricket Match in Mumbai.

In map phase the sentence would be split as words and form the initial key value pair as
Doc 1 :
<Sachin,1>
<is,1>
<playing,1>
<200th,1>
<Test,1>

Doc 2 :
<Dhoni,1>
<is,2>
<batting,1>
<with,1>
<Sachin,1>
<in,1>
<his,1>
<50th,1>
<Test,2>
<going,1>
<good,1>

Doc 3:
<Sachin,1>
<and,1>
<Dhoni,1>
<playing,1>
<Test,1>
<Cricket,1>
<Match,1>
<in,1>
<Mumbai,1>

In the reduce phase the keys are grouped together and the values for similar keys are added. So here there are only one pair of similar keys would be added so the out put key value pairs would be
<Sachin,2>
<is,3>
<playing,2>
<200th,1>
<Test,4>
<Dhoni,2>
<batting,1>
<with,1>
<in,2>
<his,1>
<50th,1>
<going,1>
<good,1>
<Cricket,1>
<Match,1>
<Mumbai,1>

This would give the number of occurrence of each word in the input. Thus reduce forms an aggregation phase for key

The point to be noted here is that first the mapper class executes completely on the entire data set splitting the words and forming the initial key value pairs. Only after this entire process is completed the reducer starts.


This Map/Reduce paradigm divide into 3 parts :
Part 1: Mapper Class
Part 2 : Reducer Class
Part 3 : Driver Class which will trigger Map/Reduce job.

Part 1 : Code of Mapper Class :
The functionality of the mapper method is as follows
 Step 1 : Create a IntWritable variable ‘one’ with value as 1
 Step 2 : Convert the input line in Text type to a String
 Step 3 : Use a tokenizer to split the line into words
 Step 4 : Iterate through each word and a form key value pairs as
          a) Assign each work from the tokenizer(of String type) to a Text ‘word’
          b) Form key value pairs for each word as <word,one> and push it to the output collector



import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
      /*Hadoop supported data types. The data types provided here are Hadoop specific data types designed for operational efficiency suited for massive parallel and lightning fast read write operations*/

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
   
      /*Map method that performs the tokenizer job and framing the initial key value pairs
        Mapper<LongWritable, Text, Text, IntWritable> , it refers to the data type of input and output key value pairs specific to the mapper or rateher the map method, ie Mapper<Input Key Type, Input Value Type, Output Key Type, Output Value Type>. */

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            //taking one line at a time and tokenizing the same
            String line = value.toString();
          StringTokenizer tokenizer = new StringTokenizer(line);
       
          //iterating through all the words available in that line and forming the key value pair
            while (tokenizer.hasMoreTokens())
            {
               word.set(tokenizer.nextToken());
               //sending to output collector which inturn passes the same to reducer
                 output.collect(word, one);
            }
       }
}



Part 2 : Code of Reducer Class :
The functionality of the reduce method is as follows
   Step a) Initaize a variable ‘sum’ as 0
   Step b) Iterate through all the values with respect to a key and sum up all of them
   Step c) Push to the output collector the Key and the obtained sum as value


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
{
      //reduce method accepts the Key Value pairs from mappers, do the aggregation based on keys and produce the final out put
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            int sum = 0;
            /*iterates through all the values available with a key and add them together and give the
            final result as the key and sum of its values*/
          while (values.hasNext())
          {
               sum += values.next().get();
          }
          output.collect(key, new IntWritable(sum));
      }
}

Part 3 : Code of  Driver Class which will trigger Map/Reduce job.

The functionality of the Driver method is as follows
   Step a) Creating a JobConf object 
   Step b) Setting configuration object with key and value class for Map/Reduce.
   Step c) Provide the mapper and reducer class names
   Step d) Provide the I/P and O/P HDFS directory path for Map/Reduce.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;


public class WordCountDriver extends Configured implements Tool{
      public int run(String[] args) throws Exception
      {
            //creating a JobConf object and assigning a job name for identification purposes
            JobConf conf = new JobConf(getConf(), WordCountDriver.class);
            conf.setJobName("WordCount");

            //Setting configuration object with the Data Type of output Key and Value
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            //Providing the mapper and reducer class names
            conf.setMapperClass(WordCountMapper.class);
            conf.setReducerClass(WordCountReducer.class);

            //the hdfs input and output directory to be fetched from the command line
            FileInputFormat.addInputPath(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
            return 0;
      }
     
      public static void main(String[] args) throws Exception
      {
            int res = ToolRunner.run(new Configuration(), new WordCountDriver(),args);
            System.exit(res);
      }
}