CSE 124: Networked Services

Writing a MapReduce

Introduction

MapReduce takes its name from two standard functions, map and reduce, found in many functional programming languages.

The Map function takes a {key, value}, performs some computation on it, then outputs a {key, value}.

Map: (key, value) -> (key, value)

The Reduce function takes a {key, value[]}, performs some computation on them, then outputs a {key, value}[].

Reduce: (key, value[]) -> (key, value)[]

output.collect(key, value); is how Map and Reduce functions emit their {key, value} pairs.

Writing a MapReduce is as simple as writing these two functions (or using already-provided functions), then telling a job object which functions you want to use.
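
To make the data flow concrete, here is a minimal plain-Java sketch of the two signatures above, using word counting as the computation. It does not use Hadoop at all: the class LocalWordCount, the Pair type, and the run method are names invented for this illustration, and the grouping step in run only imitates what the framework does between the Map and Reduce phases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the MapReduce data flow. This is NOT Hadoop code;
// LocalWordCount, Pair, and run are hypothetical names for illustration.
class LocalWordCount {

  // A minimal {key, value} pair.
  static final class Pair {
    final String key;
    final int value;
    Pair(String key, int value) { this.key = key; this.value = value; }
  }

  // Map: (lineNumber, line) -> (word, 1), once for every word in the line.
  static void map(int lineNumber, String line, List<Pair> output) {
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        output.add(new Pair(word, 1));   // plays the role of output.collect
      }
    }
  }

  // Reduce: (word, counts[]) -> (word, sum of counts).
  static void reduce(String key, List<Integer> values, List<Pair> output) {
    int sum = 0;
    for (int v : values) {
      sum += v;
    }
    output.add(new Pair(key, sum));
  }

  // Runs map on every line, groups the values by key, then reduces each group.
  static Map<String, Integer> run(List<String> lines) {
    List<Pair> mapped = new ArrayList<>();
    for (int i = 0; i < lines.size(); i++) {
      map(i, lines.get(i), mapped);
    }

    // Grouping by key; a TreeMap keeps the keys sorted.
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Pair p : mapped) {
      grouped.computeIfAbsent(p.key, k -> new ArrayList<>()).add(p.value);
    }

    List<Pair> reduced = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      reduce(e.getKey(), e.getValue(), reduced);
    }

    Map<String, Integer> result = new TreeMap<>();
    for (Pair p : reduced) {
      result.put(p.key, p.value);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> lines = new ArrayList<>();
    lines.add("the quick fox");
    lines.add("the lazy dog");
    System.out.println(run(lines));  // prints {dog=1, fox=1, lazy=1, quick=1, the=2}
  }
}
```

In a real job the framework performs the grouping and runs many map and reduce calls in parallel across machines; only the bodies of map and reduce are yours to write.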

Trivial Identity Example

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Trivial {

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(Trivial.class);
    conf.setJobName("trivial");

    conf.setMapperClass(Trivial.IdentityMapper.class);
    conf.setReducerClass(Trivial.IdentityReducer.class);

    if (args.length < 2) {
      System.out.println("ERROR: Wrong number of parameters");
      System.out.println("trivial <input_path> <output_path>");
      System.exit(1);
    }

    conf.setInputPath(new Path(args[0]));
    conf.setOutputPath(new Path(args[1]));

    JobClient.runJob(conf);
  }

  // Emits every input {key, value} pair unchanged.
  public static class IdentityMapper extends MapReduceBase implements Mapper {
    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
      throws IOException {
      output.collect(key, val);
    }
  }

  // Emits each value in values[] under its original key.
  public static class IdentityReducer extends MapReduceBase implements Reducer {
    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter)
      throws IOException {
      while (values.hasNext()) {
        output.collect(key, (Writable)values.next());
      }
    }
  }
}
    

This may seem like a lot of code to set up, but most of it is boilerplate: main configures and launches the job, and each inner class implements one function.

Compiling your MapReduce Program

In this example, we will assume that the above program resides in a directory called example in your home directory. Copy this Makefile and put it into the example directory. Now do:

    $ cd example
    $ make
    

At this point you should have a file called build.jar under the example directory. You should now be able to launch your first Hadoop job!

    $ cd ~
    $ /net/global/cse124/default/bin/hadoop jar example/build.jar Trivial input-file output-dir
    

Note that this ASSUMES that you have already done the following (as outlined here):

  • Formatted your name node (/net/global/cse124/default/bin/hadoop namenode -format)
  • Started Hadoop on all nodes (/net/global/cse124/default/bin/start-all.sh)
  • Added input-file to your Distributed File System (DFS) (/net/global/cse124/default/bin/hadoop dfs -put localfile remotefile)

Test this with a small input file first. Hadoop should report the progress of your job on standard output. For a more detailed status report, you can check the status pages as described here. Once the job finishes, the output will be available under output-dir inside your DFS, so you will have to do a dfs -get to retrieve it to your local file system if you want to view or analyze it directly.

JobConf

There are many options for JobConf objects. For complete details see JobConf on the Hadoop API page. I shall cover what I have found to be the most useful options here.

  • JobConf(Class main_class);

    This initializes the configuration. Use the class for your main file to assist with logging.

    For those of you not familiar with reflection, you can write ClassName.class to specify a class statically (which is mostly what you will be doing).

  • conf.setCombinerClass(Class combiner);

    combiner implements Reducer

    The Combiner class is like a Reducer; in fact, it is often the same class as the Reducer. It runs on the output of the Maps on a single node, before that output is gathered together with the keys from the other nodes. This optimization makes use of the data already in memory on one machine. It is most useful in examples like WordCount, in which the Map simply outputs (key, 1) pairs: the Combiner can reduce the amount of data sent to the Reduce phase by pre-summing the pairs locally.

  • conf.setOutputKeyClass(Class obj);

    obj implements WritableComparable

    This is the type of the keys output by the Reduce function. Unless setMapOutputKeyClass is used, it is also the type of the keys emitted by Map.

  • conf.setOutputValueClass(Class obj);

    obj implements Writable

    This is the type of the values output by the Reduce function. Unless setMapOutputValueClass is used, it is also the type of the values emitted by Map.

  • conf.setInputFormat(Class format);

    format extends InputFormatBase

    This class provides methods for reading in custom input formats. This is done using a (usually inlined) subclass of RecordReader. (See InputFormatBase)

    You may optionally specify how files are to be split if they are too big, as well as which files within a directory are to be used as input instead of all of them.

  • conf.setNumReduceTasks(1);

    This controls the number of reduce tasks, and consequently the number of output files.

    You usually want to specify this in the conf/hadoop-site.xml config file, but sometimes it only makes sense to have a single output file, as when sorting. In other cases setting this should be avoided, so as to make best use of cluster resources.

  • conf.set(String property_name, String value);
    conf.get(String property_name, String default_value);

    In theory, value can be an Object, but when it is, it is just .toString'd.

    Use these methods to pass other parameters to your Map and Reduce classes.
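
The effect of setCombinerClass described above can be sketched in plain Java. This is an illustration only, not Hadoop code: CombinerSketch and its methods are invented names, each "node" is just a string of words, and the pair counts stand in for the data that would travel to the Reduce phase.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of what a combiner saves. This is NOT Hadoop code;
// CombinerSketch, mapNode, combine, and reduce are hypothetical names.
class CombinerSketch {

  // Map output on one node: one (word, 1) pair per word.
  // Only the keys are stored, since every value is 1.
  static List<String> mapNode(String text) {
    List<String> keys = new ArrayList<>();
    for (String word : text.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        keys.add(word);
      }
    }
    return keys;
  }

  // Combiner: pre-sum the (word, 1) pairs produced on a single node.
  static Map<String, Integer> combine(List<String> keys) {
    Map<String, Integer> partial = new TreeMap<>();
    for (String key : keys) {
      partial.merge(key, 1, Integer::sum);
    }
    return partial;
  }

  // Reduce: add up the partial sums received from every node.
  static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
    Map<String, Integer> total = new TreeMap<>();
    for (Map<String, Integer> partial : partials) {
      for (Map.Entry<String, Integer> e : partial.entrySet()) {
        total.merge(e.getKey(), e.getValue(), Integer::sum);
      }
    }
    return total;
  }

  public static void main(String[] args) {
    List<String> node1 = mapNode("a b a a");
    List<String> node2 = mapNode("b a b");
    Map<String, Integer> combined1 = combine(node1);
    Map<String, Integer> combined2 = combine(node2);

    // Pairs sent to the Reduce phase without and with a combiner.
    System.out.println(node1.size() + node2.size());          // prints 7
    System.out.println(combined1.size() + combined2.size());  // prints 4
    System.out.println(reduce(Arrays.asList(combined1, combined2)));  // prints {a=4, b=3}
  }
}
```

Pre-summing is safe here because addition gives the same total whether or not partial sums are taken first; a Reducer whose result depends on seeing all values at once would not be a suitable combiner.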

Mapper & Reducer

While map is the only function you are required to implement in a Mapper class, and reduce is the only one required in a Reducer class, there are a few other useful ones, inherited from MapReduceBase, that you can override. For complete details see MapReduceBase on the Hadoop API page. I shall cover what I have found to be the most useful options here.

  • configure(JobConf job)

    Use this method to initialize your Mapper or Reducer object. This is where you have access to the JobConf object for the current job, so you must extract any parameters here. It is called once, before any call to map or reduce.

  • close()

    This method is called after the last call to map or reduce. Use it to clean up anything you initialized in configure().
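
The order of these calls can be sketched in plain Java as follows. This is not Hadoop code: java.util.Properties stands in for JobConf, the property name filter.substring is made up for the example, and FilterMapper and runTask are hypothetical names. The sketch only assumes the ordering described above: configure once, then map for each record, then close once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Plain-Java sketch of the configure -> map ... -> close lifecycle.
// This is NOT Hadoop code: Properties stands in for JobConf, and
// "filter.substring" is a made-up property name.
class LifecycleSketch {

  static class FilterMapper {
    private String needle;                       // parameter read in configure()
    private int seen;                            // records processed so far
    final List<String> log = new ArrayList<>();  // records the call order

    // Called once, before any map() call: extract job parameters here.
    void configure(Properties job) {
      needle = job.getProperty("filter.substring", "");
      seen = 0;
      log.add("configure");
    }

    // Called once per record: emit only the lines that contain needle.
    void map(long offset, String line, List<String> output) {
      seen++;
      if (line.contains(needle)) {
        output.add(line);
      }
    }

    // Called once, after the last map() call: clean up here.
    void close() {
      log.add("close after " + seen + " records");
    }
  }

  // Drives one mapper through its whole lifecycle, as a framework would.
  static List<String> runTask(Properties job, List<String> records) {
    FilterMapper mapper = new FilterMapper();
    List<String> output = new ArrayList<>();
    mapper.configure(job);
    long offset = 0;
    for (String record : records) {
      mapper.map(offset, record, output);
      offset += record.length() + 1;  // +1 for the newline between records
    }
    mapper.close();
    return output;
  }

  public static void main(String[] args) {
    Properties job = new Properties();
    job.setProperty("filter.substring", "error");  // the conf.set(...) step
    List<String> records = Arrays.asList("ok", "error: disk", "fine", "error: net");
    System.out.println(runTask(job, records));  // prints [error: disk, error: net]
  }
}
```

Reading the parameter in configure rather than in map means the lookup happens once per task instead of once per record.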
