An Introduction to MapReduce with Disco and Python

Computer Science Department - University of Puerto Rico

Prepared by: José R. Ortiz-Ubarri

MapReduce

MapReduce is a programming model for processing large data sets with a parallel, distributed algorithm on a cluster.

Map step: The master node takes the input, divides it into smaller sub-problems, and distributes them to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes the smaller problem, and passes the answer back to its master node.

Reduce step: The master node then collects the answers to all the sub-problems and combines them in some way to form the output – the answer to the problem it was originally trying to solve.
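
The two steps can be simulated on a single machine to see how data flows between them. The following plain-Python sketch does not use Disco, and the function names map_step, shuffle, reduce_step and run_locally are hypothetical. It counts words: the map step emits (word, 1) pairs, sorting and grouping bring equal keys together, and the reduce step combines each group. On a real cluster the sub-problems would be processed by different worker nodes.

```python
from itertools import groupby
from operator import itemgetter


def map_step(line):
    # Map: turn one input record (a text line) into (key, value) pairs.
    for word in line.split():
        yield word, 1


def shuffle(pairs):
    # Bring equal keys together; on a cluster this happens across nodes.
    return groupby(sorted(pairs), key=itemgetter(0))


def reduce_step(key, values):
    # Reduce: combine all the values that share a key.
    return key, sum(values)


def run_locally(lines):
    # Each line plays the role of a sub-problem handed to a worker.
    pairs = [pair for line in lines for pair in map_step(line)]
    return [reduce_step(key, (v for _, v in group)) for key, group in shuffle(pairs)]


print(run_locally(["to be or", "not to be"]))
```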

Some MapReduce implementations:

  1. Disco (Python)
  2. Hadoop (Java)
  3. Amazon Elastic MapReduce (hosted Hadoop)

Formal definitions thanks to: http://en.wikipedia.org/wiki/MapReduce

disco: A MapReduce framework for Python

Disco is a distributed computing framework based on the MapReduce paradigm.

Find it here: http://discoproject.org/

a tutorial here: http://discoproject.org/doc/disco/start/tutorial.html

and the API documentation here: http://discoproject.org/doc/disco/lib/

disco: Important Objects

from disco.core import Job

Job - For this presentation the most important object is Job.

Job is the object that links the input, the map and the reduce functions.

A job specifies a worker, the worker environment, a list of inputs, and some additional information about how to run the job.

Job routines:

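  1. run - Attempts to run the job: it submits the job together with the information the workers need to run it (inputs, map and reduce functions, etc.). It returns the Job object itself, which is why the result of Job().run(...) can be assigned to job.
  2. wait - Blocks until the job has finished, then returns the results of the job.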

disco: Important Routines

from disco.core import result_iterator
from disco.util import kvgroup

  1. result_iterator - Iterates through all the key-value pairs in the results of the reduce function.
  2. kvgroup - Groups the values of consecutive keys that compare equal. It does not sort its input, so the input must be sorted first (for example, with the sorted function). It yields (key, values) pairs, where values is an iterator over the values for that key.
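
To see why the input must be sorted, here is a local stand-in for kvgroup built on itertools.groupby, following the description above (it groups only consecutive equal keys and does not sort). The name local_kvgroup is hypothetical; inside a Disco job, use disco.util.kvgroup itself.

```python
from itertools import groupby
from operator import itemgetter


def local_kvgroup(kviter):
    # Group the values of *consecutive* (key, value) pairs with equal keys.
    # Like kvgroup, this does not sort its input.
    for key, group in groupby(kviter, key=itemgetter(0)):
        yield key, (value for _, value in group)


pairs = [("b", 1), ("a", 1), ("b", 1)]

# Unsorted: the two "b" pairs are not adjacent, so "b" appears twice.
print([(k, sum(vs)) for k, vs in local_kvgroup(pairs)])

# Sorted first: each key appears exactly once.
print([(k, sum(vs)) for k, vs in local_kvgroup(sorted(pairs))])
```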

disco: The Map function

Example:
 map(line, params) 

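The first parameter is an input entry, which by default is a line of text.

The second parameter is an optional parameter object specified by the user (passed through the params argument of run).

The function returns (usually yields) an iterable of (key, value) pairs.

Note: The name of the map function can be any valid identifier.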
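
The sketch below shows a map function that uses its second parameter. The attribute min_length is hypothetical, and a types.SimpleNamespace stands in for the user's parameter object so the function can be called without a cluster. It is also named map_long_words rather than map, since any identifier works.

```python
from types import SimpleNamespace


def map_long_words(line, params):
    # Emit (word, 1) only for words with at least params.min_length characters.
    # min_length is a hypothetical user-defined parameter.
    for word in line.split():
        if len(word) >= params.min_length:
            yield word, 1


# Stand-in for the user-specified parameter object.
params = SimpleNamespace(min_length=4)
print(list(map_long_words("Sube las manos pa' arriba", params)))
```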

disco: The Reduce function

Example:
reduce(iter, params)

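The first parameter is an iterator over the (key, value) pairs produced by the map function.

The second parameter is the same optional, user-specified parameter object that the map function receives.

The function returns (usually yields) an iterable of (key, value) pairs, which form the results of the job.

Note: The name of the reduce function can be any valid identifier.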
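
A reduce function usually sorts and groups its input, then yields one (key, value) pair per key. This sketch computes the largest value for each key. To run without Disco it uses itertools.groupby as a local stand-in for disco.util.kvgroup, and a hand-written list plays the role of the map output; the name reduce_max is hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def reduce_max(iter, params):
    # Sort so that equal keys are adjacent, then group them.
    # (In a Disco job, kvgroup(sorted(iter)) would do the grouping.)
    for key, group in groupby(sorted(iter), key=itemgetter(0)):
        yield key, max(value for _, value in group)


# Hypothetical map output: (key, value) pairs in arbitrary order.
map_output = [("lado", 2), ("pa", 5), ("lado", 7), ("pa", 1)]
print(list(reduce_max(map_output, None)))
```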

disco: Simple Program Template

from disco.core import Job, result_iterator

def map(line, params):
    # Definition of the map function goes here:
    # replace pass with code that yields (key, value) pairs.
    pass

def reduce(iter, params):
    # Definition of the reduce function goes here:
    # replace pass with code that yields (key, value) pairs.
    pass

if __name__ == '__main__':
    job = Job().run(input=["input1",..., "inputn"], # list of inputs
                    map=map,
                    reduce=reduce)

    result = result_iterator(job.wait(show=True))

    # Code to do something with the result iterator.
    # For example, print the results:
    for key, value in result:
        print(key, value)

Note: The names of the map and reduce functions can be any valid identifiers, as long as the same functions are passed to run (map=..., reduce=...).

count_words.py: The hello world of MapReduce.


from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)

disco: Inputs

The input argument is a list of input URLs. For the purposes of this tutorial, each input can be a file on the web (http://...) or a file on the local file system, using file:// instead of http://.

Example:
job = Job().run(input=["file:///bccd/home/jortiz/bigfile.txt"],... 

There are more input types, such as tag:// URLs, which refer to data stored in the Disco Distributed File System (DDFS).
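
Because inputs are URLs, local files have to be written with the file:// scheme. One way to build such URLs from absolute paths is pathlib's as_uri(); the paths below are only examples, and the sketch assumes POSIX-style paths.

```python
from pathlib import Path

# Example absolute paths on the machines that will read the input.
paths = ["/bccd/home/jortiz/bigfile.txt", "/tmp/words.txt"]

# as_uri() requires an absolute path and returns a file:// URL.
inputs = [Path(p).as_uri() for p in paths]
print(inputs)

# The list can then be passed as Job().run(input=inputs, ...).
```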

Why disco?

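Running MapReduce Python programs

Just run the script with the Python interpreter, assuming a Disco master is running (for example, started with disco start):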
python count_words.py

Example

A Pitbull song in Spanish: very famous, and it doesn't say a lot.

Echa pa' allá, todo lo malo echa pa' allá
Say it with me
Echa pa' allá, todo lo malo echa pa' allá
Say it loud
Echa pa' allá, todo lo malo echa pa' allá
Say it louder
Echa pa' allá, todo lo malo echa pa' allá

Sube las manos pa' arriba,
Dale pa' abajo, dale pa' un lado, pa'l otro lado
Sube las manos pa' arriba,
Dale pa' abajo, dale pa' un lado, pa'l otro lado
Sube las manos pa' arriba,
Dale pa' abajo, dale pa' un lado, pa'l otro lado
Sube las manos pa' arriba,
Dale pa' abajo, dale pa' un lado, pa'l otro lado

A mí qué me importa el dinero,
A mí qué me importa la fama.
Aquí lo que importa es salud, familia, el futuro, los niños y las almas.

Example (continued)

#python count_words.py
pa 20
lado 8
allá 8
lo 5
las 5
un 4
todo 4
pal 4
otro 4
manos 4
malo 4
echa 4
dale 4
arriba 4
abajo 4
Sube 4
Echa 4
Dale 4
me 3
it 3
importa 3
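
Two details of the listing above are worth noting. The words appear without punctuation (pa, allá, pal), whereas line.split() alone would keep tokens such as pa' and allá, as separate words; and the pairs are ordered by descending count, which count_words.py does not do by itself. The sketch below assumes punctuation should simply be deleted (so pa'l becomes pal). The name map_no_punctuation is hypothetical, and collections.Counter stands in for the reduce step so it can run locally.

```python
from collections import Counter


def map_no_punctuation(line, params):
    # Like the map in count_words.py, but deletes ASCII punctuation first,
    # so "pa'" is counted as "pa" and "allá," as "allá".
    # The import is inside the function, as count_words.py does for kvgroup.
    import string
    table = str.maketrans("", "", string.punctuation)
    for word in line.translate(table).split():
        yield word, 1


lines = [
    "Sube las manos pa' arriba,",
    "Dale pa' abajo, dale pa' un lado, pa'l otro lado",
]

# Local stand-in for the reduce step: sum the counts of each word.
counts = Counter()
for line in lines:
    for word, one in map_no_punctuation(line, None):
        counts[word] += one

# Client-side ordering by descending count, as in the listing above.
ranking = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
for word, count in ranking:
    print(word, count)
```

Passing map=map_no_punctuation to run would apply the same cleanup in the Disco job, while the sorting by count would still be done on the client after result_iterator.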

References

Disco project, http://discoproject.org/

Python, http://www.python.org/

Wikipedia, http://en.wikipedia.org/wiki/MapReduce