Interprocess Communication in Unix: An Introduction To Concurrent Programming
Contents
- Process Creation and Management
- Signals
- Pipes, Named Pipes and Unix Sockets
- IO Multiplexing
- Message Queues
- Shared Memory
- Semaphores
- POSIX Threads
Standards
Linux, HP-UX, Solaris, AIX, BSD, IRIX, etc.: different capabilities, and different APIs for advanced functionality.
Standards?
- UNIX System V (Release 4)
- BSD 4.3
- POSIX.1 (1988)
- X/Open Spec. 1170 (1994)
- IEEE Std. 1003.1-2001 (or POSIX, 2001)
POSIX Extensions
Implementations that comply with the base standard define _POSIX_VERSION as 200112L in <unistd.h>.
Process Model
Process creation in Unix is based on spawning child processes which inherit all the characteristics of their fathers
Original Process
The child inherits variables, program counter, open files, etc. Spawning a process is done using the fork() system call.
After forking, the two processes execute independently, each with its own copy of the variables and its own state.
In both processes, the program counter points to the instruction right after fork(). Changing a variable in the child doesn't affect its father (and vice versa).
Process Management
Each process has a unique identifier (PID). Each process has a father, which is also identified (PPID).
pid_t getpid(void);
    Returns the PID of the current process.
pid_t getppid(void);
    Returns the PID of the parent process.
pid_t fork(void);
    Creates a new process which inherits all of its father's state. It returns 0 in the child (the spawned process) and the child's PID in the original process (or -1 on failure).
pid_t wait(int* status);
    Waits until a child process exits. The child's termination status is stored in *status; when WIFEXITED(status) is true, WEXITSTATUS(status) gives the child's exit code.
pid_t waitpid(pid_t who, int* status, int options);
    Same as wait(), but allows waiting for a particular child. Passing WNOHANG in options checks whether a child has already exited, without blocking. -1 in who means wait for any child (0 means any child in the caller's process group).
A process is only truly eliminated by the operating system when its father calls wait()/waitpid() on it.
This allows the parent to check things like the exit code of its children.
Zombie Process: One that has died and its parent has not acknowledged its death (by calling wait())
Be careful with this if you are designing servers: zombies eat up resources!!
Orphan Process: One whose original parent has died. In that case, its parent becomes init (process 1).
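#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

void worker(void)
{
    printf("[%d] Hi, I'm a worker process! Going to die...\n", (int)getpid());
}

int main(void)
{
    for (int i = 0; i < 10; i++) {
        if (fork() == 0) {
            worker();
            exit(0);
        }
    }
    // The father never calls wait(): while it sleeps, its dead children are zombies
    printf("[%d] Big father is sleeping!\n", (int)getpid());
    sleep(10);
    return 0;
}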
Zombies (2)
Don't do it!
Fairly common...
[Figure: a server with several clients]
Somehow the OS must be able to execute code starting from an executable file
e.g. how does the shell (bash) become ls?
int execl(const char *path, const char *arg, ...);
int execlp(const char *file, const char *arg, ...);
int execle(const char *path, const char *arg, ..., char *const envp[]);
int execv(const char *path, char *const argv[]);
int execvp(const char *file, char *const argv[]);
These functions replace the current process's executable image with another one. The substitution is complete! (If the call succeeds, it never returns.)
- The functions that have a p search for the executable in the directories listed in the PATH environment variable;
- The functions that have a v take a pointer to an array of parameters;
- The functions that have an l take the parameters as separate arguments, separated by commas.
Make sure that the first parameter (argv[0]) is the name of the program, and that the parameter list ends with NULL!
Example
The code, the stack, the heap: it's all replaced by the new executable.
[Figure: exec() replaces the original code with the ls code]
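#include <stdio.h>
#include <unistd.h>

int main(void)
{
    // On success, execlp() never returns: the process is now ls
    if (execlp("ls", "ls", "-a", (char *)NULL) == -1)
        perror("Error executing ls");   // perror() appends ": <reason>"
    else
        printf("This cannot happen!\n");
    return 0;
}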
char* ls_param[] = { "ls", "-a", NULL };
if (execvp(ls_param[0], ls_param) == -1)
    perror("Error executing ls");
Signals
A signal represents an asynchronous event which an application must (should? can?) process
The programmer can register a routine to handle such events. Examples:
- The user hits Ctrl+C: SIGINT
- The system requests the application to terminate: SIGTERM
- The program tried to write to a closed channel: SIGPIPE
[Figure: a SIGINT interrupts the process's normal flow of execution and runs the handler]
void sigint_handler(int signum) { /* process signal */ }
Signals (2)
- Blocked: upon arrival, they are kept pending until the process unblocks them; then they are delivered.
- Ignored: upon arrival, they are discarded. It is as if they had never existed.
- Handled: they are redirected to a signal handler, which is called.
- None of the above (not handled, not blocked, not ignored): upon arrival, the signal's default action is taken, which for most signals is program termination.
Some signals cannot be ignored, handled or blocked (e.g. SIGKILL). When a process starts, signals have their default behavior:
some are ignored, most are in the not-handled, not-blocked, not-ignored state. If one of those signals occurs, the process will die.
sighandler_t signal(int signum, sighandler_t handler);
    Redirects a certain signal (signum) to a handler routine.
int kill(pid_t pid, int sig);
    Sends a signal to the process identified by pid. (Note: if pid is 0, the signal is sent to every process in the caller's process group.)
int pause(void);
    Blocks the process until a signal is received.
Note: POSIX recommends sigaction() (and related routines such as sigprocmask()) instead of signal(). We are not going to cover them here. The problem with signal() is that, in certain cases, its behavior differs across systems.
Handling a signal
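#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

void sigint(int signum)
{
    char option[2];
    // Re-install the handler (needed on systems that reset it on delivery)
    signal(SIGINT, sigint);
    // Note: printf()/scanf() are not async-signal-safe; acceptable only in a toy example
    printf("\n ^C pressed. Do you want to abort? ");
    if (scanf("%1s", option) == 1 && option[0] == 'y') {
        printf("Ok, bye bye!\n");
        exit(0);
    }
}

int main(void)
{
    // Redirects SIGINT to sigint()
    signal(SIGINT, sigint);
    // Do some work!
    while (1) {
        printf("Doing some work...\n");
        sleep(1);
    }
    return 0;
}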
signal(SIGINT, SIG_IGN): ignores SIGINT
signal(SIGINT, SIG_DFL): restores SIGINT to its default handling
It's completely asynchronous: you never know when you are going to get a signal. This means that you have to protect all calls!
For example, errno == EINTR means that a certain routine was interrupted by a signal and has to be tried again. Other routines report interruption differently (e.g. read() and write() may return a partial count). If you are using signals, you have to protect your code against all of that!
Sending a signal
It's just a question of calling kill() with the PID of the target process...
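#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

void worker(void);   // the worker's code is not shown on this slide

void master(pid_t pid_son)
{
    printf("Master sleeping for a while...\n");
    sleep(3);
    printf("Master says: Hello son!\n");
    kill(pid_son, SIGUSR1);
}

int main(void)
{
    pid_t son;
    // Creates a worker process
    if ((son = fork()) == 0) {
        worker();
        exit(0);
    }
    // The master
    master(son);
    wait(NULL);
    return 0;
}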
Danger!!!
What do you think will happen if you receive a signal inside a signal handler??
In most systems, upon entering a signal handling routine, signals of that type become blocked (i.e. they are held pending). [Well, for normal signals, at most one pending instance is kept; real-time signals are all queued, up to a system limit...] Other signals are still delivered asynchronously if they arrive. This behavior is not consistent across systems. In fact, in some systems, the signal is reset to its default behavior. This means that if, meanwhile, the program receives a signal of the same type, it may die! On that type of system, the first thing that you must do in the handler is to set the signal handler once again.
void dady_call(int signum)
{
    signal(SIGUSR1, dady_call);   // re-install the handler first
    printf("Dady has just called in!\n");
}
Well... this doesn't really solve the problem, it just makes it less likely (a second signal can still arrive before signal() is called again). The POSIX routines (sigaction()) address this: use them. Also, most systems nowadays don't reset the signal handler.
Beware
Signal numbers vary across operating systems and architectures. Don't rely on them: use the symbolic constants (SIGINT, SIGUSR1, ...).
man 7 signal
Linux, i386
Some Signals
Pipes, named pipes and stream Unix sockets allow processes to communicate using streams of data.
A pipe is a connection between two processes. You can send things through the pipe, you can try to receive things from the pipe.
[Figure: one process write()s into the pipe, the other read()s from it]
If a process tries to write to a pipe that is full, it blocks. If a process tries to read from a pipe that is empty, it blocks.
Pipes
Provides communication among processes that are hierarchically related (i.e. father-child).
Whenever a pipe is created, using pipe(), two file descriptors are opened: one for reading (fd[0]), one for writing (fd[1])
fd[1]: writing end; fd[0]: reading end
Example
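#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

typedef struct {
    int a;
    int b;
} numbers;

// File descriptors for the pipe channel
int channel[2];

void worker(void);
void master(void);
// (...) worker() and master() are on the next slide

int main(void)
{
    // Create a pipe
    if (pipe(channel) < 0) {
        perror("pipe");
        exit(1);
    }
    // Create the processes
    if (fork() == 0) {
        worker();
        exit(0);
    }
    master();
    wait(NULL);
    return 0;
}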
Example (cont.)
void worker(void)
{
    numbers n;
    close(channel[1]);   // the worker only reads
    while (1) {
        // Stop on EOF (0) or error (-1), e.g. once the master has closed its end
        if (read(channel[0], &n, sizeof(numbers)) != sizeof(numbers))
            break;
        printf("[WORKER] Received (%d,%d) from master to add. Result=%d\n",
               n.a, n.b, n.a + n.b);
    }
}

void master(void)
{
    numbers n;
    close(channel[0]);   // the master only writes
    while (1) {
        n.a = rand() % 100;
        n.b = rand() % 100;
        printf("[MASTER] Sending (%d,%d) for WORKER to add\n", n.a, n.b);
        write(channel[1], &n, sizeof(numbers));
        sleep(2);
    }
}
Be careful!
A pipe is a finite buffer. If you try to write too much too quickly into it, the process will block until some space clears up. Atomicity is something that has to be dealt with:
- If you write PIPE_BUF bytes or fewer into a pipe, you are guaranteed that the write is atomic.
- If you write more, there are no guarantees! If several processes are writing at the same time, the writes can be interleaved.
- Also, when a process reads from a pipe, it is not guaranteed to get everything it asked for in a single read().
You must synchronize your writes when you're writing a lot of data! You must ensure that you read complete messages!
Meaning...
struct person p;
ssize_t n;
size_t total = 0;
while (total < sizeof(p)) {
    n = read(fd[0], (char*)&p + total, sizeof(p) - total);
    if (n <= 0)   // 0: EOF, -1: error (check errno, e.g. EINTR)
        break;
    total += n;
}
Each process has a file descriptor table. By default, entries 0, 1 and 2 are stdin, stdout and stderr. Each time a file is opened, an entry is added to this table. Each time a file is closed, the corresponding entry becomes available. In fact, the process file descriptor table contains only references to the OS global file descriptor table.
File descriptor table after open(f1), open(f2), open(f3), close(f1):
    0  stdin
    1  stdout
    2  stderr
    3  <free>  (was f1)
    4  f2
    5  f3
    6  <free>
int dup(int fd);
    Duplicates file descriptor fd onto the first (lowest-numbered) available position of the file descriptor table.
int dup2(int fd, int newfd);
    Duplicates file descriptor fd onto position newfd, closing newfd first if necessary.
Note that after a file descriptor is duplicated, the original and the duplicate can be used interchangeably. They share the file pointer (offset), the file status flags, locks, etc.
Careful: closing one file descriptor doesn't close the others that have been duplicated from it!
Resulting in...
NAMED PIPES
Each named pipe has a name (a string). The pipe's name is stored persistently in the file system (its data is not). To create a named pipe, use the mkfifo command or call mkfifo(const char* filename, mode_t mode);
Named pipes must be opened read-only or write-only. They are opened like files, but they are not files: you cannot fseek() a named pipe; write() always appends to the pipe, and read() always returns data from the beginning of the pipe. After data is read from the named pipe, it's no longer there. It's not a file, it's an object in the Unix kernel!
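#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>

#define PIPE_NAME "np_client_server"   // hypothetical name, shared with the client

typedef struct { int a; int b; } numbers;

int main(void)
{
    // Creates the named pipe if it doesn't exist yet (mode: permission bits only)
    if ((mkfifo(PIPE_NAME, 0600) < 0) && (errno != EEXIST)) {
        perror("Cannot create pipe");
        exit(1);
    }
    // Opens the pipe for reading (blocks until a writer opens it)
    int fd;
    if ((fd = open(PIPE_NAME, O_RDONLY)) < 0) {
        perror("Cannot open pipe for reading");
        exit(1);
    }
    // Do some work, until the client closes its end (EOF)
    numbers n;
    while (read(fd, &n, sizeof(numbers)) == sizeof(numbers)) {
        printf("[SERVER] Received (%d,%d), adding it: %d\n", n.a, n.b, n.a + n.b);
    }
    return 0;
}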
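#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define PIPE_NAME "np_client_server"   // hypothetical name, must match the server's

typedef struct { int a; int b; } numbers;

int main(void)
{
    // Opens the pipe for writing (blocks until the server opens it for reading)
    int fd;
    if ((fd = open(PIPE_NAME, O_WRONLY)) < 0) {
        perror("Cannot open pipe for writing");
        exit(1);
    }
    // Do some work
    while (1) {
        numbers n;
        n.a = rand() % 100;
        n.b = rand() % 100;
        printf("[CLIENT] Sending (%d,%d) for adding\n", n.a, n.b);
        write(fd, &n, sizeof(numbers));
        sleep(2);
    }
    return 0;
}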
Executing them...
If you get a SIGPIPE signal, this means that you are trying to write to a pipe that no process has open for reading. A named pipe is a connection between two processes: opening it blocks until the other party opens it too,
whether for reading or for writing. It's possible to bypass this behavior (open it non-blocking, with O_NONBLOCK), but be very, very careful: if not properly programmed, it can lead to busy waiting. read() returns 0 (EOF) when no process has the pipe open for writing. When designing a client/server application with multiple clients, this means that either the pipe is re-opened after each client disconnects, or the pipe is opened read-write. If opened read-write, the server will not block waiting for the other party to connect (since it is itself also a party!). Opening a FIFO read-write works on Linux, although POSIX leaves it undefined.
Interesting Problem
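A printer daemon is connected to a physical printer. There are 3 named pipes which allow automatic formatted printing:
- /printer/a4_double_sided
- /printer/a4_single_sided
- /printer/a3_single_sided
[Figure: the three named pipes feeding the Printer Daemon]
Pipes are blocking, so this doesn't work!!! (While the daemon is blocked reading one pipe, jobs sent to the other two are not served.)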
I/O Multiplexing
I/O Multiplexing: The ability to examine several file descriptors at the same time
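int select(int n, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout)
- n: the greatest fd plus one
- readfds: descriptors to watch for reading activity
- writefds: descriptors to watch for writing activity
- exceptfds: descriptors to watch for out-of-band activity
- timeout: maximum time to wait (NULL waits forever)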
Blocks until activity is detected or a timeout occurs. The fd_set variables are input/output. Upon return, they indicate if there was activity in a certain descriptor or not.
select()
n is not the number of file descriptors: it is the greatest descriptor plus one.
fd_set: a bit set representing file descriptors, manipulated with:
FD_ZERO(fd_set* set)            Clears the whole file descriptor set
FD_SET(int fd, fd_set* set)     Sets a bit in the file descriptor set
FD_CLR(int fd, fd_set* set)     Clears a bit in the file descriptor set
FD_ISSET(int fd, fd_set* set)   Tests whether a file descriptor is set
Example (printerd.c)
#include <assert.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#define BUF_SIZE     4096
#define NUM_PRINTERS 3

const char* PRINTER_NAME[] = { "printer1", "printer2", "printer3" };

// The printer file descriptors
int printer[NUM_PRINTERS];

void accept_requests(void);   // not shown on this slide

void create_printers(void)
{
    for (int i = 0; i < NUM_PRINTERS; i++) {
        unlink(PRINTER_NAME[i]);
        mkfifo(PRINTER_NAME[i], 0666);   // mode: permission bits only
        // O_NONBLOCK: open() does not wait for a writer to appear
        printer[i] = open(PRINTER_NAME[i], O_RDONLY | O_NONBLOCK);
        assert(printer[i] >= 0);
    }
}

int main(int argc, char* argv[])
{
    create_printers();
    accept_requests();
    return 0;
}
Resulting in...
Types of communication
Stream-oriented (e.g. pipes, named pipes): the sender requests the transmission of N bytes; the data starts flowing and the receiver starts getting it. The receiver may get several chunks of fewer than N bytes.
Message-oriented: it's like sending a letter. Either you get it fully or you don't. You don't get half a letter.
Message Queues
Completely asynchronous
A process can start executing, write some messages to a message queue and die; another process can later start and receive them. This is in sharp contrast with all the mechanisms that we've seen so far, which require both the sender and the receiver to be present at the same time. Message queues are maintained by the operating system: they are not destroyed if a process dies! Messages are sent with msgsnd() and received with msgrcv().
int msgget(key_t key, int flags) Obtains an identifier to an existing message queue or creates a new one.
key can be IPC_PRIVATE (which creates a new, unique queue) or a key agreed upon by the processes; ftok() can be used to generate a key based on a file name. flags holds the normal permission bits; when ORed with IPC_CREAT, a new queue is created if none exists for that key.
int msgctl(int mqid, int cmd, struct msqid_ds* buff) Provides a variety of control operations on the message queue.
mqid is the value returned by msgget(); cmd is the command (most usually IPC_RMID, to remove the queue); buff is a structure used in some control operations.
int msgsnd(int mqid, const void* message, size_t length, int flags) Sends a message to a message queue.
mqid is the value returned by msgget(); message is a pointer to the message to send; length is the length of the payload of the message (not of the whole message); flags is 0 or IPC_NOWAIT (non-blocking).
ssize_t msgrcv(int mqid, void* message, size_t length, long type, int flags) Retrieves a message from a message queue.
mqid is the value returned by msgget(); message is a pointer to where the received message is stored; length is the maximum payload we are willing to receive; type is the type of message to receive (0: the first message in the queue, i.e. FIFO order; greater than 0: the first message of that type); flags is 0 or IPC_NOWAIT.
In System V, a message can be anything, but it must always start with a long integer (the message type, which must be greater than 0), followed by the payload.
mq_pong.c (1)
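#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>

typedef struct {
    long mtype;          // message type: must come first and be > 0
    int first, second;   // payload
} numbers_msg;

// Message queue id
int id;

void ping(void);
void pong(void);

void cleanup(int signum)
{
    msgctl(id, IPC_RMID, NULL);
    exit(0);
}

int main(int argc, char* argv[])
{
    // msgget() returns -1 on failure (0 is a valid id)
    if ((id = msgget(IPC_PRIVATE, IPC_CREAT | 0700)) == -1) {
        perror("msgget");
        exit(1);
    }
    signal(SIGINT, cleanup);
    if (fork() == 0)
        ping();
    else
        pong();
    return 0;
}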
mq_pong.c (2)
void ping(void)
{
    numbers_msg msg;
    msg.first = rand() % 100;
    msg.second = rand() % 100;
    while (1) {
        msg.mtype = 1;
        printf("[A] Sending (%d,%d)\n", msg.first, msg.second);
        msgsnd(id, &msg, sizeof(msg) - sizeof(long), 0);
        // Wait for pong's answer, which has type 2
        msgrcv(id, &msg, sizeof(msg) - sizeof(long), 2, 0);
        printf("[A] Received (%d,%d)\n", msg.first, msg.second);
        ++msg.first;
        ++msg.second;
        sleep(3);
    }
}
mq_pong.c (3)
void pong(void)
{
    numbers_msg msg;
    while (1) {
        // Wait for a message of type 1 (sent by ping)
        msgrcv(id, &msg, sizeof(msg) - sizeof(long), 1, 0);
        printf("[B] Received (%d,%d)\n", msg.first, msg.second);
        msg.mtype = 2;   // answers use type 2, so ping never receives its own messages
        ++msg.first;
        ++msg.second;
        printf("[B] Sending (%d,%d)\n", msg.first, msg.second);
        msgsnd(id, &msg, sizeof(msg) - sizeof(long), 0);
    }
}
Resulting in...
IPC Resources
Remember, IPC resources are not automatically cleaned up. This can lead to serious resource leaks.
ipcs allows you to see the System V IPC resources currently in use; ipcrm allows you to delete them manually.
POSIX message queues differ from System V ones in several ways. Message types represent priorities: a read from a POSIX message queue always returns the oldest message of the highest priority. POSIX message queues allow a signal to be raised, or a thread to be started, when a message is put on an empty queue (see mq_notify()). Message queues are identified by names (like named pipes).
Up until now...
[Figure: P1 (user space) write()s into a buffer in kernel space; P2 (user space) read()s from it]
Shared Memory
(Almost) no kernel involvement! Fast! Very fast! Dangerous, very dangerous!
[Figure: P1 and P2 access a shared buffer directly in user space, without going through kernel space]
Each address space corresponds to a page table. There are as many page tables as there are processes
Shared memory corresponds to putting the same real memory pages in the page tables of two different processes
[Figure (slightly simplified): the 4 GB address spaces of Process A and Process B are mapped, through page translation, onto the same pages of a 256 MB physical memory]
int shmget(key_t key, size_t size, int flags) Obtains an identifier to an existing shared memory segment or creates a new one.
key can be IPC_PRIVATE (which creates a new, unique segment) or a key agreed upon by the processes; ftok() can be used to generate a key based on a file name. size is the shared memory size in bytes. flags holds the normal permission bits; when ORed with IPC_CREAT, a new segment is created if none exists for that key.
int shmctl(int shmid, int cmd, struct shmid_ds* buff) Provides a variety of control operations on the shared memory.
shmid is the value returned by shmget() cmd is the command (most usually: IPC_RMID to remove it) buff a structure used in some control operations
void* shmat(int shmid, const void* where, int flags) Maps a certain shared memory region into the current process address space.
shmid is the shared memory identifier returned by shmget(); where is an unused address space location at which to map the shared memory (normally, use NULL and let the system choose); flags represents different ways of doing the mapping (typically 0). On failure, shmat() returns (void*)-1.
int shmdt(const void* where) Unmaps a certain shared memory region from the current address space.
where is the address at which the shared memory is mapped, i.e. the value previously returned by shmat().
[Figure: processes P1 and P2 share the same region of real memory (id 1234); P1 calls print_work(a, 12) while P2 calls print_work(b, 65)]
Synchronization Semaphores
Controlled access to a counter (a value). Two operations are supported: wait() and post().
wait(): if the semaphore is positive, decrement it and continue; if not, block the calling process (thread).
post(): increment the semaphore value; if there was any process (thread) blocked on the semaphore, unblock one of them.
[Figure: a semaphore: its value and the queue of processes blocked on it (P1, P6, P3)]
Corrected version
You always have to synchronize, even if you are only reading or writing one byte!
Synchronization Semaphores
System V Semaphores
Works with semaphore arrays: semget(), semctl(), semop(). A little bit hard to use by themselves: use a library to encapsulate them!
POSIX Semaphores
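Quite easy to use: sem_init()/sem_destroy() (unnamed semaphores) or sem_open()/sem_close() (named semaphores), sem_post(), sem_wait(). They also work with threads! But... in Linux with kernel 2.4, they only worked with threads (process-shared semaphores became available with NPTL, on kernel 2.6).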
Semaphores are very useful for other things besides mutual exclusion: they can be used to count things!
semaphore.h
My Semaphore Library
Example Producer/Consumer
A producer puts elements on a finite buffer. If the buffer is full, it blocks until there's space. The consumer retrieves elements. If the buffer is empty, it blocks until something comes along.
[Figure: a Producer and a Consumer sharing a finite buffer]
Three semaphores are used: one to count the empty slots, one to count the full slots, and one to provide mutual exclusion on the shared buffer.
Example Producer/Consumer
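[Figure: the circular buffer, with write_pos used by the Producer and read_pos by the Consumer, protected by the semaphores mutex, empty and full]
put_element(e) {
    sem_wait(empty);     // wait for an empty slot
    sem_wait(mutex);     // enter mutual exclusion
    buf[write_pos] = e;
    write_pos = (write_pos+1) % N;
    sem_post(mutex);
    sem_post(full);      // one more full slot
}

get_element() {
    sem_wait(full);      // wait for a full slot
    sem_wait(mutex);
    e = buf[read_pos];
    read_pos = (read_pos+1) % N;
    sem_post(mutex);
    sem_post(empty);     // one more empty slot
    return e;
}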
Remember
Always clean up...
Threads
Threads share the whole address space of their process. Each thread has its own stack and local variables (even so, threads can access other threads' variables).
[Figure: a process containing Thread 1, Thread 2 and Thread 3]
96
97
98
99
Linux
gcc -Wall -D_REENTRANT fich.c -o fich -lpthread
-D_REENTRANT instructs the compiler to use the special re-entrant versions of library routines. If you don't... it ONLY appears to work, until you get in trouble!
Beware: many routines are not re-entrant: they cannot be directly used with threads, since they use common storage in an unsynchronized way (e.g. strtok())! In some cases, there are re-entrant versions (e.g. strtok_r()). Check the manual! Don't trust common sense.
What happens if this is called from two different threads at the same time??
Things to beware of
Doesn't work: (1) after main() returns, its local variables disappear, creating a race condition with the threads that are starting and still using them; (2) when main() returns, the whole process dies, and every thread dies with it!
This is OK!
Note: the other threads continue to execute.
Synchronization
Mutexes
Provide mutual exclusion zones between threads. In fact, these are just fast binary semaphores.
POSIX Semaphores
Used to signal events across threads. Used to count objects in a synchronized way.
Condition Variables
Allow a thread to block, or to notify others, on any condition. Semaphores are a kind of condition variable: the implicit condition is the semaphore being greater than 0.
Mutexes
POSIX Semaphores
Producer/Consumer Revisited
[Figure: three Producers and one Consumer sharing the buffer]
prod_cons_threads.c
prod_cons_threads.c (2)
Condition variables allow the programmer to suspend or notify a thread on any condition!
Important rule:
A condition variable always has an associated mutex. Always check the condition in mutual exclusion: the mutex must be locked.
The thread tests a condition in mutual exclusion. If the condition is false, pthread_cond_wait() atomically releases the mutex AND waits until someone signals that the condition should be tested again. When the condition is signaled AND the mutex is available, pthread_cond_wait() atomically reacquires the mutex AND releases the thread. pthread_cond_signal() indicates that one blocked thread (at least one, according to POSIX) should test the condition again. Note that this is not a semaphore: if there is no thread blocked, the signal is lost. If all threads should re-check the condition, use pthread_cond_broadcast(). Since a mutex is involved, the threads will test it one at a time, in mutual exclusion.
Example
Suppose a buffer that can hold a maximum of N elements. When it is full, it should immediately be emptied. While the buffer is being emptied, no thread can put things into it.
[Figure: three Producers filling the buffer and a Cleaner emptying it]
bounded_buffer.c
bounded_buffer.c (2)
- The condition must always be tested with a while loop, never an if! Being woken up from a condition variable only means that the condition must be re-checked, not that it is true.
- The condition must always be checked and signaled with the mutex locked.
Although condition() may be true while Thread B is executing, something may happen between the time the condition is signaled and the time Thread A is unblocked (e.g. another thread may change the condition).
Locks should always be taken in the same order. Always release locks in the reverse order in which they were taken.
Deadlock!
One way to ensure that you always take locks in the same order is to create a lock hierarchy, i.e. associate a number with each lock using a table, and always lock in increasing order, using that table as the reference (index).
Sometimes it is not possible to control the order in which locks are taken (or semaphores are used).
Example: you are using two resources owned by the operating system. They are controlled by locks. You cannot be sure another application is not using exactly the same resources and locking in reverse order.
In that case, use pthread_mutex_trylock() or sem_trywait() and back off if you are unsuccessful.
To Learn More