
Distributed and Cloud Computing

Google Cloud Computing System

1


Parallel Computing and Programming Environments

 MapReduce
 Hadoop
 BigTable
 GFS

2
What is MapReduce?
 Simple data-parallel programming model
 For large-scale data processing
 Exploits large set of commodity computers
 Executes processing in a distributed manner
 Offers high availability
 Pioneered by Google
 Processes 20 petabytes of data per day
 Popularized by open-source Hadoop project
 Used at Yahoo!, Facebook, Amazon, …

3
What is MapReduce used for?
 At Google:
 Index construction for Google Search
 Article clustering for Google News
 Statistical machine translation
 At Yahoo!:
 “Web map” powering Yahoo! Search
 Spam detection for Yahoo! Mail
 At Facebook:
 Data mining
 Ad optimization
 Spam detection
4
Motivation: Large Scale Data Processing

 Many tasks composed of processing lots of data to produce lots of other data
 Want to use hundreds or thousands of CPUs
 ... but this needs to be easy!
 MapReduce provides
 User-defined functions
 Automatic parallelization and distribution
 Fault-tolerance
 I/O scheduling
 Status and monitoring

5
Distributed Grep

[Diagram: very big data is split; each split is piped through grep in parallel, producing matches; all matches are then concatenated by cat into a single output.]

grep is a command-line utility for searching plain-text data sets for lines
matching a regular expression.

cat is a standard Unix utility that concatenates files and prints them to standard output
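As a small illustration (not part of the original slides), distributed grep maps naturally onto the model: the map function emits every matching line in its split, and the reduce step simply concatenates the matches, which is what cat does in the diagram above. A minimal Python sketch, run locally on in-memory splits:

import re

def grep_map(split, pattern):
    # Map: emit every line in this split that matches the pattern.
    regex = re.compile(pattern)
    for line in split:
        if regex.search(line):
            yield line

def cat_reduce(match_lists):
    # Reduce: concatenate the per-split match lists (the "cat" step).
    for matches in match_lists:
        for line in matches:
            yield line

# Four splits of a "very big" data set, each grepped independently
splits = [["error: disk full", "ok"], ["ok", "error: timeout"], ["ok"], ["warning"]]
per_split_matches = [list(grep_map(s, r"error")) for s in splits]
print(list(cat_reduce(per_split_matches)))   # ['error: disk full', 'error: timeout']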

6
Distributed Word Count

[Diagram: very big data is split; each split is counted in parallel; the partial counts are then merged into one final count.]

7
Map+Reduce

[Diagram: very big data → MAP → partitioning function → REDUCE → result.]

 Map:
 Accepts input key/value pair
 Emits intermediate key/value pair
 Reduce:
 Accepts intermediate key/value* pair
 Emits output key/value pair
8
Architecture overview
[Diagram: the user submits a job to the Job tracker on the Master node; Task trackers on Slave nodes 1..N manage the Workers that run the tasks.]
9
GFS: underlying storage system
 Goal
 global view
 make huge files available in the face of node failures
 Master Node (meta server)
 Centralized, index all chunks on data servers
 Chunk server (data server)
 File is split into contiguous chunks, typically 16-64 MB.
 Each chunk replicated (usually 2x or 3x).
 Try to keep replicas in different racks.

10
GFS architecture

[Diagram: a Client asks the GFS Master for chunk locations, then reads/writes chunk data (C0, C1, C2, C3, C5, ...) directly from Chunkservers 1..N; each chunk is replicated on multiple chunkservers.]

11
Functions in the Model
 Map
 Process a key/value pair to generate intermediate
key/value pairs
 Reduce
 Merge all intermediate values associated with the
same key
 Partition
 By default : hash(key) mod R
 Well balanced

12
Programming Concept

 Map
 Perform a function on individual values in a data
set to create a new list of values
 Example: square x = x * x
map square [1,2,3,4,5]
returns [1,4,9,16,25]
 Reduce
 Combine values in a data set to create a new
value
 Example: sum = (for each elem in arr, total += elem)
reduce sum [1,2,3,4,5]
returns 15 (the sum of the elements)
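A minimal, runnable Python analogue of these two examples (a sketch added for illustration, using Python's built-in map and functools.reduce rather than MapReduce itself):

from functools import reduce

squares = list(map(lambda x: x * x, [1, 2, 3, 4, 5]))    # [1, 4, 9, 16, 25]
total = reduce(lambda acc, x: acc + x, [1, 2, 3, 4, 5])  # 15
print(squares, total)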

13
14
A Simple Example
 Counting words in a large set of documents

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

The map function emits each word w plus an associated count of occurrences (just a "1" in this pseudo-code).

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

The reduce function sums together all counts emitted for a particular word.
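For illustration only (not from the original slides), the same word count can be simulated in plain Python; the grouping step in the middle stands in for MapReduce's shuffle/sort phase:

from collections import defaultdict

def map_fn(doc_name, contents):
    # Emit (word, 1) for every word in the document.
    for word in contents.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # Sum all counts emitted for this word.
    return (word, sum(counts))

docs = {"d1": "the quick brown fox", "d2": "the lazy dog the end"}

# Shuffle/sort: group intermediate values by key.
groups = defaultdict(list)
for name, text in docs.items():
    for word, count in map_fn(name, text):
        groups[word].append(count)

print(dict(reduce_fn(w, c) for w, c in groups.items()))
# {'the': 3, 'quick': 1, 'brown': 1, 'fox': 1, 'lazy': 1, 'dog': 1, 'end': 1}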

15
A Word Counting Example on <Key, Count> Distribution

16


How Does it work?

 Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits.
 Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a hash function: hash(key) mod R (sketched below).
 R and the partitioning function are specified by the programmer.
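A sketch of that default partitioning step (an illustration added here; R and the use of Python's built-in hash are assumptions): every intermediate key is routed to one of R reduce buckets, so all values for a key end up at the same reducer.

R = 4  # number of reduce tasks, chosen by the programmer

def default_partition(key, R):
    # hash(key) mod R; Python's built-in hash stands in for the real hash function
    return hash(key) % R

buckets = {r: [] for r in range(R)}
for key, value in [("apple", 1), ("banana", 1), ("apple", 1)]:
    buckets[default_partition(key, R)].append((key, value))
# All pairs with the same key land in the same bucket, so one reducer sees them all.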

17
Logical Data Flow in 5 Processing
Steps in MapReduce Process

(Key, Value) pairs are generated by the Map function over multiple available Map Workers (VM instances). These pairs are then sorted and grouped based on key ordering. Different key-groups are then processed by multiple Reduce Workers in parallel.

18


Locality issue
 Master scheduling policy
 Asks GFS for locations of replicas of input file blocks
 Map task inputs are typically 64MB splits (== GFS block size)
 Map tasks are scheduled so that a GFS input block replica is on the same machine or the same rack
 Effect
 Thousands of machines read input at local disk
speed
 Without this, rack switches limit read rate

19
Fault Tolerance
 Reactive way
 Worker failure
 Heartbeat: workers are periodically pinged by the master (see the sketch below)
 NO response = failed worker
 If the processor of a worker fails, the tasks of that worker are
reassigned to another worker.

 Master failure
 Master writes periodic checkpoints
 Another master can be started from the last checkpointed
state
 If eventually the master dies, the job will be aborted
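A toy sketch of the worker-failure path described above (purely illustrative; the timeout value and data structures are assumptions, not the actual Google implementation):

import time

HEARTBEAT_TIMEOUT = 10.0  # seconds without a response => worker considered failed

last_heartbeat = {}   # worker_id -> time of last successful ping
assignments = {}      # task_id -> worker_id

def record_heartbeat(worker_id):
    last_heartbeat[worker_id] = time.time()

def reap_failed_workers(idle_workers):
    now = time.time()
    for worker_id, seen in list(last_heartbeat.items()):
        if now - seen > HEARTBEAT_TIMEOUT:          # no response = failed worker
            del last_heartbeat[worker_id]
            for task_id, owner in assignments.items():
                if owner == worker_id and idle_workers:
                    assignments[task_id] = idle_workers.pop()  # reassign the task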

20
Fault Tolerance
 Proactive way (Redundant Execution)
 The problem of “stragglers” (slow workers)
 Other jobs consuming resources on machine
 Bad disks with soft errors transfer data very slowly
 Weird things: processor caches disabled (!!)

 When the computation is almost done, reschedule in-progress tasks (sketched below)
 Whenever either the primary or the backup execution finishes, mark the task as completed
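The backup-task idea can be sketched in a few lines of Python (an illustration, not the actual implementation): near the end of the job, the same task is launched twice and whichever copy finishes first wins.

from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def run_with_backup(task, *args):
    # Launch a primary and a backup copy; return whichever finishes first.
    pool = ThreadPoolExecutor(max_workers=2)
    futures = [pool.submit(task, *args), pool.submit(task, *args)]
    done, _ = wait(futures, return_when=FIRST_COMPLETED)
    result = next(iter(done)).result()
    pool.shutdown(wait=False)   # don't wait for the straggler; its result is ignored
    return result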

21
Fault Tolerance
 Input error: bad records
 Map/Reduce functions sometimes fail for particular
inputs
 Best solution is to debug & fix, but not always
possible
 On a segmentation fault
 Send UDP packet to master from signal handler
 Include sequence number of record being processed
 Skip bad records
 If the master sees two failures for the same record, the next worker is told to skip the record (sketched below)
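A minimal sketch of the skip-bad-records idea (hypothetical names; the real mechanism uses a signal handler and a UDP message to the master): a worker reports the record's sequence number on failure, and the master tells later attempts to skip records that have failed twice.

failure_counts = {}   # record sequence number -> number of observed failures
skip_set = set()      # records the master has decided to skip

def report_failure(seq_no):
    # Master side: after two failures on the same record, tell workers to skip it.
    failure_counts[seq_no] = failure_counts.get(seq_no, 0) + 1
    if failure_counts[seq_no] >= 2:
        skip_set.add(seq_no)

def process_records(records, user_map_fn):
    # Worker side: skip records the master flagged; report new failures.
    for seq_no, record in enumerate(records):
        if seq_no in skip_set:
            continue
        try:
            user_map_fn(record)
        except Exception:
            report_failure(seq_no)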

22
Points to be emphasized
 No reduce can begin until map is complete
 Master must communicate locations of
intermediate files
 Tasks scheduled based on location of data
 If map worker fails any time before reduce
finishes, task must be completely rerun
 MapReduce library does most of the hard work
for us!

23
Hadoop: a software platform originally developed at Yahoo!, enabling users to write and run applications over vast amounts of distributed data.
Attractive Features in Hadoop :
 Scalable : can easily scale to store and process petabytes of
data in the Web space
 Economical : An open-source MapReduce minimizes the
overheads in task spawning and massive data communication.
 Efficient: Processing data with high-degree of parallelism
across a large number of commodity nodes
 Reliable : Automatically maintains multiple copies of data to
facilitate redeployment of computing tasks on failures

24


Typical Hadoop Cluster
[Diagram: racks of nodes; each rack switch uplinks to an aggregation switch.]

 40 nodes/rack, 1000-4000 nodes in cluster


 1 Gbps bandwidth within rack, 8 Gbps out of rack
 Node specs (Yahoo terasort):
8 x 2GHz cores, 8 GB RAM, 4 disks (= 4 TB?)

25
Image from http://wiki.apache.org/hadoop-data/attachments/HadoopPresentations/attachments/YahooHadoopIntro-apachecon-us-2008.pdf
Typical Hadoop Cluster

26
Image from http://wiki.apache.org/hadoop-data/attachments/HadoopPresentations/attachments/aw-apachecon-eu-2009.pdf
Challenges
Cheap nodes fail, especially if you have many
1. Mean time between failures for 1 node = 3 years
2. Mean time between failures for 1000 nodes = 1 day
3. Solution: Build fault-tolerance into system

Commodity network = low bandwidth


1. Solution: Push computation to the data

Programming distributed systems is hard


1. Solution: Data-parallel programming model: users
write “map” & “reduce” functions, system distributes
work and handles faults
27
Hadoop Components
 Distributed file system (HDFS)
 Single namespace for entire cluster
 Replicates data 3x for fault-tolerance

 MapReduce framework
 Executes user jobs specified as “map” and
“reduce” functions
 Manages work distribution & fault-tolerance

28
Hadoop Distributed File System

 Files split into 128MB blocks
 Blocks replicated across several datanodes (usually 3)
 Single namenode stores metadata (file names, block locations, etc.)
 Optimized for large files, sequential reads
 Files are append-only

[Diagram: the Namenode maps File1 to blocks 1-4; the blocks are replicated across the Datanodes.]
29
30
Higher-level languages over
Hadoop: Hive

31
Motivation
 Many parallel algorithms can be expressed by
a series of MapReduce jobs

 But MapReduce is fairly low-level: must think about keys, values, partitioning, etc.

 Can we capture common “job building blocks”?

32
Hive
 Developed at Facebook
 Used for majority of Facebook jobs
 “Relational database” built on Hadoop
 Maintains list of table schemas
 SQL-like query language (HQL)
 Can call Hadoop Streaming scripts from HQL
 Supports table partitioning, clustering, complex
data types, some optimizations

33
Sample Hive Queries
• Find top 5 pages visited by users aged 18-25:
SELECT p.url, COUNT(1) as clicks
FROM users u JOIN page_views p ON (u.name = p.user)
WHERE u.age >= 18 AND u.age <= 25
GROUP BY p.url
ORDER BY clicks DESC
LIMIT 5;

• Filter page views through a Python script:

SELECT TRANSFORM(p.user, p.date)
USING 'map_script.py'
AS dt, uid
CLUSTER BY dt
FROM page_views p;
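The contents of map_script.py are not shown in the slides; a hypothetical version might look like the sketch below. Hive's TRANSFORM streams each input row to the script as a tab-separated line on stdin and reads tab-separated output rows back from stdout.

#!/usr/bin/env python
# Hypothetical map_script.py: turns (user, date) rows into (dt, uid) rows.
import sys

for line in sys.stdin:
    user, date = line.rstrip("\n").split("\t")
    dt = date[:10]                 # e.g. keep just the YYYY-MM-DD part
    print(dt + "\t" + user)        # emit as dt, uid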

34
Amazon Elastic MapReduce
 Provides a web-based interface and command-
line tools for running Hadoop jobs on Amazon
EC2
 Data stored in Amazon S3
 Monitors job and shuts down machines after
use
 Small extra charge on top of EC2 pricing

 If you want more control over how your Hadoop cluster runs, you can launch a Hadoop cluster on EC2 manually using the scripts in src/contrib/ec2

35
Elastic MapReduce Workflow

36
Elastic MapReduce Workflow

37
Elastic MapReduce Workflow

38
Elastic MapReduce Workflow

39
Google File System

40
Distributed File Systems
before GFS

41
What is Different?
 Component failures are norm rather than
exception
 File system consists of 100s/1,000s commodity
storage servers
 Comparable number of clients

 Much bigger files


 GB file sizes are common
 TB data sets: hard to manipulate billions of KB-sized blocks

42
What is Different?
 Files are mutated by appending new data, and
not overwriting
 Random writes practically non-existent
 Once written, files are only read sequentially

 Google owns both the applications and the file system  can effectively co-design applications and file system APIs
 Relaxed consistency model
 Atomic append operation: allow multiple clients to
append data to a file with no extra synchronization
between them

43
Assumptions and Design
Requirements
 Should monitor, detect, tolerate, and recover from failures
 Store a modest number of large files (a few million files, typically 100 MB or larger)
 Small files supported, but no need to be efficient

 Workload
 Large sequential reads
 Small reads supported, but ok to batch them
 Large, sequential writes that append data to files
 Small random writes supported but no need to be efficient

44
Assumptions and Design
Requirements
 Implement well-defined semantics for concurrent
appends
 Producer-consumer model
 Support hundreds of producers concurrently appending
to a file
 Provide atomicity with minimal synchronization overhead
 Support consumers reading the file as it is written

 High sustained bandwidth is more important than low latency
45
Architecture
 Single Master

46
Architecture
 64MB chunks identified by a unique 64-bit identifier

47
Consistency Model

 Mutation: write/append operation to a chunk


 Use lease mechanism to ensure consistency:
 Master grants a chunk lease to one of replicas (primary)
 Primary picks a serial order for all mutations to the chunk
 All replicas follow this order when applying mutations
 Global mutation order defined first by
 lease grant order chosen by the master
 serial numbers assigned by the primary within lease
 If the master doesn’t hear from the primary, it grants a lease to another replica after the old lease expires (sketched below)
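A toy sketch of the lease bookkeeping on the master (illustrative only; the structures are assumptions, though the 60-second initial lease timeout is the value given in the GFS paper): one replica holds the primary lease, and once it expires without renewal the master may grant it to another replica.

import time

LEASE_DURATION = 60.0   # seconds; GFS uses an initial lease timeout of 60 s

leases = {}  # chunk_id -> (primary_replica, expiry_time)

def grant_lease(chunk_id, replicas):
    primary, expiry = leases.get(chunk_id, (None, 0.0))
    if primary is not None and time.time() < expiry:
        return primary                       # current lease still valid
    primary = replicas[0]                    # pick a live replica as the new primary
    leases[chunk_id] = (primary, time.time() + LEASE_DURATION)
    return primary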

48
Write Control and Data Flow

49
Atomic Record Appends
 Client specifies only the data

 GFS picks the offset at which to append the data and returns it to the client

 Primary checks if appending the record would exceed the chunk’s maximum size
 If yes
 Pad the chunk to its maximum size
 Tell the client to retry on a new chunk (sketched below)
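A sketch of the primary's decision for record append (an illustration under assumed names, using the 64 MB chunk size from GFS):

CHUNK_MAX = 64 * 1024 * 1024   # 64 MB chunk size

def record_append(chunk, data):
    # Return the offset where data was appended, or None if the client must
    # retry on a new chunk (after the current chunk is padded to maximum size).
    if len(chunk) + len(data) > CHUNK_MAX:
        chunk.extend(b"\0" * (CHUNK_MAX - len(chunk)))   # pad chunk to max size
        return None                                      # tell client: new chunk
    offset = len(chunk)            # GFS, not the client, picks the offset
    chunk.extend(data)
    return offset

# Usage: chunk = bytearray(); off = record_append(chunk, b"record1")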

50
Master Operation
 Namespace locking management
 Each master operation acquires a set of locks
 Locking scheme allows concurrent mutations in
same directory
 Locks are acquired in a consistent total order to
prevent deadlock

 Replica Placement
 Spread chunk replicas across racks to maximize
availability, reliability, and network bandwidth

51
Others

 Chunk creation
 Equalize disk utilization
 Limit the number of recent creations on each chunk server
 Spread replicas across racks
 Rebalancing
 Move replica for better disk space and load balancing.
 Remove replicas on chunk servers with below average free
space

52
BigTable

53
Motivation

 Highly available distributed storage for structured data, e.g.,


 URLs: content, metadata, links, anchors, page rank
 User data: preferences, account info, recent queries
 Geography: roads, satellite images, points of interest, annotations

 Large scale
 Petabytes of data across thousands of servers
 Billions of URLs with many versions per page
 Hundreds of millions of users
 Thousands of queries per second
 100TB+ satellite image data

54
(Big) Tables

 “A BigTable is a sparse, distributed, persistent multidimensional sorted map”
 (row:string, column:string, time:int64)  cell content
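That three-part key can be mimicked with an ordinary sorted dictionary; a tiny sketch (illustrative only, reusing the webtable-style example from these slides):

# (row, column, timestamp) -> cell content, scanned in sorted key order
table = {}

def put(row, column, value, ts):
    table[(row, column, ts)] = value

put("com.w3.www", "anchor:www.berkeley.edu", "CERN", 1)
put("com.w3.www", "contents:", "<html>...</html>", 2)

# Scan one row in sorted (column, timestamp) order
for key in sorted(k for k in table if k[0] == "com.w3.www"):
    print(key, "->", table[key])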

55
Column Families

 Column Family
 Group of column keys
 Basic unit of data access
 Data in a column family is typically of the same type
 Data within the same column family is compressed
 Identified by family:qualifier, e.g.,
 “language”:language_id
 “anchor”:referring_site
 Example: for the anchor <a href="http://www.w3.org/">CERN</a> appearing on www.berkeley.edu, the referring site (www.berkeley.edu) is the column qualifier and the anchor text (“CERN”) is the cell content

56
Another Example

57
Timestamps
 Each cell in a Bigtable can contain multiple versions of same data
 Version indexed by a 64-bit timestamp: real time or assigned by client

 Per-column-family settings for garbage collection


 Keep only latest n versions
 Or keep only versions written since time t

 Retrieve most recent version if no version specified


 If a timestamp is specified, return the latest version with timestamp ≤ the requested time

58
Tablets

 Table partitioned dynamically by rows into tablets


 Tablet = range of contiguous rows
 Unit of distribution and load balancing
 Nearby rows will usually be served by same server
 Accessing nearby rows requires communication with small # of servers
 Usually, 100-200 MB per tablet

 Users can control which rows are grouped into the same tablet through their choice of row keys
 E.g., store maps.google.com/index.html under key com.google.maps/index.html

59
SSTable (Sorted String Table)

 Immutable, sorted file of key-value pairs
 Chunks of data plus an index
 Index of block ranges, not values
 Index loaded into memory when the SSTable is opened
 Lookup is a single disk seek
 Client can also map the SSTable into memory

[Diagram: an SSTable is a sequence of 64K data blocks followed by a block index.]
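A sketch of how the in-memory block index makes a lookup a single read (purely illustrative; the block layout and class are assumptions, not the real SSTable format):

import bisect

class SSTableSketch:
    def __init__(self, blocks):
        # blocks: list of sorted (key, value) lists; index keeps each block's first key
        self.blocks = blocks
        self.index = [block[0][0] for block in blocks]   # loaded into memory on open

    def lookup(self, key):
        i = bisect.bisect_right(self.index, key) - 1     # which block could hold key
        if i < 0:
            return None
        for k, v in self.blocks[i]:                      # one "disk read" of that block
            if k == key:
                return v
        return None

sst = SSTableSketch([[("a", 1), ("c", 2)], [("d", 3), ("f", 4)]])
print(sst.lookup("d"))   # 3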

60
Putting Everything Together

 SSTables can be shared
 Tablets do not overlap, SSTables can overlap

[Diagram: two adjacent tablets (row ranges aardvark..apple and apple_two_E..boat), each built from SSTables, with one SSTable shared between the two tablets.]

61
API

 Tables and column families: create, delete, update, control rights
 Rows: atomic read and write, read-modify-write sequences
 Values: delete, and lookup values in individual rows
 Others
 Iterate over subset of data in table
 No transactions across rows, but support batching writes across
rows

62
Architecture
 Google-File-System (GFS) to store log and data
files.
 SSTable file format

 Chubby as a lock service (future lecture)


 Ensure at most one active master exists
 Store bootstrap location of Bigtable data
 Discover tablet servers
 Store Bigtable schema information (column family info
for each table)
 Store access control lists
63
Implementation
 Components:
 Library linked with every client
 Master server
 Many tablet servers

 Master is responsible for assigning tablets to tablet servers
 Tablet servers can be added or removed dynamically
 Each tablet server typically stores 10-1000 tablets
 Tablet servers handle reads, writes, and splitting of tablets
 Client data does not move through master
64
Tablet Location Hierarchy

 Able to address 2^34 tablets
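A rough sanity check of that number (under the assumptions stated in the BigTable paper: roughly 1 KB of METADATA per tablet and METADATA tablets limited to 128 MB): each METADATA tablet can hold about 128 MB / 1 KB = 2^17 entries, so the two METADATA levels of the three-level hierarchy address about 2^17 × 2^17 = 2^34 user tablets.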

65
Tablet Assignment

 Master keeps track of live tablet servers, current assignments, and unassigned tablets
 Master assigns unassigned tablets to tablet servers
 Tablet servers are linked to files in Chubby directory
 Upon a master starting
 Acquire master lock in Chubby
 Scan live tablet servers
 Get list of tablets from each tablet server, to find out assigned
tablets
 Learn set of existing tablets → adds unassigned tablets to list

66
Tablet Representation

 Tablet recovery:
 METADATA: list of SSTables that comprise a tablet and
set of redo points
 Reconstructs the memtable by applying all of the
updates that have committed since the redo points
67
BigTable vs. Relational DB
 No data independence:
 Clients dynamically control whether to serve data from memory or from disk
 Clients control locality through their choice of key names
 Uninterpreted values: clients can serialize,
deserialize data
 No multi-row transactions
 No table-wide integrity constraints
 Immutable data, similar to DBs
 Can specify: keep last N versions or last N days
 API: C++, not SQL (no complex queries)
68
Conclusions
 MapReduce programming model hides the complexity of
work distribution and fault tolerance

 Principal design philosophies:


 Make it scalable, so you can throw hardware at problems
 Make it cheap, lowering hardware, programming and admin
costs

 MapReduce is not suitable for all problems, but when it works, it may save you quite a bit of time

 Cloud computing makes it straightforward to start using Hadoop (or other parallel software) at scale

69
