MapReduce

MapReduce is a programming model for processing and generating large datasets with massive scalability, inspired by the map and reduce primitives found in many functional programming languages. Essentially, it consists of two separate tasks: the map job, which takes input data and converts it into another set of data where individual elements are broken down into key/value pairs, and the reduce job, which takes those pairs and combines them into a smaller set of data.

Programs that implement this paradigm can be automatically parallelized and executed on clusters of computers, since the input data can be partitioned. The runtime system takes care of partitioning the data, scheduling the program's execution across the cluster, handling machine failures and managing inter-machine communication. This allows programmers with no experience in parallel and distributed programming to use the resources of a distributed system, since the details of parallelization, fault tolerance, locality optimization and load balancing are hidden from them.


How to Use It

The user of a MapReduce library expresses the computation as two functions that they write themselves: Map and Reduce.

Map takes an input pair and produces a set of intermediate key/value pairs. The library groups together all intermediate values associated with the same intermediate key and passes them to the Reduce function.

The Reduce function accepts an intermediate key and its corresponding set of values, and merges them into a possibly smaller set of values (typically just zero or one output value is produced per Reduce invocation). The intermediate values are supplied to the Reduce function via an iterator, which allows handling lists of values that are too large to fit in memory.

In addition, the programmer writes code to fill in a mapreduce specification object with the names of the input and output files and some optional tuning parameters. The user then invokes the MapReduce function, passing it that object.

The Map invocations are distributed across the cluster by automatically partitioning the input data, so that different machines can process different partitions in parallel. Reduce invocations are distributed by partitioning the intermediate key space. Both the number of partitions and the partitioning function can be specified by the programmer.
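To make this flow concrete, here is a minimal, purely in-memory sketch of the pipeline in Python. The names map_fn, reduce_fn and run_mapreduce, as well as the hash-based partitioning, are illustrative assumptions rather than the API of any real MapReduce library:

from collections import defaultdict

def map_fn(key, value):
    # key: document name, value: document contents.
    # Emit one (word, 1) pair per word in the document.
    for word in value.split():
        yield (word, 1)

def reduce_fn(key, values):
    # key: a word, values: all counts emitted for that word.
    yield (key, sum(values))

def run_mapreduce(inputs, map_fn, reduce_fn, num_partitions=2):
    # Map phase: apply map_fn to every input pair and partition the
    # intermediate pairs by hashing the intermediate key.
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for key, value in inputs:
        for k, v in map_fn(key, value):
            partitions[hash(k) % num_partitions][k].append(v)
    # Reduce phase: each partition is processed independently, with one
    # reduce_fn call per intermediate key and its grouped values.
    results = []
    for partition in partitions:
        for k, vs in partition.items():
            results.extend(reduce_fn(k, vs))
    return results

docs = [('doc1', 'to be or not to be'), ('doc2', 'to do or not to do')]
print(sorted(run_mapreduce(docs, map_fn, reduce_fn)))

A real library would run each partition on a different machine and spill the intermediate pairs to disk, but the grouping of values by key is the same.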

Examples

There are many interesting programs that can be expressed as MapReduce computations, from simple tasks like sorting to data mining and machine learning problems.

Count of URL access frequency

The map function takes logs of web page requests and produces an output pair of the form (URL, 1) for each request. The reduce function then adds together all values for the same URL and emits a pair of the form (URL, total_requests).
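Expressed with the same generator conventions as the sketch above, the two functions could look as follows (the log format, a whitespace-separated line whose first field is the requested URL, is a simplifying assumption):

def map_url_count(log_name, log_contents):
    # Assumes each log line starts with the requested URL.
    for line in log_contents.splitlines():
        fields = line.split()
        if fields:
            yield (fields[0], 1)

def reduce_url_count(url, counts):
    # Sum all the 1s emitted for the same URL.
    yield (url, sum(counts))

These plug directly into the hypothetical run_mapreduce driver sketched earlier.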

Distributed Grep

The map function outputs a line if it matches a given pattern. The reduce function just copies the intermediate data to its output.
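A comparable hedged sketch, where the pattern and the line-oriented input are assumptions chosen for illustration:

import re

PATTERN = re.compile(r'error', re.IGNORECASE)  # example pattern

def map_grep(file_name, contents):
    # Emit every line that matches the pattern, keyed by the line itself.
    for line in contents.splitlines():
        if PATTERN.search(line):
            yield (line, '')

def reduce_grep(line, values):
    # Identity reduce: pass the matching line straight through.
    yield (line, '')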

Hadoop: A MapReduce framework

Hadoop is a framework from Apache which mainly provides two things:

  • A distributed file system, called HDFS (Hadoop Distributed File System).

  • An API to produce and run MapReduce jobs.

This post will focus on Hadoop's MapReduce API. Even though Hadoop itself is written in Java, Hadoop programs may be written in other languages, such as Python. This is done with the Hadoop Streaming utility, which passes data between the Map and Reduce code via STDIN (standard input) and STDOUT (standard output).

In addition, it is a prerequisite to have a Hadoop cluster up and running. If you do not have one, you can find a step-by-step guide on how to set it up on our blog.

To show how this works, we will walk through a very simple example: the classic WordCount job, run on a Hadoop 2.7.2 cluster on Mac OS.

First, we need to define the mapper.py and reducer.py files:

mapper.py

#!/usr/bin/python
import sys

# Read lines from STDIN and emit one tab-separated (word, 1) pair per word.
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('%s\t%s' % (word, 1))

This script reads data from STDIN, splits it into words and writes tuples of the form (word, 1) to STDOUT. It does not compute the sum of each word's occurrences, because that is the reducer's responsibility. Then, give the file execution permission (chmod +x mapper.py).

reducer.py

#!/usr/bin/python
from operator import itemgetter
import sys

# Accumulate the total count for each word read from STDIN.
word2count = {}
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # Silently ignore lines whose count is not a number.
        pass

# Emit the words in alphabetical order together with their totals.
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
for word, count in sorted_word2count:
    print('%s\t%s' % (word, count))

This script reads the output of mapper.py from STDIN, sums the occurrences of each word and writes the results to STDOUT. Also make sure it has execution permission (chmod +x reducer.py).

Note that these scripts favor simplicity, since this post is intended to show Hadoop MapReduce functionality. For real workloads, however, you might want to optimize them, for example by using iterators and generators, which would significantly reduce memory consumption given the size of typical input files.
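As one possible optimization along those lines, a streaming reducer can rely on the fact that Hadoop sorts the mapper output by key before it reaches the reducer, and use itertools.groupby to process one word at a time instead of building a dictionary in memory. The following is a hedged sketch of that idea, not a drop-in replacement tuned for production:

#!/usr/bin/python
import sys
from itertools import groupby

def parse(stdin):
    # Yield (word, count) pairs from the tab-separated mapper output.
    for line in stdin:
        word, _, count = line.rstrip('\n').partition('\t')
        try:
            yield word, int(count)
        except ValueError:
            continue  # skip malformed lines, as the original reducer does

# Hadoop sorts the reducer input by key, so all pairs for a given word are
# adjacent and groupby can sum them without keeping everything in memory.
for word, group in groupby(parse(sys.stdin), key=lambda pair: pair[0]):
    print('%s\t%s' % (word, sum(count for _, count in group)))

When testing such a reducer locally, pipe the mapper output through sort first, exactly as in the commands below.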

You can test both scripts locally, before using them in a Hadoop MapReduce job, with the following commands:

echo "foo innuy foo foo innuy" | python mapper.py
echo "foo innuy foo foo innuy" | python mapper.py | sort | python reducer.py


Running a MapReduce job in Hadoop

You will need some input data. A good source of large text files is Project Gutenberg (http://www.gutenberg.org); this tutorial uses a hamlet.txt file downloaded from that site.

Firstly, copy the file from your local machine to HDFS (Hadoop’s file system):

hdfs dfs -put hamlet.txt hamlet

You can check if it was copied with the following command:

hdfs dfs -ls

Then, run the MapReduce job:

hadoop jar /usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar -input hamlet -output output -mapper mapper.py -reducer reducer.py

The job will read the hamlet file, process it, and write the results to the HDFS directory output. To see the results:

hdfs dfs -cat output/*

And you will see something like this:

Angels 1
Anon 2
Anon, 1
Another 5
Answer, 1
Antiquity 1
Appears 1
Are 9
Arm 1
...

Finally, you can retrieve the merged output file on your local machine as follows:

hdfs dfs -getmerge output/* output.txt


This was a simple example to show how MapReduce and Hadoop work, but much more complex jobs and optimizations are possible. You can check Google's MapReduce paper for more detailed information.

References

Based on this tutorial for MapReduce job execution on Ubuntu.