
Distributed and Cloud Computing


Chapter: Cloud Programming and Software Environments
Part 1

Adapted from University of Southern California,
with additions from Matei Zaharia, EECS, UC Berkeley

Copyright © 2012, Elsevier Inc. All rights reserved.


Parallel Computing and Programming
Environments

 MapReduce
 Hadoop
 Amazon Web Services

2
What is MapReduce?
 Simple data-parallel programming model
 For large-scale data processing
 Exploits large set of commodity computers
 Executes processing in a distributed manner
 Offers high availability
 Pioneered by Google
 Processes 20 petabytes of data per day
 Popularized by open-source Hadoop project
 Used at Yahoo!, Facebook, Amazon, …

3
What is MapReduce used for?
 At Google:
 Index construction for Google Search
 Article clustering for Google News
 Statistical machine translation
 At Yahoo!:
 “Web map” powering Yahoo! Search
 Spam detection for Yahoo! Mail
 At Facebook:
 Data mining
 Ad optimization
 Spam detection
4
Motivation: Large Scale Data Processing

 Many tasks are composed of processing lots of
data to produce lots of other data
 Want to use hundreds or thousands of CPUs
... but this needs to be easy!
 MapReduce provides
 User-defined functions
 Automatic parallelization and distribution
 Fault-tolerance
 I/O scheduling
 Status and monitoring

5
What is MapReduce used for?
 In research:
 Astronomical image analysis (Washington)
 Bioinformatics (Maryland)
 Analyzing Wikipedia conflicts (PARC)
 Natural language processing (CMU)
 Particle physics (Nebraska)
 Ocean climate simulation (Washington)
 <Your application here>

6
Distributed Grep

Very big data is divided into splits; each split is grepped in
parallel, and the per-split matches are concatenated (cat) into
all matches:

  Split data → grep → matches ─┐
  Split data → grep → matches ─┼→ cat → All matches
  Split data → grep → matches ─┘

grep is a command-line utility for searching plain-text data sets for lines
matching a regular expression.

cat is a standard Unix utility that concatenates and lists files
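The two utilities above can be combined into a single-process sketch of the distributed-grep dataflow (illustrative Python; the function names are our own, not part of any MapReduce library):

```python
import re

def grep_split(pattern, lines):
    """Map step: emit every line in one split that matches the pattern."""
    return [line for line in lines if re.search(pattern, line)]

def cat_matches(per_split_matches):
    """Reduce step: concatenate match lists from all splits (like Unix cat)."""
    return [line for matches in per_split_matches for line in matches]

# Two "splits" of a very big input, grepped independently, then merged.
splits = [["error: disk full", "ok"], ["ok", "error: timeout"]]
result = cat_matches(grep_split(r"error", s) for s in splits)
# result == ["error: disk full", "error: timeout"]
```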

7
Distributed Word Count

Very big data is divided into splits; each split is counted in
parallel, and the per-split counts are merged:

  Split data → count ─┐
  Split data → count ─┼→ merge → merged count
  Split data → count ─┘

8
Map+Reduce

Very big data → MAP → Partitioning Function → REDUCE → Result

 Map:
 Accepts input key/value pairs
 Emits intermediate key/value pairs

 Reduce:
 Accepts an intermediate key and a list of values
 Emits output key/value pairs
9
Architecture overview
The user submits a job to the Job tracker on the master node.
Each of the N slave nodes runs a Task tracker, which manages the
workers on that node.


10
GFS: underlying storage system
 Goal
 global view
 make huge files available in the face of node failures
 Master Node (meta server)
 Centralized, index all chunks on data servers
 Chunk server (data server)
 File is split into contiguous chunks, typically 16-64 MB.
 Each chunk replicated (usually 2x or 3x).
 Try to keep replicas in different racks.
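The rack-aware replication policy can be sketched as follows (a hypothetical placement routine, not GFS's actual algorithm; the server and rack names are made up):

```python
def place_replicas(servers, replication=3):
    """servers: list of (server_id, rack_id) pairs.
    Prefer servers on distinct racks; fall back to any free server."""
    chosen, used_racks = [], set()
    for server, rack in servers:
        if rack not in used_racks:       # first pass: one replica per rack
            chosen.append(server)
            used_racks.add(rack)
        if len(chosen) == replication:
            return chosen
    for server, rack in servers:         # fewer racks than replicas: fill up
        if server not in chosen:
            chosen.append(server)
            if len(chosen) == replication:
                break
    return chosen

servers = [("s1", "r1"), ("s2", "r1"), ("s3", "r2"), ("s4", "r3")]
# place_replicas(servers) == ["s1", "s3", "s4"]  (three distinct racks)
```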

11
GFS architecture

A client asks the GFS Master for chunk locations, then reads and
writes chunks (C0, C1, C2, C3, C5, …) directly from chunkservers
1 … N; each chunk is replicated on multiple chunkservers.

12
Functions in the Model
 Map
 Process a key/value pair to generate intermediate
key/value pairs
 Reduce
 Merge all intermediate values associated with the
same key
 Partition
 By default : hash(key) mod R
 Well balanced
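A minimal sketch of the default partition function, hash(key) mod R (using CRC32 rather than Python's salted built-in hash() so the same key always routes to the same reducer across runs; R is illustrative):

```python
import zlib

R = 4  # number of reduce tasks (illustrative)

def partition(key, num_reducers=R):
    # Stable hash of the key, reduced modulo the number of reducers,
    # so every occurrence of a key lands on the same reduce task.
    return zlib.crc32(key.encode()) % num_reducers

assert partition("apple") == partition("apple")   # deterministic routing
assert 0 <= partition("banana") < R               # always a valid reducer
```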

13
Programming Concept

 Map
 Perform a function on individual values in a data
set to create a new list of values
 Example: square x = x * x
map square [1,2,3,4,5]
returns [1,4,9,16,25]
 Reduce
 Combine values in a data set to create a new
value
 Example: sum = (for each elem in arr, total += elem)
reduce [1,2,3,4,5]
returns 15 (the sum of the elements)
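The two slide examples map directly onto Python's built-in map() and functools.reduce():

```python
from functools import reduce

# Map: apply square to each value, producing a new list.
squares = list(map(lambda x: x * x, [1, 2, 3, 4, 5]))
# squares == [1, 4, 9, 16, 25]

# Reduce: combine all values into one (running total starting at 0).
total = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5], 0)
# total == 15
```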

14
15
Fig. 6.5 Dataflow Implementation of MapReduce



A Simple Example
 Counting words in a large set of documents

map(String key, String value)
//key: document name
//value: document contents
for each word w in value
EmitIntermediate(w, “1”);

The map function emits each word w plus an associated count of
occurrences (just a “1” is recorded in this pseudo-code).

reduce(String key, Iterator values)
//key: a word
//values: a list of counts
int result = 0;
for each v in values
result += ParseInt(v);
Emit(AsString(result));

The reduce function sums together all counts emitted for a
particular word.
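A runnable single-process sketch of this pseudo-code (the in-memory grouping stands in for the distributed shuffle; all names here are our own, not the MapReduce library's):

```python
from collections import defaultdict

def map_fn(doc_name, contents):
    for word in contents.split():
        yield (word, 1)                 # EmitIntermediate(w, "1")

def reduce_fn(word, counts):
    return (word, sum(counts))          # sum all counts for this word

def run_mapreduce(docs):
    intermediate = defaultdict(list)
    for name, contents in docs.items():          # map phase
        for word, count in map_fn(name, contents):
            intermediate[word].append(count)     # shuffle: group by key
    return dict(reduce_fn(w, c) for w, c in sorted(intermediate.items()))

counts = run_mapreduce({"d1": "to be or not to be", "d2": "to do"})
# counts == {"be": 2, "do": 1, "not": 1, "or": 1, "to": 3}
```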

17
A Word Counting Example on <Key, Count> Distribution



How Does it work?

 Map invocations are distributed across multiple machines by automatically
partitioning the input data into a set of M splits.
 Reduce invocations are distributed by partitioning the intermediate key space into R
pieces using a hash function: hash(key) mod R.
 R and the partitioning function are specified by the programmer.

19
MapReduce : Operation Steps
When the user program calls the MapReduce function, the
following sequence of actions occurs :

1) The MapReduce library in the user program first splits the
input files into M pieces, typically 16 to 64 megabytes (MB)
per piece. It then starts up many copies of the program on a
cluster of machines.

2) One of the copies of the program is the master. The rest are
workers that are assigned work by the master.

20
MapReduce : Operation Steps
3) A worker who is assigned a map task :
 reads the contents of the corresponding input split
 parses key/value pairs out of the input data and passes each
pair to the user - defined Map function.
The intermediate key/value pairs produced by the Map function
are buffered in memory.

4) The buffered pairs are written to local disk, partitioned into R
regions by the partitioning function.
The locations of these buffered pairs on the local disk are
passed back to the master, which forwards these locations to the
reduce workers.

21
MapReduce : Operation Steps
5) When a reduce worker is notified by the master about
these locations, it reads the buffered data from the local
disks of the map workers.
When a reduce worker has read all intermediate data, it
sorts it by the intermediate keys so that all occurrences
of the same key are grouped together.

6) The reduce worker iterates over the sorted intermediate data
and, for each unique intermediate key, passes the key and the
corresponding set of intermediate values to the user’s Reduce
function.
The output of the Reduce function is appended to a final output
file.

22
MapReduce : Operation Steps

7) When all map tasks and reduce tasks have been completed, the
master wakes up the user program. At this point, the MapReduce
call in the user program returns to the user code.
After successful completion, the output of the MapReduce
execution is available in the R output files.
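Steps 1-7 can be walked through on a single machine with an illustrative sketch (M splits, R partitioned "output files"; all function and variable names are ours, not the library's):

```python
import zlib
from collections import defaultdict

M, R = 3, 2  # number of map splits and reduce partitions (illustrative)

def map_fn(record):
    for word in record.split():
        yield word, 1

def mapreduce(records):
    splits = [records[i::M] for i in range(M)]       # step 1: M input splits
    regions = [defaultdict(list) for _ in range(R)]  # R intermediate regions
    for split in splits:                             # steps 3-4: map + partition
        for record in split:
            for key, value in map_fn(record):
                r = zlib.crc32(key.encode()) % R     # hash(key) mod R
                regions[r][key].append(value)
    outputs = []                                     # steps 5-7: sort + reduce
    for region in regions:
        outputs.append({k: sum(v) for k, v in sorted(region.items())})
    return outputs                                   # one dict per output file

outs = mapreduce(["a b a", "b c", "a"])
merged = {k: v for out in outs for k, v in out.items()}
# merged == {"a": 3, "b": 2, "c": 1}
```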

23
Logical Data Flow in 5 Processing
Steps in MapReduce Process

(Key, Value) pairs are generated by the Map function over multiple available Map Workers
(VM instances). These pairs are then sorted and grouped based on key ordering. Different
key-groups are then processed by multiple Reduce Workers in parallel.



Locality issue
 Master scheduling policy
 Asks GFS for locations of replicas of input file blocks
 Map tasks typically use 64 MB splits (== the GFS block
size)
 Map tasks are scheduled so that a GFS replica of the input
block is on the same machine or the same rack
 Effect
 Thousands of machines read input at local disk
speed
 Without this, rack switches limit read rate

25
Fault Tolerance
 Reactive way
 Worker failure
 Heartbeat: workers are periodically pinged by the master
 NO response = failed worker
 If the processor of a worker fails, the tasks of that worker are
reassigned to another worker.

 Master failure
 Master writes periodic checkpoints
 Another master can be started from the last checkpointed
state
 If the master ultimately dies, the job is aborted

26
Fault Tolerance
 Proactive way (Redundant Execution)
 The problem of “stragglers” (slow workers)
 Other jobs consuming resources on machine
 Bad disks with soft errors transfer data very slowly
 Weird things: processor caches disabled (!!)

 When the computation is almost done, reschedule
in-progress tasks
 Whenever either the primary or the backup execution
finishes, mark the task as completed

27
Fault Tolerance
 Input error: bad records
 Map/Reduce functions sometimes fail for particular
inputs
 Best solution is to debug & fix, but not always
possible
 On segmentation fault
 Send UDP packet to master from signal handler
 Include sequence number of record being processed
 Skip bad records
 If master sees two failures for same record, next worker is
told to skip the record
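A toy model of this skip mechanism (our own sketch, not the library's API): the master counts failures per record sequence number, and a record that has failed twice is skipped on subsequent attempts.

```python
from collections import Counter

failure_counts = Counter()          # master state: record seq -> failures seen

def report_failure(seq_no):
    failure_counts[seq_no] += 1     # the UDP packet to the master, in spirit

def should_skip(seq_no):
    return failure_counts[seq_no] >= 2   # two failures => skip

def run_map(records, map_fn):
    results = []
    for seq, rec in enumerate(records):
        if should_skip(seq):
            continue                # worker was told to skip this bad record
        try:
            results.append(map_fn(rec))
        except ValueError:
            report_failure(seq)     # record its sequence number and move on
    return results

# The bad record fails on the first two attempts; the third run skips it.
records = ["1", "2", "oops", "4"]
for _ in range(3):
    out = run_map(records, int)
# out == [1, 2, 4]
```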

28
Status monitor

29
Points to be emphasized
 No reduce can begin until map is complete
 Master must communicate locations of
intermediate files
 Tasks scheduled based on location of data
 If map worker fails any time before reduce
finishes, task must be completely rerun
 MapReduce library does most of the hard work
for us!

30
Other Examples
 Distributed Grep:
 Map function emits a line if it matches a supplied pattern.
 Reduce function is an identity function that copies the supplied intermediate
data to the output.
 Count of URL accesses:
 Map function processes logs of web page requests and outputs <URL, 1>,
 Reduce function adds together all values for the same URL, emitting <URL,
total count> pairs.
 Reverse Web-Link graph; e.g., all URLs with reference to http://dblab.usc.edu:
 Map function outputs <tgt, src> for each link to a tgt in a page named src,
 Reduce concatenates the list of all src URLs associated with a given tgt
URL and emits the pair: <tgt, list(src)>.
 Inverted Index; e.g., all URLs with 585 as a word:
 Map function parses each document, emitting a sequence of <word,
doc_ID>,
 Reduce accepts all pairs for a given word, sorts the corresponding doc_IDs
and emits a <word, list(doc_ID)> pair.
 Set of all output pairs forms a simple inverted index.
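The inverted-index example as a single-process sketch (illustrative only; the doc IDs and helper names are made up):

```python
from collections import defaultdict

def build_inverted_index(docs):
    """Map emits <word, doc_ID> pairs; reduce sorts doc_IDs per word."""
    pairs = defaultdict(list)
    for doc_id, text in docs.items():           # map phase
        for word in set(text.split()):          # one emit per word per doc
            pairs[word].append(doc_id)
    return {w: sorted(ids) for w, ids in pairs.items()}   # reduce phase

index = build_inverted_index({"d1": "585 is a word", "d2": "a word again"})
# index["word"] == ["d1", "d2"]; index["585"] == ["d1"]
```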

31
MapReduce Implementations

MapReduce implementations by platform:
 Cluster: 1. Google MapReduce  2. Apache Hadoop
 Multicore CPU: Phoenix @ Stanford
 GPU: Mars @ HKUST
32
Hadoop: a software platform, originally developed at Yahoo!,
enabling users to write and run applications over vast amounts
of distributed data.
Attractive Features in Hadoop :
 Scalable : can easily scale to store and process petabytes of
data in the Web space
 Economical : An open-source MapReduce minimizes the
overheads in task spawning and massive data communication.
 Efficient: Processing data with high-degree of parallelism
across a large number of commodity nodes
 Reliable : Automatically maintains multiple copies of data to
facilitate redeployment of computing tasks on failures



Typical Hadoop Cluster
Nodes in each rack connect through a rack switch to an
aggregation switch.

 40 nodes/rack, 1000-4000 nodes in cluster
 1 Gbps bandwidth within rack, 8 Gbps out of rack
 Node specs (Yahoo terasort):
8 x 2 GHz cores, 8 GB RAM, 4 disks (= 4 TB?)

34
Image from http://wiki.apache.org/hadoop-data/attachments/HadoopPresentations/attachments/YahooHadoopIntro-apachecon-us-2008.pdf
Typical Hadoop Cluster

35
Image from http://wiki.apache.org/hadoop-data/attachments/HadoopPresentations/attachments/aw-apachecon-eu-2009.pdf
Challenges
Cheap nodes fail, especially if you have many
1. Mean time between failures for 1 node = 3 years
2. Mean time between failures for 1000 nodes = 1 day
3. Solution: Build fault-tolerance into the system

Commodity network = low bandwidth
1. Solution: Push computation to the data

Programming distributed systems is hard
1. Solution: Data-parallel programming model: users
write “map” & “reduce” functions, system distributes
work and handles faults
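A quick check of the failure arithmetic above:

```python
# With a 3-year mean time between failures per node, a 1000-node
# cluster sees a failure roughly once a day.
node_mtbf_days = 3 * 365                      # ~1095 days per node
cluster_mtbf_days = node_mtbf_days / 1000     # ~1.1 days per cluster failure
```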
36
Hadoop Components
 Distributed file system (HDFS)
 Single namespace for entire cluster
 Replicates data 3x for fault-tolerance

 MapReduce framework
 Executes user jobs specified as “map” and
“reduce” functions
 Manages work distribution & fault-tolerance

37
Hadoop Distributed File System

 Files split into 128 MB blocks
 Blocks replicated across several datanodes (usually 3)
 Single namenode stores metadata (file names, block
locations, etc.)
 Optimized for large files, sequential reads
 Files are append-only

(Diagram: the Namenode maps File1 to blocks 1-4, which are
replicated across the Datanodes.)
38
Secure Query Processing with
Hadoop/MapReduce
 Query Rewriting and Optimization Principles defined and
implemented for two types of data
 (i) Relational data: Secure query processing with HIVE
 (ii) RDF Data: Secure query processing with SPARQL
 Demonstrated with XACML Policies (content, temporal,
association)
 Joint demonstration with King’s College and U. of Insubria
 First demo (2010): Each party submits their data and policies
 Our cloud will manage the data and policies
 Second demo (2011): Multiple clouds



Higher-level languages over
Hadoop: Pig and Hive

41
Motivation
 Many parallel algorithms can be expressed by
a series of MapReduce jobs

 But MapReduce is fairly low-level: you must think
about keys, values, partitioning, etc.

 Can we capture common “job building blocks”?

42
Pig
 Started at Yahoo! Research
 Runs about 30% of Yahoo!’s jobs
 Features:
 Expresses sequences of MapReduce jobs
 Data model: nested “bags” of items
 Provides relational (SQL) operators (JOIN, GROUP
BY, etc)
 Easy to plug in Java functions
 Pig Pen development environment for Eclipse

43
An Example Problem
Suppose you have user data in one file, page view data in
another, and you need to find the top 5 most visited pages by
users aged 18-25.

Dataflow:
  Load Users → Filter by age ─┐
  Load Pages ─────────────────┴→ Join on name → Group on url
  → Count clicks → Order by clicks → Take top 5

44
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
In MapReduce
(This slide shows the complete Java MapReduce implementation of
the task, roughly 170 lines of code. The class MRExample defines
mapper/reducer classes LoadPages, LoadAndFilterUsers, Join,
LoadJoined, ReduceUrls, LoadClicks and LimitClicks; its main()
builds five JobConf jobs (load pages, load and filter users,
join, group URLs, top 100 sites), chains their dependencies, and
runs them with JobControl. The code is too garbled in this copy
to reproduce in full; the point of the slide is its sheer
verbosity compared with the equivalent nine-line Pig Latin
script that follows.)

45
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
In Pig Latin
Users = load ‘users’ as (name, age);
Filtered = filter Users by
age >= 18 and age <= 25;
Pages = load ‘pages’ as (user, url);
Joined = join Filtered by name, Pages by user;
Grouped = group Joined by url;
Summed = foreach Grouped generate group,
count(Joined) as clicks;
Sorted = order Summed by clicks desc;
Top5 = limit Sorted 5;

store Top5 into ‘top5sites’;

46
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
Ease of Translation
Notice how naturally the components of the job translate into Pig Latin:

  Load Users      →  Users = load …
  Filter by age   →  Filtered = filter …
  Load Pages      →  Pages = load …
  Join on name    →  Joined = join …
  Group on url    →  Grouped = group …
  Count clicks    →  Summed = … count()…
  Order by clicks →  Sorted = order …
  Take top 5      →  Top5 = limit …

47
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
Ease of Translation
Notice how the same pipeline is compiled into MapReduce jobs:

  Job 1: Load Users → Filter by age; Load Pages; Join on name
         (Users = load …  Filtered = filter …  Pages = load …  Joined = join …)
  Job 2: Group on url → Count clicks
         (Grouped = group …  Summed = … count()…)
  Job 3: Order by clicks → Take top 5
         (Sorted = order …  Top5 = limit …)

48
Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt
Hive
 Developed at Facebook
 Used for majority of Facebook jobs
 “Relational database” built on Hadoop
 Maintains list of table schemas
 SQL-like query language (HQL)
 Can call Hadoop Streaming scripts from HQL
 Supports table partitioning, clustering, complex
data types, some optimizations

49
Sample Hive Queries
• Find top 5 pages visited by users aged 18-25:
SELECT p.url, COUNT(1) as clicks
FROM users u JOIN page_views p ON (u.name = p.user)
WHERE u.age >= 18 AND u.age <= 25
GROUP BY p.url
ORDER BY clicks DESC
LIMIT 5;

• Filter page views through Python script:

SELECT TRANSFORM(p.user, p.date)
USING 'map_script.py'
AS dt, uid
CLUSTER BY dt
FROM page_views p;

50
Amazon Elastic MapReduce
 Provides a web-based interface and command-line tools for
running Hadoop jobs on Amazon EC2
 Data stored in Amazon S3
 Monitors job and shuts down machines after
use
 Small extra charge on top of EC2 pricing

 If you want more control over how your Hadoop cluster
runs, you can launch a Hadoop cluster on EC2 manually
using the scripts in src/contrib/ec2

51
Elastic MapReduce Workflow

(Slides 52-55 are screenshots stepping through the Elastic
MapReduce web console workflow.)
Conclusions
 MapReduce programming model hides the complexity of
work distribution and fault tolerance

 Principal design philosophies:
 Make it scalable, so you can throw hardware at problems
 Make it cheap, lowering hardware, programming and admin
costs

 MapReduce is not suitable for all problems, but when it
works, it may save you quite a bit of time

 Cloud computing makes it straightforward to start using
Hadoop (or other parallel software) at scale

56
Resources
 Hadoop: http://hadoop.apache.org/core/
 Pig: http://hadoop.apache.org/pig
 Hive: http://hadoop.apache.org/hive
 Video tutorials: http://www.cloudera.com/hadoop-training

 Amazon Web Services: http://aws.amazon.com/


 Amazon Elastic MapReduce guide:
http://docs.amazonwebservices.com/ElasticMapReduce/latest/GettingStartedGuide/

57
Thank You

For Queries:
 Contact :
 drramesh@iitism.ac.in

58
