Slides 04
(3rd Edition)
[Figure: the OSI reference model. Layers from top to bottom: Application (7), Presentation (6), Session (5), Transport (4), Network (3), Data link (2), Physical (1). Peer layers communicate through the corresponding protocol (application protocol down to physical protocol), over the network.]
Drawbacks
Focus on message-passing only
Often unneeded or unwanted functionality
Violates access transparency
Low-level layers
Recap
Physical layer: contains the specification and implementation of bits, and
their transmission between sender and receiver
Data link layer: prescribes the transmission of a series of bits into a frame
to allow for error and flow control
Network layer: describes how packets in a network of computers are to be
routed.
Observation
For many distributed systems, the lowest-level interface is that of the network
layer.
Transport Layer
Important
The transport layer provides the actual communication facilities for most
distributed systems.
Middleware layer
Observation
Middleware is invented to provide common services and protocols that can be
used by many different applications
A rich set of communication protocols
(Un)marshaling of data, necessary for integrated systems
Naming protocols, to allow easy sharing of resources
Security protocols for secure communication
Scaling mechanisms, such as for replication and caching
Note
What remains are truly application-specific protocols... such as?
Communication: Foundations Layered Protocols
[Figure: adapted layering scheme. Application (application protocol), Middleware (middleware protocol), Hardware (physical/link-level protocol), over the network.]
Communication: Foundations Types of Communication
Types of communication
Distinguish...
Transient versus persistent
[Figure: timeline of a client and a server with a storage facility in between, showing request, transmission interrupt, and reply.]
Synchronization points
At request submission
At request delivery
After request processing by server
Client/Server
Some observations
Client/Server computing is generally based on a model of transient
synchronous communication:
Client and server have to be active at time of communication
Client issues request and blocks until it receives reply
Server essentially waits only for incoming requests, and subsequently
processes them
Messaging
Message-oriented middleware
Aims at high-level persistent asynchronous communication:
Processes send each other messages, which are queued
Sender need not wait for immediate reply, but can do other things
Middleware often ensures fault tolerance
Communication: Remote procedure call Basic RPC operation
Observations
Application developers are familiar with simple procedure model
Well-engineered procedures operate in isolation (black box)
There is no fundamental reason not to execute procedures on separate
machine
[Figure: the client calls a remote procedure and waits for the result.]
1 Client procedure calls client stub.
2 Stub builds message; calls local OS.
3 OS sends message to remote OS (the message is sent across the network).
4 Remote OS gives message to stub.
5 Stub unpacks parameters; calls server.
6 Server does local call; returns result to stub.
7 Stub builds message; calls OS.
8 OS sends message to client’s OS.
9 Client’s OS gives message to stub.
10 Client stub unpacks result; returns to client.
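The ten steps can be sketched inside a single process. This is only an illustration under stated assumptions: JSON stands in for the message format, a direct function call stands in for both operating systems and the network, and all names (`client_stub_add`, `server_stub`, `network_send`, `PROCEDURES`) are made up for the example.

```python
import json

def add(a, b):
    """The actual procedure, living at the server."""
    return a + b

PROCEDURES = {"add": add}   # procedures the server exports (hypothetical registry)

def server_stub(request_bytes):
    # Steps 5-7: unpack parameters, do the local call, pack the result.
    request = json.loads(request_bytes.decode("utf-8"))
    result = PROCEDURES[request["proc"]](*request["args"])
    return json.dumps({"result": result}).encode("utf-8")

def network_send(request_bytes):
    # Steps 3-4 and 8-9: stands in for both operating systems and the network.
    return server_stub(request_bytes)

def client_stub_add(a, b):
    # Step 2: build the message; step 10: unpack the result.
    request = json.dumps({"proc": "add", "args": [a, b]}).encode("utf-8")
    reply = network_send(request)
    return json.loads(reply.decode("utf-8"))["result"]

# Step 1: to the caller this looks like an ordinary local call.
total = client_stub_add(2, 3)
print(total)
```

The caller never sees the message; only the two stubs know that the call crosses a (simulated) machine boundary.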
Communication: Remote procedure call Parameter passing
Conclusion
Client and server need to properly interpret messages, transforming them into
machine-dependent representations.
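A small sketch of why this matters, assuming a 32-bit integer parameter and Python's standard `struct` module: the same value has different byte layouts on little-endian and big-endian machines, so both sides have to agree on one wire format (network byte order is used here as an example choice).

```python
import struct

value = 5

little = struct.pack("<i", value)   # little-endian 32-bit integer
big = struct.pack(">i", value)      # big-endian 32-bit integer

# Reading big-endian bytes as if they were little-endian gives the wrong number.
misread = struct.unpack("<i", big)[0]

# Agreeing on one wire format (here: network byte order, "!") avoids this.
wire = struct.pack("!i", value)
decoded = struct.unpack("!i", wire)[0]

print(little.hex(), big.hex(), misread, decoded)
```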
Some assumptions
Copy in/copy out semantics: while procedure is executed, nothing can be
assumed about parameter values.
All data that is to be operated on is passed by parameters. Excludes
passing references to (global) data.
Conclusion
Full access transparency cannot be realized.
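A minimal sketch of copy in/copy out, assuming a hypothetical helper `remote_call` that mimics what a pair of stubs does with parameters. The callee works on its own copies, so a mutation that a local call would make visible is invisible to the caller unless the copied-out value is explicitly adopted.

```python
import copy

def remote_call(procedure, *args):
    """Hypothetical helper: run `procedure` on copies of the arguments."""
    copied_in = copy.deepcopy(args)      # copy in: the callee gets its own copies
    result = procedure(*copied_in)
    return result, list(copied_in)       # copy out: result plus final parameter values

def append_one(numbers):
    numbers.append(1)                    # mutates the callee's copy only
    return len(numbers)

data = [7, 8]
result, copied_out = remote_call(append_one, data)

# The caller's list is unchanged; the change is only present in the copied-out value.
print(data, copied_out[0], result)
```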
Communication: Remote procedure call Variations on RPC
Asynchronous RPCs
Essence
Try to get rid of the strict request-reply behavior, and let the client continue
without waiting for an answer from the server.
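The idea can be sketched with a future from Python's `concurrent.futures`. The function `slow_remote_square` is a made-up stand-in for a remote procedure, and the sleep only mimics network and server latency.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_remote_square(x):
    """Stands in for a remote procedure (hypothetical); the sleep mimics latency."""
    time.sleep(0.1)
    return x * x

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_remote_square, 12)   # issue the request, do not block
    local_work = sum(range(1000))                  # the client continues meanwhile
    answer = future.result()                       # collect the reply when it is needed

print(local_work, answer)
```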
Multicast RPC
Essence
Sending an RPC request to a group of servers.
[Figure: the client calls remote procedures on several servers; the servers issue callbacks to the client.]
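Sending one request to a group of servers, with replies delivered through callbacks, might look as follows. This is a sketch only: the servers are plain local functions built by a hypothetical `make_server`, and threads stand in for the network.

```python
from concurrent.futures import ThreadPoolExecutor

def make_server(name):
    """Build a stand-in for a remote server (hypothetical)."""
    def handle(request):
        return f"{name} handled {request}"
    return handle

servers = [make_server(f"server-{i}") for i in range(3)]
replies = []

def callback(future):
    # Called at the client whenever one of the servers has replied.
    replies.append(future.result())

with ThreadPoolExecutor(max_workers=len(servers)) as pool:
    for server in servers:
        pool.submit(server, "ping").add_done_callback(callback)

print(sorted(replies))
```

Replies may arrive in any order, which is why the example sorts them before printing.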
Communication: Remote procedure call Example: DCE RPC
RPC in practice
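[Figure: building a client and a server. Uuidgen produces an interface definition file; the IDL compiler generates files that client and server code #include; on each side the linker combines the code with the runtime library into a client binary and a server binary.]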
Issues
(1) Client must locate server machine, and (2) locate the server.
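[Figure: client-to-server binding. The server on the server machine registers its service with the directory server on the directory machine (step 2); the client on the client machine looks up the server there (step 3).]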
[Figure: socket operations in order of use: socket, bind, listen, accept, receive, send, close.]
Communication: Message-oriented communication Simple transient messaging with sockets
Client
from socket import socket, AF_INET, SOCK_STREAM

HOST, PORT = "localhost", 50007  # assumed values; the slide does not define them

s = socket(AF_INET, SOCK_STREAM)
s.connect((HOST, PORT))          # connect to server (block until accepted)
s.send(b"Hello, world")          # send some data
data = s.recv(1024)              # receive the response
print(data)                      # print the result
s.close()                        # close the connection
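A matching server is not shown here, so the following is a sketch of one possible counterpart using the standard `socket` module. It is an assumption that the server echoes the data back with a `*` appended; the server runs in a thread on the local machine and lets the OS pick a free port, so the example is self-contained.

```python
import socket
import threading

def serve_once(listener):
    """Accept one connection and echo the data back with '*' appended."""
    conn, _addr = listener.accept()          # accept: block until a client connects
    with conn:
        data = conn.recv(1024)               # receive the request
        conn.sendall(data + b"*")            # send the reply

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # socket
listener.bind(("127.0.0.1", 0))                                # bind (port 0: OS picks a port)
listener.listen(1)                                             # listen
port = listener.getsockname()[1]

server_thread = threading.Thread(target=serve_once, args=(listener,))
server_thread.start()

# Client side, following the same steps as the client listing.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(("127.0.0.1", port))
    s.sendall(b"Hello, world")
    reply = s.recv(1024)

server_thread.join()
listener.close()                                               # close
print(reply)
```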
Communication: Message-oriented communication Advanced transient messaging
Observation
Sockets are rather low level and programming mistakes are easily made.
However, the way that they are used is often the same (such as in a
client-server setting).
Alternative: ZeroMQ
Provides a higher level of expression by pairing sockets: one for sending
messages at process P and a corresponding one at process Q for receiving
messages. All communication is asynchronous.
Three patterns
Request-reply
Publish-subscribe
Pipeline
Request-reply
Server
import zmq

HOST, PORT1, PORT2 = "127.0.0.1", "5555", "5556"  # assumed values; not defined on the slide
context = zmq.Context()

p1 = "tcp://" + HOST + ":" + PORT1  # how and where to connect
p2 = "tcp://" + HOST + ":" + PORT2  # how and where to connect
s = context.socket(zmq.REP)         # create reply socket

s.bind(p1)                          # bind socket to address
s.bind(p2)                          # bind socket to address
while True:
    message = s.recv()              # wait for incoming message
    if b"STOP" not in message:      # if not to stop...
        s.send(message + b"*")      # append "*" to message
    else:                           # else...
        break                       # break out of loop and end
Request-reply
Client
import zmq

HOST, PORT = "127.0.0.1", "5555"    # assumed values; not defined on the slide
context = zmq.Context()

php = "tcp://" + HOST + ":" + PORT  # how and where to connect
s = context.socket(zmq.REQ)         # create socket

s.connect(php)                      # block until connected
s.send(b"Hello World")              # send message
message = s.recv()                  # block until response
s.send(b"STOP")                     # tell server to stop
print(message)                      # print result
Publish-subscribe
Server
import time
import zmq

HOST, PORT = "127.0.0.1", "5557"    # assumed values; not defined on the slide
context = zmq.Context()
s = context.socket(zmq.PUB)         # create a publisher socket
p = "tcp://" + HOST + ":" + PORT    # how and where to communicate
s.bind(p)                           # bind socket to the address
while True:
    time.sleep(5)                   # wait 5 seconds between messages
    s.send(("TIME " + time.asctime()).encode())  # publish the current time
Client
import zmq

HOST, PORT = "127.0.0.1", "5557"    # assumed values; not defined on the slide
context = zmq.Context()
s = context.socket(zmq.SUB)         # create a subscriber socket
p = "tcp://" + HOST + ":" + PORT    # how and where to communicate
s.connect(p)                        # connect to the server
s.setsockopt(zmq.SUBSCRIBE, b"TIME")  # subscribe to TIME messages

for i in range(5):                  # five iterations
    msg = s.recv()                  # receive a message
    print(msg.decode())
Pipeline
Source
import pickle, random, sys
import zmq

SRC1 = SRC2 = "127.0.0.1"           # assumed hosts; not defined on the slide
PORT1, PORT2 = "5558", "5559"       # assumed ports; not defined on the slide

context = zmq.Context()
me = str(sys.argv[1])
s = context.socket(zmq.PUSH)        # create a push socket
src = SRC1 if me == '1' else SRC2   # check task source host
prt = PORT1 if me == '1' else PORT2 # check task source port
p = "tcp://" + src + ":" + prt      # how and where to connect
s.bind(p)                           # bind socket to address

for i in range(100):                # generate 100 workloads
    workload = random.randint(1, 100)       # compute workload
    s.send(pickle.dumps((me, workload)))    # send workload to worker
Pipeline
Worker
import pickle, sys, time
import zmq

SRC1 = SRC2 = "127.0.0.1"           # assumed hosts; not defined on the slide
PORT1, PORT2 = "5558", "5559"       # assumed ports; not defined on the slide

context = zmq.Context()
me = str(sys.argv[1])
r = context.socket(zmq.PULL)        # create a pull socket
p1 = "tcp://" + SRC1 + ":" + PORT1  # address first task source
p2 = "tcp://" + SRC2 + ":" + PORT2  # address second task source
r.connect(p1)                       # connect to task source 1
r.connect(p2)                       # connect to task source 2

while True:
    work = pickle.loads(r.recv())   # receive work from a source
    time.sleep(work[1] * 0.01)      # pretend to work
Representative operations
Operation     Description
MPI_bsend     Append outgoing message to a local send buffer
MPI_send      Send a message and wait until copied to local or remote buffer
MPI_ssend     Send a message and wait until transmission starts
MPI_sendrecv  Send a message and wait for reply
MPI_isend     Pass reference to outgoing message, and continue
MPI_issend    Pass reference to outgoing message, and wait until receipt starts
MPI_recv      Receive a message; block if there is none
MPI_irecv     Check if there is an incoming message, but do not block
Message-oriented middleware
Essence
Asynchronous persistent communication through support of middleware-level
queues. Queues correspond to buffers at communication servers.
Operations
Operation  Description
put        Append a message to a specified queue
get        Block until the specified queue is nonempty, and remove the first message
poll       Check a specified queue for messages, and remove the first. Never block
notify     Install a handler to be called when a message is put into the specified queue
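The four operations can be mimicked with an in-memory queue. This is a toy sketch, not the interface of any real message-queuing product: the class `MessageQueue` and its method signatures are assumptions, and there is no persistence or networking.

```python
import queue

class MessageQueue:
    """Toy in-memory queue offering put, get, poll, and notify (hypothetical API)."""

    def __init__(self):
        self._q = queue.Queue()
        self._handler = None

    def put(self, message):
        self._q.put(message)             # append the message to the queue
        if self._handler is not None:
            self._handler(self)          # tell the installed handler a message arrived

    def get(self):
        return self._q.get()             # blocks while the queue is empty

    def poll(self):
        try:
            return self._q.get_nowait()  # never blocks
        except queue.Empty:
            return None

    def notify(self, handler):
        self._handler = handler          # install the handler

mq = MessageQueue()
empty = mq.poll()                        # nothing queued yet
mq.put("m1")
first = mq.get()                         # returns at once: the queue is nonempty

seen = []
mq.notify(lambda q: seen.append(q.poll()))
mq.put("m2")                             # triggers the handler, which removes "m2"
print(empty, first, seen)
```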
Communication: Message-oriented communication Message-oriented persistent communication
General model
Queue managers
Queues are managed by queue managers. An application can put messages
only into a local queue. Getting a message is possible by extracting it from a
local queue only ⇒ queue managers need to route messages.
Routing
[Figure: the source queue manager looks up the contact address of the destination queue manager, mapping the logical queue-level address (name) to a network contact address.]
Message broker
Observation
Message queuing systems assume a common messaging protocol: all
applications agree on message format (i.e., structure and data representation)
[Figure: message broker architecture. Applications reach the queuing layer through an interface; each machine runs on its local OS.]
Communication: Message-oriented communication Example: IBM’s WebSphere message-queuing system
IBM’s WebSphere MQ
Basic concepts
Application-specific messages are put into, and removed from, queues
Queues reside under the regime of a queue manager
Processes can put messages only in local queues, or through an RPC
mechanism
Message transfer
Messages are transferred between queues
Message transfer between queues at different processes requires a
channel
At each end point of a channel is a message channel agent
Message channel agents are responsible for:
Setting up channels using lower-level network communication
facilities (e.g., TCP/IP)
(Un)wrapping messages from/in transport-level packets
Sending/receiving packets
IBM’s WebSphere MQ
Schematic overview
[Figure: sending client, MQ interface, and queue manager with routing table and send queue on one side; queue manager with the client's receive queue, MQ interface, and receiving client on the other.]
IBM’s WebSphere MQ
Routing
By using logical names, in combination with name resolution to local queues, it
is possible to put a message in a remote queue
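[Figure: alias and routing tables of the queue managers. Labels recovered: aliases LA1, LA2; queue managers QMA, QMC, QMD; send queue SQ1.]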
Communication: Multicast communication Application-level tree-based multicasting
Application-level multicasting
Essence
Organize nodes of a distributed system into an overlay network and use that
network to disseminate data:
Oftentimes a tree, leading to unique paths
Alternatively, also mesh networks, requiring a form of routing
Basic approach
1 Initiator generates a multicast identifier mid.
2 Lookup succ(mid), the node responsible for mid.
3 Request is routed to succ(mid), which will become the root.
4 If P wants to join, it sends a join request to the root.
5 When request arrives at Q:
Q has not seen a join request before ⇒ it becomes forwarder; P
becomes child of Q. Join request continues to be forwarded.
Q knows about tree ⇒ P becomes child of Q. No need to forward
join request anymore.
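Step 5 can be sketched as follows, under two assumptions: the overlay route from the joining node to the root is already known (here it is simply passed in as a list), and at every hop the node that forwarded the request becomes the child of the node that receives it. The function `join` and the node names are made up for illustration.

```python
def join(tree, path):
    """Add the joining node path[0] to the multicast tree.

    `tree` maps each node in the tree to the set of its children; `path` is the
    overlay route from the joining node to the root (assumed to be given).
    """
    tree.setdefault(path[0], set())            # the joining node enters the tree as a leaf
    child = path[0]
    for q in path[1:]:
        already_in_tree = q in tree
        tree.setdefault(q, set()).add(child)   # the previous hop becomes a child of Q
        if already_in_tree:
            return                             # Q knows about the tree: stop forwarding
        child = q                              # Q is now a forwarder; keep forwarding

tree = {"root": set()}
join(tree, ["P1", "A", "B", "root"])   # A and B become forwarders
join(tree, ["P2", "C", "B", "root"])   # stops at B, which is already in the tree
print(tree)
```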
Different metrics
[Figure: overlay network of end hosts A, B, C, D, E on top of routers Ra, Rb, Rc, Rd, Re connected through the Internet; physical links are labeled with their delays.]
Link stress: How often does an ALM message cross the same physical
link? Example: message from A to D needs to cross ⟨Ra, Rb⟩ twice.
Stretch: Ratio in delay between ALM-level path and network-level path.
Example: messages B to C follow path of length 73 at ALM, but 47 at
network level ⇒ stretch = 73/47.
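Both metrics are easy to compute once the paths are known. In the sketch below the stretch uses the two path lengths quoted above; the list of physical hops used for link stress is a hypothetical route, chosen only so that link ⟨Ra, Rb⟩ is crossed twice.

```python
from collections import Counter

def stretch(overlay_delay, network_delay):
    """Ratio of the delay along the overlay path to the delay of the network path."""
    return overlay_delay / network_delay

def link_stress(physical_hops):
    """Count how often each physical link is crossed (direction ignored)."""
    return Counter(frozenset(hop) for hop in physical_hops)

# B to C: length 73 in the overlay, 47 at the network level.
s = stretch(73, 47)

# Hypothetical route for illustration: it crosses link <Ra, Rb> twice.
hops = [("A", "Ra"), ("Ra", "Rb"), ("Rb", "B"), ("B", "Rb"), ("Rb", "Ra"), ("Ra", "Re")]
stress = link_stress(hops)
print(round(s, 3), stress[frozenset(("Ra", "Rb"))])
```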
Communication: Multicast communication Flooding-based multicasting
Flooding
Variation
Let Q forward a message with a certain probability $p_{\mathrm{flood}}$, possibly even
dependent on its own number of neighbors (i.e., node degree) or the degree of
its neighbors.
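A minimal sketch of probabilistic flooding with a fixed forwarding probability. It assumes that the source always forwards and that every node handles a message at most once; the 8-node ring is a made-up topology.

```python
import random

def flood(graph, source, p_flood, rng):
    """Return the set of nodes reached when each node forwards with probability p_flood.

    `graph` maps a node to the list of its neighbors; the source always forwards.
    """
    reached = {source}
    frontier = [source]
    while frontier:
        node = frontier.pop()
        if node != source and rng.random() >= p_flood:
            continue                          # this node decides not to forward
        for neighbor in graph[node]:
            if neighbor not in reached:       # each node handles a message only once
                reached.add(neighbor)
                frontier.append(neighbor)
    return reached

ring = {i: [(i - 1) % 8, (i + 1) % 8] for i in range(8)}   # hypothetical 8-node ring
everyone = flood(ring, 0, 1.0, random.Random(1))           # plain flooding
only_neighbors = flood(ring, 0, 0.0, random.Random(1))     # nobody but the source forwards
print(sorted(everyone), sorted(only_neighbors))
```

Making `p_flood` depend on the node degree would only change the forwarding test inside the loop.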
Communication: Multicast communication Gossip-based data dissemination
Epidemic protocols
Anti-entropy
Principal operations
A node P selects another node Q from the system at random.
Pull: P only pulls in new updates from Q
Push: P only pushes its own updates to Q
Push-pull: P and Q send updates to each other
Observation
For push-pull it takes O(log(N)) rounds to disseminate updates to all N nodes
(round = when every node has taken the initiative to start an exchange).
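A small simulation of push-pull, as a sketch under simplifying assumptions: one node holds the update at the start, nodes take the initiative one after another within a round, and each picks a partner uniformly at random among the other nodes.

```python
import random

def push_pull_rounds(n, rng):
    """Count rounds until all n nodes hold the update that node 0 starts with."""
    updated = [False] * n
    updated[0] = True
    rounds = 0
    while not all(updated):
        rounds += 1
        for p in range(n):                   # every node takes the initiative once
            q = rng.randrange(n - 1)
            if q >= p:
                q += 1                       # pick a random node other than p
            if updated[p] or updated[q]:     # push-pull: both end up with the update
                updated[p] = updated[q] = True
    return rounds

rounds_needed = push_pull_rounds(1000, random.Random(42))
print(rounds_needed)
```

Running it for growing `n` shows the number of rounds growing slowly, in line with the logarithmic bound.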
Anti-entropy: analysis
Basics
Consider a single source, propagating its update. Let $p_i$ be the probability that
a node has not received the update after the $i$-th round.
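The recurrences themselves are not given here. The sketch below assumes the ones usually used in this analysis: for pull, a node stays ignorant only if the node it contacts is ignorant too, so $p_{i+1} = p_i^2$; for push, it stays ignorant only if none of the $N(1 - p_i)$ updated nodes picks it, so $p_{i+1} = p_i (1 - 1/N)^{N(1 - p_i)}$. The function names are made up.

```python
def pull_step(p):
    # Still ignorant after a pull only if the contacted node was ignorant too.
    return p * p

def push_step(p, n):
    # Still ignorant only if none of the n * (1 - p) updated nodes pushed to this node.
    return p * (1 - 1 / n) ** (n * (1 - p))

n = 10_000
p_pull = p_push = 1 - 1 / n        # a single updated node at the start
for _ in range(25):
    p_pull = pull_step(p_pull)
    p_push = push_step(p_push, n)
print(p_pull, p_push)
```

Under these assumed recurrences the probability drops much faster for pull than for push once many nodes have been updated.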
Anti-entropy performance
[Figure: probability of not yet being updated (0 to 1.0) versus round (0 to 25) for N = 10,000, with curves for push, push-pull, and pull.]
Rumor spreading
Basic model
A server S having an update to report, contacts other servers. If a server is
contacted to which the update has already propagated, S stops contacting
other servers with probability $p_{\mathrm{stop}}$.
Observation
If s is the fraction of ignorant servers (i.e., which are unaware of the update), it
can be shown that with many servers
$$s = e^{-(1/p_{\mathrm{stop}} + 1)(1 - s)}$$
Formal analysis
Notations
Let s denote the fraction of nodes that have not yet been updated (i.e., susceptible);
i the fraction of updated (infected) and active nodes; and r the fraction of
updated nodes that gave up (removed).
Wrapup
$i(1) = 0 \Rightarrow C = 1 + p_{\mathrm{stop}} \Rightarrow i(s) = (1 + p_{\mathrm{stop}}) \cdot (1 - s) + p_{\mathrm{stop}} \cdot \ln(s)$. We are
looking for the case $i(s) = 0$, which leads to
$$s = e^{-(1/p_{\mathrm{stop}} + 1)(1 - s)}$$
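The equation for s has no closed-form solution, but it can be solved numerically. The sketch below uses plain fixed-point iteration, which is one possible method and an assumption of this example; it starts at 0 to stay away from the trivial solution s = 1.

```python
import math

def ignorant_fraction(p_stop, iterations=200):
    """Solve s = exp(-(1/p_stop + 1) * (1 - s)) by fixed-point iteration."""
    k = 1 / p_stop + 1
    s = 0.0                      # start away from the trivial solution s = 1
    for _ in range(iterations):
        s = math.exp(-k * (1 - s))
    return s

for p_stop in (1.0, 0.5, 0.2):
    print(p_stop, round(ignorant_fraction(p_stop), 4))
```

With a stop probability of 1, roughly a fifth of the servers never hear the rumor; lowering the stop probability shrinks that fraction but never brings it to zero.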
Rumor spreading
The effect of stopping
Note
If we really have to ensure that all servers are eventually updated, rumor
spreading alone is not enough
Deleting values
Fundamental problem
We cannot remove an old value from a server and expect the removal to
propagate. Instead, mere removal will be undone in due time using epidemic
algorithms
Solution
Removal has to be registered as a special update by inserting a death
certificate
Deleting values
When to remove a death certificate (it is not allowed to stay forever)
Run a global algorithm to detect whether the removal is known
everywhere, and then collect the death certificates (looks like garbage
collection)
Assume death certificates propagate in finite time, and associate a
maximum lifetime for a certificate (can be done at risk of not reaching all
servers)
Note
It is necessary that a removal actually reaches all servers.
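A toy sketch of death certificates with a maximum lifetime. The class `Replica`, its methods, and the integer clock are assumptions made for illustration; re-insertion of a deleted key and timestamp comparison between replicas are deliberately left out.

```python
class Replica:
    """Toy replica that exchanges values and death certificates (hypothetical design)."""

    def __init__(self):
        self.values = {}         # key -> value
        self.dead = {}           # key -> time at which the deletion was issued

    def delete(self, key, now):
        self.values.pop(key, None)
        self.dead[key] = now     # record the removal as a special update

    def merge_from(self, other):
        # Certificates first, so that deleted keys are not resurrected.
        for key, stamp in other.dead.items():
            self.dead.setdefault(key, stamp)
            self.values.pop(key, None)
        for key, value in other.values.items():
            if key not in self.dead:
                self.values[key] = value

    def expire(self, now, max_lifetime):
        # Drop certificates older than the assumed maximum lifetime.
        self.dead = {k: t for k, t in self.dead.items() if now - t <= max_lifetime}

a, b = Replica(), Replica()
a.values["x"] = b.values["x"] = 1
a.delete("x", now=0)
a.merge_from(b)                  # b still holds x, but the certificate blocks it
b.merge_from(a)                  # the deletion reaches b
a.expire(now=100, max_lifetime=10)
print(a.values, b.values, a.dead, b.dead)
```

If a certificate expires before every replica has seen it, a replica that still holds the old value can reintroduce it, which is the risk named above.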