Mutual Exclusion
Mutual exclusion is the requirement that only one thread or process at a time may execute a particular part of a program. We need mutual exclusion when shared state can be read and written by more than one execution context.
Critical Sections
A critical section is a region of code that accesses shared state and must not be executed concurrently unless the program has been designed to permit that concurrency.
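#include <pthread.h>
#include <stdio.h>

int count = 0;   /* shared by both threads */

void* increment(void* arg) {
    long n = (long) arg;
    for(long i = 0; i < n; i++) {
        int x = count;   /* read   */
        x = x + 1;       /* modify */
        count = x;       /* write  */
    }
    return NULL;
}

int main(int argc, char* argv[]) {
    pthread_t t1, t2;
    pthread_create(&t1, NULL, increment, (void*) 1000000L);
    pthread_create(&t2, NULL, increment, (void*) 1000000L);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    printf("count = %d, expected %d\n", count, 2 * 1000000);
    return 0;
}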
If two threads call increment() at the same time, both threads read
and write the shared variable count. Each thread is asked to add n, so
the expected final value is 2n. The actual value can be lower, because
the read-modify-write sequences of the two threads can interleave.
Identifying the Critical Section
The critical section in increment() is the sequence that reads,
modifies, and writes count.
int x = count;
x = x + 1;
count = x;
The program only behaves like a serial program if this sequence is treated as one indivisible operation. If two threads enter this section together, each thread can observe a stale value and overwrite the other thread’s update.
Execution Orders and Atomicity
Atomicity means that an operation behaves as though it happens all at once. Most source-level statements are not atomic at the machine level.
int x = count;
x = x + 1;
count = x;
One possible pseudo-assembly expansion is:
load &count, r0
set r0, r1
add 1, r1
set r1, r0
store &count, r0
The compiler and CPU are not required to preserve the simple mental model of one C statement becoming one machine instruction. Values may remain in registers longer than expected, instructions may be reordered within the rules of the language and architecture, and a store may occur later than the programmer assumes.
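A hardware atomic instruction can make the whole read-modify-write indivisible without a lock. The sketch below shows one way to do this. It assumes a C11 compiler that provides <stdatomic.h>. The names atomic_count, atomic_worker, and run_atomic_counter are illustrative and do not come from the example repository.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

/* Shared counter; each update below is one atomic read-modify-write. */
static atomic_int atomic_count = 0;

/* Hypothetical worker: adds 1 to atomic_count n times. */
static void* atomic_worker(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        /* load, add, and store happen as a single indivisible step,
           so no other thread can slip in between them */
        atomic_fetch_add(&atomic_count, 1);
    }
    return NULL;
}

/* Runs two workers and returns the final count. No update is lost,
   so the result is 2 * n on every run. */
int run_atomic_counter(long n) {
    pthread_t t1, t2;
    atomic_store(&atomic_count, 0);
    int rc1 = pthread_create(&t1, NULL, atomic_worker, (void*) n);
    int rc2 = pthread_create(&t2, NULL, atomic_worker, (void*) n);
    assert(rc1 == 0 && rc2 == 0);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return atomic_load(&atomic_count);
}
```

Calling run_atomic_counter(100000) returns 200000. An atomic instruction covers only a single memory word, so larger critical sections still need one of the locks described below.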
Interleavings
An interleaving is one possible ordering of instructions from two or more threads. Some interleavings are equivalent to serial execution, and some are not.
t0: load &count, r0
t0: set r0, r1
t1: load &count, r0
t0: add 1, r1
t1: set r0, r1
t0: set r1, r0
t0: store &count, r0
t1: add 1, r1
t1: set r1, r0
t1: store &count, r0
In this interleaving, both threads read the same old value of count
before either thread stores its result. Each thread computes the same
new value and stores it. If count starts at 0, the final result is 1
instead of 2, so one update is lost. A correct locking design rules out
every interleaving that is not equivalent to some serial execution.
Characteristics of a Good Locking Solution
A good locking solution protects correctness without adding unnecessary assumptions about scheduling, CPU speed, or the number of processors.
The main requirements are:
No two threads may be inside the same protected critical section at the same time.
A thread outside the critical section should not prevent another thread from entering it.
The solution should not depend on relative thread speeds.
A thread should not have to wait forever to enter the critical section.
Achieving Atomicity and Serializability
Serializability means that concurrent execution produces a result that is equivalent to some serial ordering of the same operations.
There are two common models:
Pessimistic locking obtains the necessary locks before entering the critical section, performs the operation, and then releases the locks.
Optimistic locking performs the operation against a snapshot or log and commits the result only if no conflicting operation occurred.
Most operating systems material in this chapter uses pessimistic locking. Optimistic techniques are important in databases, transactional memory, and some high-concurrency data structures.
Types of Pessimistic Locks
Pessimistic locking assumes that a conflict may happen and prevents the conflict before it can corrupt shared state.
Common techniques include disabling interrupts, strict alternation, spin locks, mutexes, semaphores, condition variables, monitors, and reader-writer locks.
Types of Optimistic Locks
Optimistic locking assumes that conflicts are uncommon and checks for a conflict before committing the result.
Most non-database implementations use some kind of software transactional memory or compare-and-swap based retry loop. This approach can work well when conflicts are rare, communication is expensive, or the protected data structure is sparse.
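A compare-and-swap retry loop is the smallest version of this idea. The thread reads a snapshot, computes a new value from it, and commits only if the shared value still equals the snapshot. This is a minimal sketch that assumes C11 <stdatomic.h>. The names cas_increment, cas_retries, and run_cas_counter are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

static atomic_int cas_count = 0;
static atomic_long cas_retries = 0;   /* failed commits, similar to rollbacks */

/* Optimistic increment: no lock is held while the new value is computed. */
static void cas_increment(atomic_int* target) {
    int snapshot = atomic_load(target);
    /* Commit only if *target still equals snapshot. On failure the call
       copies the current value into snapshot, and the loop retries. */
    while (!atomic_compare_exchange_weak(target, &snapshot, snapshot + 1)) {
        atomic_fetch_add(&cas_retries, 1);
    }
}

static void* cas_worker(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        cas_increment(&cas_count);
    }
    return NULL;
}

/* Runs two workers. The result is 2 * n however many retries occur. */
int run_cas_counter(long n) {
    pthread_t t1, t2;
    atomic_store(&cas_count, 0);
    atomic_store(&cas_retries, 0);
    int rc1 = pthread_create(&t1, NULL, cas_worker, (void*) n);
    int rc2 = pthread_create(&t2, NULL, cas_worker, (void*) n);
    assert(rc1 == 0 && rc2 == 0);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return atomic_load(&cas_count);
}
```

The number of retries depends on how often the two threads collide. This shows the trade-off described above: the approach is cheap when conflicts are rare and wasteful when they are common.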
Software Transactional Memory Example
Software transactional memory lets code make tentative changes and then commit them only if the original data has not changed.
The systems-code-examples/stm example implements a small optimistic
transaction around a shared counter.
#include "trans.hh"
#include <pthread.h>
#include <stdio.h>

const int countPerThread = 10000000;
volatile int count = 0;
Transaction *trans = new Transaction(&count);

void* increment(void*)
{
    const int chunkSize = 500;
    for(int i = 0; i < countPerThread; i += chunkSize)
    {
        MemHandle* handle;
        do
        {
            // Work on a private snapshot of the shared counter.
            handle = trans->Begin();
            for(int j = 0; j < chunkSize; j++)
            {
                handle->Value += 1;
            }
        }
        // Retry the whole chunk if another thread committed first.
        while(!trans->TryCommit(handle));
    }
    return NULL;
}

int main( int argc, char* argv[])
{
    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %d\n", 2*countPerThread);

    pthread_create(&thread1, &threadAttribute, increment, NULL);
    pthread_create(&thread2, &threadAttribute, increment, NULL);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * countPerThread )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    printf("rollback count = %d\n", trans->GetRollbackCount());

    return count == 2 * countPerThread ? 0 : 1;
}
Key points:
Each thread increments the counter in chunks.
Begin() returns a handle containing a snapshot of the current value.
The thread updates the handle instead of updating the shared counter directly.
TryCommit() commits the new value only if no other transaction has committed since the snapshot was taken.
A failed commit causes the loop to retry.
#include "trans.hh"
#include <stdio.h>

MemHandle::MemHandle(int value, int event)
{
    Value = value;      // private working copy of the shared value
    _event = event;     // version number seen when the snapshot was taken
}

int MemHandle::getEvent()
{
    return _event;
}

Transaction::Transaction(volatile int *value)
{
    _value = value;
    _event = 0;         // version of *_value, advanced on every commit
    _rollbacks = 0;
    _lock = new Mutex();
}

Transaction::~Transaction()
{
    delete _lock;
}

int Transaction::GetRollbackCount()
{
    return _rollbacks;
}

MemHandle* Transaction::Begin()
{
    MemHandle* handle;
    _lock->Lock();
    // Copy the value and its version together so they match.
    handle = new MemHandle(*_value, _event);
    _lock->Unlock();
    return handle;
}

bool Transaction::TryCommit(MemHandle* value)
{
    bool success = false;
    _lock->Lock();
    if(_event == value->getEvent())
    {
        // No commit since the snapshot: publish the new value.
        *_value = value->Value;
        _event += 1;
        success = true;
    }
    else
    {
        // Another thread committed first: discard this attempt.
        _rollbacks += 1;
    }
    _lock->Unlock();
    delete value;       // the handle is consumed either way
    return success;
}
Key points:
_event is the version number for the shared value.
Begin() copies the current value and event while holding the internal mutex.
TryCommit() compares the saved event with the current event.
A successful commit writes the new value and advances the event.
A failed commit increments the rollback count.
Disabling Interrupts
Disabling interrupts prevents timer interrupts from invoking the scheduler on the current CPU. This can make a short kernel critical section atomic on that CPU.
This technique is not suitable for ordinary application code. Disabling interrupts is a privileged operation. It delays the handling of device and timer interrupts for as long as they stay off, and it can hang the system if interrupts are never re-enabled. On a multiprocessor system it also does not protect the same data from code running on other CPUs.
Strict Alternation
Strict alternation lets threads take turns entering a critical section,
usually through a shared turn variable.
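/* thread 0 */
while(TRUE) {
    while(turn != 0) {}     /* wait until it is thread 0's turn */
    critical_section();
    turn = 1;               /* hand the turn to thread 1 */
    non_critical_section();
}

/* thread 1 */
while(TRUE) {
    while(turn != 1) {}     /* wait until it is thread 1's turn */
    critical_section();
    turn = 0;               /* hand the turn to thread 0 */
    non_critical_section();
}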
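Strict alternation is usually a poor solution. A slow thread in its non-critical section can prevent another thread from entering the critical section even when the critical section is free. This violates the requirement that a thread outside the critical section must not block other threads. It also ties both threads to the pace of the slower one.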
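The two loops above can be turned into a small runnable program that shows the forced alternation. This sketch assumes C11 atomics for the shared turn variable, because the compiler could hoist a plain int load out of the busy-wait loop. The names alt_turn, alternate, and run_strict_alternation are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>

#define ROUNDS 1000

static atomic_int alt_turn = 0;      /* which thread may enter: 0 or 1 */
static int order[2 * ROUNDS];        /* who entered the critical section, in order */
static int entries = 0;              /* protected by the turn protocol */

static void* alternate(void* arg) {
    int me = (int) (long) arg;       /* this thread's id: 0 or 1 */
    for (int i = 0; i < ROUNDS; i++) {
        while (atomic_load(&alt_turn) != me) {
            sched_yield();           /* not our turn: let the other thread run */
        }
        order[entries++] = me;       /* critical section */
        atomic_store(&alt_turn, 1 - me);   /* hand the turn to the other thread */
        /* the non-critical section would go here */
    }
    return NULL;
}

/* Returns 1 if the threads entered strictly as 0, 1, 0, 1, ... */
int run_strict_alternation(void) {
    pthread_t t0, t1;
    atomic_store(&alt_turn, 0);
    entries = 0;
    int rc0 = pthread_create(&t0, NULL, alternate, (void*) 0L);
    int rc1 = pthread_create(&t1, NULL, alternate, (void*) 1L);
    assert(rc0 == 0 && rc1 == 0);
    pthread_join(t0, NULL);
    pthread_join(t1, NULL);
    if (entries != 2 * ROUNDS) return 0;
    for (int i = 0; i < 2 * ROUNDS; i++) {
        if (order[i] != i % 2) return 0;
    }
    return 1;
}
```

The check always succeeds, and that is exactly the problem. If thread 1 stopped after a few rounds, thread 0 would loop forever at its next turn check even though no thread is in the critical section.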
Spin Locks
A spin lock is a lock where a waiting thread repeatedly checks whether the lock is available. The waiting thread stays runnable while it waits.
int lock = 0;
void increment(int n) {
for(int i = 0; i < n; i++) {
spin_lock(&lock);
int x = count;
x = x + 1;
count = x;
spin_unlock(&lock);
}
}
The lock and unlock calls make the update to count behave as a
single protected operation. Spin locks are useful for very short waits,
especially inside kernels, but they waste CPU time if the wait is long.
Spin Lock Implementation
A spin lock needs an atomic operation that tests the old value and sets a new value as one indivisible operation.
spin_lock(int* lock) {
while(*lock == 1) { }
*lock = 1;
}
spin_unlock(int* lock) {
*lock = 0;
}
This pseudo-code is not correct. Two threads can both observe lock as
0 and then both set it to 1. The test and the store must be one
atomic operation.
spin_lock(int* lock) {
while(test_and_set(lock) == 1) {}
}
On x86 systems, this kind of operation can be implemented with an atomic
instruction such as xchg or cmpxchg.
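Portable C code does not need inline assembly for this. The following minimal sketch assumes a C11 compiler. atomic_flag_test_and_set_explicit() performs the test-and-set as one atomic operation, and compilers typically emit xchg for it on x86. The acquire and release orderings keep the protected accesses inside the lock. The type tas_lock_t and the functions around it are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>

typedef struct {
    atomic_flag held;                /* set while some thread owns the lock */
} tas_lock_t;

static void tas_lock(tas_lock_t* l) {
    /* Sets the flag and returns its previous value in one atomic step.
       A previous value of true means another thread owns the lock. */
    while (atomic_flag_test_and_set_explicit(&l->held, memory_order_acquire)) {
        sched_yield();
    }
}

static void tas_unlock(tas_lock_t* l) {
    atomic_flag_clear_explicit(&l->held, memory_order_release);
}

static tas_lock_t counter_lock = { ATOMIC_FLAG_INIT };
static long tas_count = 0;           /* protected by counter_lock */

static void* tas_worker(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        tas_lock(&counter_lock);
        long x = tas_count;          /* the same read-modify-write as before, */
        x = x + 1;                   /* now inside the lock */
        tas_count = x;
        tas_unlock(&counter_lock);
    }
    return NULL;
}

/* Runs two workers and returns the final count, which should be 2 * n. */
long run_tas_counter(long n) {
    pthread_t t1, t2;
    tas_count = 0;
    int rc1 = pthread_create(&t1, NULL, tas_worker, (void*) n);
    int rc2 = pthread_create(&t2, NULL, tas_worker, (void*) n);
    assert(rc1 == 0 && rc2 == 0);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return tas_count;
}
```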
Spin Lock Example
The systems-code-examples/spin_lock example demonstrates the race in
the increment program and provides a spin lock implementation.
#include "lock.h"
#include <pthread.h>
#include <stdio.h>

long int count;
unsigned long lock = 0;

void* increment( void* vp_ntimes )
{
    long ntimes = (long) vp_ntimes;
    for(long i = 0; i < ntimes; i++)
    {
        long c;
        //spin_lock(&lock);
        c = count;
        c = c+1;
        count = c;
        //spin_unlock(&lock);
    }
    return NULL;
}

int main( int argc, char* argv[])
{
    const long int n = 100000000;

    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %ld\n", 2*n);

    pthread_create(&thread1, &threadAttribute, increment, (void *) n);
    pthread_create(&thread2, &threadAttribute, increment, (void *) n);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * n )
    {
        printf("****** Error. Final count is %ld\n", count);
    }
    else
    {
        printf("****** OK. Final count is %ld\n", count);
    }

    return count == 2 * n ? 0 : 1;
}
Key points:
Two pthreads both call increment() and update the shared global count.
The program checks whether the final value is 2 * n.
The calls to spin_lock() and spin_unlock() are present but commented out, which leaves the race visible.
Uncommenting the lock calls protects the read-modify-write sequence.
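#include "lock.h"
#include <sched.h>

/* Atomically: if *lock is 0, set it to 1. Returns the previous value. */
unsigned long test_and_set(unsigned long *lock)
{
    unsigned long oldval = 0;   /* expected value in, previous value out */
    asm volatile ("lock; cmpxchg %2, %0"
                  : "+m" (*lock), "+a" (oldval)   /* output operands */
                  : "r" (1UL)                     /* input operands */
                  : "cc", "memory");              /* clobbers: flags, and memory so the
                                                     compiler keeps loads and stores
                                                     on the correct side of the lock */
    return oldval;
}

void spin_lock(unsigned long *lock)
{
    while(test_and_set(lock) == 1)
    {
        sched_yield();
    }
}

void spin_unlock(unsigned long *lock)
{
    /* release store: writes made while holding the lock become visible
       before the lock is seen as free */
    __atomic_store_n(lock, 0, __ATOMIC_RELEASE);
}
Key points:
test_and_set() uses inline assembly to perform an atomic compare-and-exchange. If the lock holds 0 it stores 1, and in either case it returns the previous value.
The "memory" clobber stops the compiler from moving memory accesses across the lock operation. Without it, parts of the critical section could be reordered outside the lock.
spin_lock() loops while another thread owns the lock.
sched_yield() gives the scheduler a chance to run another thread while this thread waits.
spin_unlock() releases the lock by storing 0 with release ordering.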
Mutexes
A mutex is a mutual exclusion lock that allows only one owner at a time. Unlike a simple spin lock, a system mutex can put a waiting thread to sleep instead of making it burn CPU cycles.
The pthreads library provides mutexes on POSIX systems:
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex);
/* critical section */
pthread_mutex_unlock(&mutex);
Windows provides the same idea through mutex handles:
HANDLE mutex = CreateMutex(NULL, FALSE, NULL);
WaitForSingleObject(mutex, INFINITE);
/* critical section */
ReleaseMutex(mutex);
CloseHandle(mutex);
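Combining the pthreads calls above with the increment() loop from the start of the chapter gives a complete program. This is a sketch that assumes a POSIX system. PTHREAD_MUTEX_INITIALIZER is the standard static alternative to pthread_mutex_init(). The names mutex_count and run_mutex_counter are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
static long mutex_count = 0;          /* protected by count_mutex */

static void* mutex_worker(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        pthread_mutex_lock(&count_mutex);    /* may sleep while another thread holds it */
        long x = mutex_count;
        x = x + 1;
        mutex_count = x;
        pthread_mutex_unlock(&count_mutex);  /* lets a waiting thread, if any, acquire it */
    }
    return NULL;
}

/* Runs two workers and returns the final count, which should be 2 * n. */
long run_mutex_counter(long n) {
    pthread_t t1, t2;
    mutex_count = 0;
    int rc1 = pthread_create(&t1, NULL, mutex_worker, (void*) n);
    int rc2 = pthread_create(&t2, NULL, mutex_worker, (void*) n);
    assert(rc1 == 0 && rc2 == 0);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return mutex_count;
}
```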
Mutex Example
The systems-code-examples/mutex example wraps a low-level lock in a
small C++ Mutex class and uses it to protect the shared counter.
#include "mutex.hh"
#include <pthread.h>
#include <stdio.h>

int count;
Mutex *lock = new Mutex();

void* increment( void* vp_ntimes )
{
    long ntimes = (long) vp_ntimes;
    for(long i = 0; i < ntimes; i++)
    {
        int c;
        lock->Lock();
        c = count;
        c = c+1;
        count = c;
        lock->Unlock();
    }
    return NULL;
}

int main( int argc, char* argv[])
{
    const long n = 100000000;

    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %ld\n", 2*n);

    pthread_create(&thread1, &threadAttribute, increment, (void *) n);
    pthread_create(&thread2, &threadAttribute, increment, (void *) n);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * n )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    return count == 2 * n ? 0 : 1;
}
Key points:
lock->Lock() is called before the shared counter is read.
lock->Unlock() is called after the updated value is stored.
The critical section is the read, increment, and write of count.
The two worker threads can run concurrently, but only one may update count at a time.
#include "mutex.hh"
#include <pthread.h>
#include <sched.h>

Mutex::Mutex()
{
    this->_lock = 0;
}

Mutex::~Mutex()
{
}

void Mutex::Lock()
{
    // Retry until test_and_set() reports that the lock was free (0).
    while(test_and_set(&this->_lock) == 1)
    {
        sched_yield();
    }
}

unsigned long Mutex::test_and_set(unsigned long *lock)
{
    unsigned long oldval = 0;   // expected value in, previous value out
    asm volatile ("lock; cmpxchg %2, %0"
                  : "+m" (*lock), "+a" (oldval)   /* output operands */
                  : "r" (1UL)                     /* input operands */
                  : "cc", "memory");              /* clobbers: flags and memory */
    return oldval;
}

void Mutex::Unlock()
{
    // Release store: updates made while holding the lock become visible
    // before the lock is seen as free.
    __atomic_store_n(&this->_lock, 0, __ATOMIC_RELEASE);
}
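Key points:
The class stores the lock state in _lock.
Lock() uses the same atomic test-and-set idea as the spin lock.
Unlock() releases ownership by setting _lock back to 0.
Despite its name, this Mutex still spins and yields the CPU on each failed attempt. It does not put the waiting thread to sleep the way a system mutex does.
This example is useful for seeing how a lock can be wrapped behind a smaller interface.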
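Semaphores
A semaphore is a synchronization object with an integer count. A thread decrements the count (the down operation) before using the resource and increments it (the up operation) when it releases or signals the resource. If the count is already zero, down cannot complete until another thread calls up.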
Binary semaphores have only two useful states and can emulate mutexes.
Counting semaphores allow up to N units of a resource to be used at
the same time. Event semaphores often begin at 0 and are used to
signal that something has happened.
Semaphore Implementation
A semaphore can be built from a lock that protects the count and operations that adjust the count.
int lock = 0;
int semaphore = 0;
void up() {
spin_lock(&lock);
semaphore += 1;
spin_unlock(&lock);
}
void down() {
spin_lock(&lock);
while(semaphore == 0) {
spin_unlock(&lock);
spin_lock(&lock);
}
semaphore -= 1;
spin_unlock(&lock);
}
This simple version shows the idea but still spins while waiting. System semaphores normally block the waiting thread.
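A semaphore that blocks instead of spinning can be built from a mutex and a condition variable (condition variables are described later in this chapter). This sketch shows one common construction and assumes pthreads. The type blocking_sem_t and the bsem_* functions are hypothetical names, not a library API.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;       /* protects count */
    pthread_cond_t nonzero;     /* signaled when count is incremented */
    int count;
} blocking_sem_t;

void bsem_init(blocking_sem_t* s, int initial) {
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->nonzero, NULL);
    s->count = initial;
}

void bsem_destroy(blocking_sem_t* s) {
    pthread_cond_destroy(&s->nonzero);
    pthread_mutex_destroy(&s->lock);
}

void bsem_down(blocking_sem_t* s) {
    pthread_mutex_lock(&s->lock);
    while (s->count == 0) {
        /* sleeps without holding the lock and rechecks after every wakeup */
        pthread_cond_wait(&s->nonzero, &s->lock);
    }
    s->count -= 1;
    pthread_mutex_unlock(&s->lock);
}

void bsem_up(blocking_sem_t* s) {
    pthread_mutex_lock(&s->lock);
    s->count += 1;
    pthread_cond_signal(&s->nonzero);    /* wake one thread sleeping in bsem_down() */
    pthread_mutex_unlock(&s->lock);
}

/* Demo: one thread takes three units that the caller supplies one by one. */
static blocking_sem_t demo_sem;

static void* taker(void* arg) {
    (void) arg;
    for (int i = 0; i < 3; i++) {
        bsem_down(&demo_sem);            /* blocks until a unit is available */
    }
    return NULL;
}

int run_bsem_demo(void) {
    pthread_t t;
    bsem_init(&demo_sem, 0);
    int rc = pthread_create(&t, NULL, taker, NULL);
    assert(rc == 0);
    for (int i = 0; i < 3; i++) {
        bsem_up(&demo_sem);
    }
    pthread_join(t, NULL);               /* returns only after all three downs */
    int left = demo_sem.count;           /* safe to read: the taker has exited */
    bsem_destroy(&demo_sem);
    return left;
}
```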
Semaphore Example
The systems-code-examples/semaphore example uses a semaphore to
coordinate worker threads that consume work from a queue.
#include "mutex.hh"
#include "queue.hh"
#include "semaphore.hh"
#include "debug.hh"
#include <pthread.h>
#include <stdio.h>

const int countPerThread = 10000000;
int count;
Mutex *lock = new Mutex();
Semaphore *semaphore = new Semaphore(0);
Queue *queue = new Queue();

void* increment(void*)
{
    bool workLeft = true;
    while(workLeft)
    {
        lock->Lock();
        if(!queue->HasWorkLeft())
        {
            workLeft = false;
        }
        else if(semaphore->Down())
        {
            // Down() succeeded, so at least one item is queued.
            int c = queue->Dequeue();
            count = count + c;
        }
        lock->Unlock();
    }
    return NULL;
}

void populateQueue()
{
    for(int i = 0; i < countPerThread*2; i++)
    {
        lock->Lock();
        queue->Enqueue(1);
        semaphore->Up();
        lock->Unlock();
    }
    lock->Lock();
    queue->DoneAdding();
    lock->Unlock();
}

int main( int argc, char* argv[])
{
    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %d\n", 2*countPerThread);

    pthread_create(&thread1, &threadAttribute, increment, NULL);
    pthread_create(&thread2, &threadAttribute, increment, NULL);

    populateQueue();

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * countPerThread )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    return count == 2 * countPerThread ? 0 : 1;
}
Key points:
populateQueue() adds work items and calls semaphore->Up() for each new item.
Worker threads call semaphore->Down() before removing an item from the queue.
The mutex protects the queue while a thread checks for work, removes work, or marks the queue as done.
The final count should equal the number of queued work items.
#include "semaphore.hh"
#include "debug.hh"
#include <stdio.h>
#include <pthread.h>

Semaphore::Semaphore(int initialValue)
{
    _lock = new Mutex();
    _count = initialValue;
}

Semaphore::~Semaphore()
{
    delete _lock;
}

void Semaphore::Up()
{
    _lock->Lock();
    _count += 1;
    _lock->Unlock();
}

bool Semaphore::Down()
{
    bool success = false;
    _lock->Lock();
    if(_count > 0)
    {
        _count -= 1;
        success = true;
    }
    _lock->Unlock();
    return success;
}
Key points:
Semaphore stores the count in _count.
Up() increments the count while holding the internal mutex.
Down() decrements the count only when a unit is available.
This version returns false instead of blocking when the count is zero, so the caller remains responsible for retry behavior.
Producer-Consumer with Semaphores
The producer-consumer problem uses one or more producers to place items in a bounded queue and one or more consumers to remove them.
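Queue *queue = new Queue(5);
Semaphore *mutex = new Semaphore(1);   /* protects the queue */
Semaphore *empty = new Semaphore(5);   /* counts open slots */
Semaphore *full = new Semaphore(0);    /* counts available items */

void producer() {
    while(1) {
        int item = produce_item();     /* produce outside the critical section */
        empty->down();                 /* wait for an open slot */
        mutex->down();
        queue->Enqueue(item);
        mutex->up();
        full->up();                    /* announce one more item */
    }
}

void consumer() {
    while(1) {
        full->down();                  /* wait for an item */
        mutex->down();
        int item = queue->Dequeue();
        mutex->up();
        empty->up();                   /* announce one more open slot */
        consume_item(item);            /* consume outside the critical section */
    }
}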
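The empty semaphore counts open slots. The full semaphore counts
available items. The mutex semaphore protects the queue itself. Here
down() must block until the count is positive, unlike the non-blocking
Down() in the semaphore example above. The order of the down() calls
also matters. If a producer took mutex before waiting on empty while the
queue was full, it would sleep while holding the mutex, and no consumer
could ever make room.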
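The same design can run on POSIX unnamed semaphores, whose sem_wait() blocks as the pseudocode assumes. This minimal sketch assumes Linux or another system that supports sem_init(). A pthread mutex stands in for the mutex semaphore. The buffer size, item count, and function names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define BUF_SIZE 5
#define ITEMS 1000

static int buffer[BUF_SIZE];
static int in = 0, out = 0;          /* next slot to fill / next slot to empty */
static sem_t empty, full;            /* open slots / available items */
static pthread_mutex_t buf_lock = PTHREAD_MUTEX_INITIALIZER;
static long consumed_sum = 0;        /* touched only by the consumer thread */

static void* producer(void* arg) {
    (void) arg;
    for (int item = 1; item <= ITEMS; item++) {  /* "produce" the values 1..ITEMS */
        sem_wait(&empty);                        /* wait for an open slot */
        pthread_mutex_lock(&buf_lock);
        buffer[in] = item;
        in = (in + 1) % BUF_SIZE;
        pthread_mutex_unlock(&buf_lock);
        sem_post(&full);                         /* one more item available */
    }
    return NULL;
}

static void* consumer(void* arg) {
    (void) arg;
    for (int i = 0; i < ITEMS; i++) {
        sem_wait(&full);                         /* wait for an item */
        pthread_mutex_lock(&buf_lock);
        int item = buffer[out];
        out = (out + 1) % BUF_SIZE;
        pthread_mutex_unlock(&buf_lock);
        sem_post(&empty);                        /* one more open slot */
        consumed_sum += item;                    /* "consume" outside the lock */
    }
    return NULL;
}

/* Returns the sum of all consumed items: 1 + 2 + ... + ITEMS = 500500. */
long run_producer_consumer(void) {
    pthread_t p, c;
    in = out = 0;
    consumed_sum = 0;
    int rc = sem_init(&empty, 0, BUF_SIZE);
    rc |= sem_init(&full, 0, 0);
    assert(rc == 0);
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    sem_destroy(&empty);
    sem_destroy(&full);
    return consumed_sum;
}
```

With exactly one producer and one consumer, the mutex is not strictly needed, because each of in and out is touched by only one thread. It becomes necessary as soon as there are several producers or several consumers.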
Bounded Buffer Example
The systems-code-examples/bounded-buffer example implements the
producer-consumer pattern with pthread mutexes and condition variables.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>

#include "bboptions.h"
#include "bbuffer.h"

#include "millisleep.h"

/*
 * bounded buffer example where there can be any number of suppliers and consumers
 *
 * in this design:
 * - each supplier and consumer runs as a separate thread
 *   thread-specific data allows each to share the bounded_buffer_t* instance and command-line options/config
 *   each supplier and consumer knows its own id.
 * - suppliers each generate options->gen_count values
 *   consumers each take (options->gen_count * options->no_suppliers) / options->no_consumers entries;
 *   the remainder is spread over the lowest-numbered consumers, so every entry is consumed
 * - because consumers stop after they have gotten their share of values, the others are able to consume their fair share
 */

typedef struct
{
    bb_options_t* options;
    bounded_buffer_t* bb;
    int id;
} bb_tsd_t;

void* supplier(void *tsd)
{
    bb_tsd_t* bb_tsd = (bb_tsd_t*) tsd;
    bounded_buffer_t* bb = bb_tsd->bb;
    bb_options_t* options = bb_tsd->options;
    int my_id = bb_tsd->id;

    lwlog_info("supplier { id: %d, state : \"running\", gen: %d }\n", my_id, options->gen_count);
    for (int i=0; i < options->gen_count; i++)
    {
        millisecond_sleep( rand() % options->supplier_max_delay_ms);
        entry_t* new_entry = (entry_t*) malloc(sizeof(entry_t));
        int value = my_id * options->gen_count + i;
        new_entry->value = value;
        /* after the put, a consumer may already have freed new_entry, so log the saved value */
        bounded_buffer_put(bb, new_entry);
        lwlog_info("supplier { id: %d, entry: %d }\n", my_id, value);
        bounded_buffer_print_info(bb);
    }
    lwlog_info("supplier { id: %d, state: \"exit\" }\n", my_id);
    pthread_exit(NULL);
}

/* note: each consumer consumes options->no_suppliers * options->gen_count / options->no_consumers entries,
   plus one more for each of the first (total % options->no_consumers) consumers */

void* consumer(void *tsd)
{
    bb_tsd_t* bb_tsd = (bb_tsd_t*) tsd;
    bounded_buffer_t* bb = bb_tsd->bb;
    bb_options_t* options = bb_tsd->options;
    int my_id = bb_tsd->id;
    int max_to_consume = options->no_suppliers * options->gen_count / options->no_consumers;
    int not_consumed = options->no_suppliers * options->gen_count % options->no_consumers;
    if (my_id < not_consumed)
        max_to_consume++;
    lwlog_info("consumer { id: %d, state: \"running\", messages: %d }\n", my_id, max_to_consume);
    for (int i=0; i < max_to_consume; i++)
    {
        entry_t* entry = bounded_buffer_get(bb);
        millisecond_sleep( rand() % options->consumer_max_delay_ms);
        lwlog_info("consumer { id: %d, entry: %d }\n", my_id, entry->value);
        bounded_buffer_print_info(bb);
        free(entry);
    }
    lwlog_info("consumer { id: %d, state: \"exit\" }\n", my_id);
    pthread_exit(NULL);
}

int main (int argc, char *argv[])
{
    pthread_attr_t attr;
    bounded_buffer_t bb;
    bb_options_t options;

    bb_options_get(&options, argc, argv);
    int no_threads = options.no_suppliers + options.no_consumers;
    pthread_t* threads = (pthread_t*) malloc(no_threads * sizeof(pthread_t));
    bb_tsd_t* bb_tsd = (bb_tsd_t*) malloc(no_threads * sizeof(bb_tsd_t));

    bb_options_print(&options);

    bounded_buffer_init(&bb, options.bsize);

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* create supplier threads */
    for (int i=0; i < options.no_suppliers; i++)
    {
        int thread_number = i;
        bb_tsd[thread_number] = (bb_tsd_t)
        {
            .options = &options, .bb = &bb, .id = i
        };
        lwlog_info("pthread_create { thread: %d, type: \"supplier\", id: %d }\n", thread_number, i);
        pthread_create(&threads[thread_number], &attr, supplier, (void *)&bb_tsd[thread_number]);
    }

    /* create consumer threads */
    for (int i=0; i < options.no_consumers; i++)
    {
        int thread_number = i+options.no_suppliers;
        bb_tsd[thread_number] = (bb_tsd_t)
        {
            .options = &options, .bb = &bb, .id = i
        };
        lwlog_info("pthread_create { thread: %d, type: \"consumer\", id: %d }\n", thread_number, i);
        pthread_create(&threads[thread_number], &attr, consumer, (void *)&bb_tsd[thread_number]);
    }

    lwlog_info("main { threads: %d, state: \"started\" }\n", no_threads);

    for (int i=0; i < no_threads; i++)
    {
        pthread_join(threads[i], NULL);
    }

    lwlog_info("main { threads: %d, state: \"joined\" }\n", no_threads);

    lwlog_info("main { extra_entries: %d }\n", bounded_buffer_count(&bb));

    int bb_size = bounded_buffer_count(&bb);
    for (int i=0; i < bb_size; i++)
    {
        entry_t* e = bounded_buffer_get(&bb);
        lwlog_info("main { removed: %d }\n", e->value);
        free(e);
    }
    pthread_attr_destroy(&attr);
    bounded_buffer_cleanup(&bb);
    free(threads);
    free(bb_tsd);

    pthread_exit(NULL);
}
Key points:
The program creates supplier threads and consumer threads.
Each thread receives thread-specific data containing the shared bounded buffer, command-line options, and its own id.
Suppliers allocate entries and place them into the bounded buffer.
Consumers remove entries, process them, and free their memory.
The main thread joins all worker threads before cleaning up the buffer.
#include "bbuffer.h"

#include <stdio.h>
#include <stdlib.h>

void bounded_buffer_init(bounded_buffer_t* bb, int size)
{
    bb->entries = (entry_t**) malloc(size * sizeof(entry_t*));
    for (int i=0; i < size; i++)
        bb->entries[i] = NULL;
    bb->size = size;
    bb->head = bb->tail = 0;
    pthread_mutex_init(&bb->lock, NULL);
    pthread_cond_init (&bb->has_space, NULL);
    pthread_cond_init (&bb->has_items, NULL);
}

void bounded_buffer_put(bounded_buffer_t* bb, entry_t* item)
{
    pthread_mutex_lock(&bb->lock);

    /* wait while the buffer is full */
    while (bb->tail - bb->head >= bb->size)
    {
        pthread_cond_wait(&bb->has_space, &bb->lock);
    }

    bb->entries[bb->tail++ % bb->size] = item;
    pthread_cond_signal(&bb->has_items);

    pthread_mutex_unlock(&bb->lock);
}

entry_t* bounded_buffer_get(bounded_buffer_t* bb)
{
    pthread_mutex_lock(&bb->lock);

    /* wait while the buffer is empty */
    while (bb->tail == bb->head)
    {
        pthread_cond_wait(&bb->has_items, &bb->lock);
    }

    entry_t* entry = bb->entries[bb->head % bb->size];
    bb->entries[bb->head++ % bb->size] = NULL;
    pthread_cond_signal(&bb->has_space);

    pthread_mutex_unlock(&bb->lock);
    return entry;
}

int bounded_buffer_count(bounded_buffer_t* bb)
{
    pthread_mutex_lock(&bb->lock);
    int count = bb->tail - bb->head;
    pthread_mutex_unlock(&bb->lock);
    return count;
}

void bounded_buffer_cleanup(bounded_buffer_t* bb)
{
    int unfreed_count = 0;
    for (int i=0; i < bb->size; i++)
        if (bb->entries[i] != NULL)
            unfreed_count++;
    if (unfreed_count > 0)
    {
        lwlog_info("Warning: %d entries in bounded buffer not freed\n", unfreed_count);
    }
    free(bb->entries);
    pthread_mutex_destroy(&bb->lock);
    pthread_cond_destroy(&bb->has_space);
    pthread_cond_destroy(&bb->has_items);
}

void bounded_buffer_print_info(bounded_buffer_t* bb)
{
    /* hold the lock so head, tail, and entries are read consistently */
    pthread_mutex_lock(&bb->lock);
    printf("buffer { size: %d, length: %d, head: %d, tail: %d, ", bb->size, bb->tail - bb->head, bb->head, bb->tail);
    printf("entries : [");
    int add_comma = 0;
    for (int i=bb->head; i < bb->tail; i++)
    {
        if (add_comma)
        {
            printf(", %d", bb->entries[i % bb->size]->value);
        }
        else
        {
            printf("%d", bb->entries[i % bb->size]->value);
        }
        add_comma = 1;
    }
    printf("]");
    printf(" }\n");
    pthread_mutex_unlock(&bb->lock);
}
Key points:
bounded_buffer_init()initializes the circular buffer, mutex, and two condition variables.bounded_buffer_put()waits while the buffer is full.bounded_buffer_get()waits while the buffer is empty.has_spacewakes a producer when a consumer removes an item.has_itemswakes a consumer when a producer adds an item.
Reader-Writer Locks
A reader-writer lock allows many readers to hold the lock at the same time but gives writers exclusive access.
Reader-writer locks can improve concurrency when reads are common and writes are less frequent. Their implementation is more complex than a plain mutex because the lock must track active readers, active writers, waiting readers, and waiting writers.
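Semaphore *read = new Semaphore(0);    /* waiting readers sleep here */
Semaphore *write = new Semaphore(0);   /* waiting writers sleep here */
Semaphore *lock = new Semaphore(1);    /* protects the counters below */
int readWait = 0, writeWait = 0;
int activeRead = 0, activeWrite = 0;

void StartWrite() {
    lock->down();
    if(activeWrite+activeRead+writeWait == 0) {
        write->up();          /* admit ourselves immediately */
        activeWrite += 1;
    } else {
        writeWait += 1;       /* EndWrite() or EndRead() will admit us */
    }
    lock->up();
    write->down();
}

void EndWrite() {
    lock->down();
    activeWrite -= 1;
    if(writeWait > 0) {       /* hand off to the next writer first */
        write->up();
        activeWrite += 1;
        writeWait -= 1;
    } else {
        while(readWait>0) {   /* otherwise admit every waiting reader */
            read->up();
            activeRead += 1;
            readWait -= 1;
        }
    }
    lock->up();
}

/* Reader side, written to match the writer side above. New readers
   wait whenever a writer is waiting, so writers have priority. */
void StartRead() {
    lock->down();
    if(activeWrite+writeWait == 0) {
        read->up();           /* no writer active or waiting: admit ourselves */
        activeRead += 1;
    } else {
        readWait += 1;        /* EndWrite() will admit us */
    }
    lock->up();
    read->down();
}

void EndRead() {
    lock->down();
    activeRead -= 1;
    if(activeRead == 0 && writeWait > 0) {
        write->up();          /* the last reader out admits a waiting writer */
        activeWrite += 1;
        writeWait -= 1;
    }
    lock->up();
}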
Reader-writer locks force design choices. A lock may give priority to readers, priority to writers, or attempt to balance the two. Each choice can create different starvation and deadlock risks.
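POSIX systems provide reader-writer locks directly as pthread_rwlock_t, so application code rarely needs to build one from semaphores. The sketch below assumes pthreads and runs one writer and two readers. The table and function names are hypothetical. POSIX leaves much of the reader-versus-writer priority policy to the implementation.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define SLOTS 4
#define ITERATIONS 1000

static pthread_rwlock_t table_lock = PTHREAD_RWLOCK_INITIALIZER;
static int table[SLOTS];             /* invariant: all entries are equal */

static void* writer(void* arg) {
    (void) arg;
    for (int v = 1; v <= ITERATIONS; v++) {
        pthread_rwlock_wrlock(&table_lock);   /* exclusive: no readers, no other writers */
        for (int i = 0; i < SLOTS; i++) {
            table[i] = v;                     /* the invariant is briefly broken here */
        }
        pthread_rwlock_unlock(&table_lock);
    }
    return NULL;
}

/* Returns (as a void*) how many reads saw a half-finished write. */
static void* reader(void* arg) {
    (void) arg;
    long torn = 0;
    for (int n = 0; n < ITERATIONS; n++) {
        pthread_rwlock_rdlock(&table_lock);   /* shared with other readers */
        for (int i = 1; i < SLOTS; i++) {
            if (table[i] != table[0]) {
                torn++;
                break;
            }
        }
        pthread_rwlock_unlock(&table_lock);
    }
    return (void*) torn;
}

/* Runs one writer and two readers and returns the total number of torn reads. */
long run_rwlock_demo(void) {
    pthread_t w, r1, r2;
    void *torn1, *torn2;
    int rc = pthread_create(&w, NULL, writer, NULL);
    rc |= pthread_create(&r1, NULL, reader, NULL);
    rc |= pthread_create(&r2, NULL, reader, NULL);
    assert(rc == 0);
    pthread_join(w, NULL);
    pthread_join(r1, &torn1);
    pthread_join(r2, &torn2);
    return (long) torn1 + (long) torn2;
}
```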
POSIX Semaphores
POSIX semaphores provide a system interface for counting semaphores.
The basic initialization call is:
int sem_init(sem_t *sem, int pshared, unsigned int value);
The sem argument names the semaphore object. The pshared argument
controls whether the semaphore is shared only among threads in one
process or among processes through shared memory. The value argument
sets the initial count.
sem_t sema;
sem_init(&sema, 0, 1);
sem_wait(&sema);
/* critical section */
sem_post(&sema);
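Because value sets the initial count, the same calls also produce a counting semaphore. This sketch assumes Linux or another system that supports unnamed POSIX semaphores. It lets at most three of eight threads use a resource at once and records the peak concurrency. The constants and function names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>

#define UNITS 3       /* at most 3 threads may use the resource at once */
#define USERS 8       /* threads competing for it */

static sem_t units;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
static int active = 0, peak = 0;     /* protected by stats_lock */

static void* use_resource(void* arg) {
    (void) arg;
    sem_wait(&units);                        /* take a unit, or block until one is free */

    pthread_mutex_lock(&stats_lock);
    active += 1;
    if (active > peak) peak = active;
    pthread_mutex_unlock(&stats_lock);

    struct timespec hold = { 0, 1000000 };   /* keep the unit for about 1 ms */
    nanosleep(&hold, NULL);

    pthread_mutex_lock(&stats_lock);
    active -= 1;
    pthread_mutex_unlock(&stats_lock);

    sem_post(&units);                        /* give the unit back */
    return NULL;
}

/* Returns the largest number of threads that held a unit at the same time. */
int run_counting_semaphore(void) {
    pthread_t users[USERS];
    active = peak = 0;
    int rc = sem_init(&units, 0, UNITS);     /* pshared = 0: threads of this process only */
    assert(rc == 0);
    for (int i = 0; i < USERS; i++) {
        pthread_create(&users[i], NULL, use_resource, NULL);
    }
    for (int i = 0; i < USERS; i++) {
        pthread_join(users[i], NULL);
    }
    sem_destroy(&units);
    return peak;
}
```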
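Windows Semaphores
Windows also provides semaphore objects through the Win32 API. The second argument to CreateSemaphore() is the initial count, and the third is the maximum count.
const LONG initialCount = 20;
const LONG maxCount = 20;
HANDLE sema = CreateSemaphore(NULL, initialCount, maxCount, NULL);
WaitForSingleObject(sema, INFINITE);   /* take one unit; blocks while the count is 0 */
/* at most 20 threads can be here at once */
ReleaseSemaphore(sema, 1, NULL);       /* return the unit */
CloseHandle(sema);
Named semaphores can be shared by separate processes. A process creates
or opens a named semaphore by passing a name as the last argument to
CreateSemaphore(), or opens an existing one with OpenSemaphore(), and
then waits on it with WaitForSingleObject().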
Monitors and Condition Variables
A monitor combines mutual exclusion with a way for threads to wait until a condition becomes true.
Monitor-style synchronization usually has four operations:
enter locks the monitor.
exit unlocks the monitor.
wait releases the lock, blocks until another thread signals, and reacquires the lock before returning.
pulse or signal wakes a waiting thread.
The usual pattern is:
enter();
while(condition_to_proceed_is_not_true) {
wait();
}
do_operation();
if(condition_has_changed) {
pulse();
}
exit();
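The condition is checked in a loop because a woken thread is not guaranteed to find the condition true. Another thread may have run first and changed the state again, and some implementations allow spurious wakeups.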
Monitor Example
The systems-code-examples/monitor example rewrites the queue
coordination problem with a monitor abstraction.
#include "queue.hh"
#include "monitor.hh"
#include "debug.hh"
#include <pthread.h>
#include <stdio.h>

const int countPerThread = 10000000;
int count;
Monitor *monitor = new Monitor();
Queue *queue = new Queue();

void* increment(void*)
{
    monitor->Enter();
    while(queue->HasWorkLeft())
    {
        while(queue->IsEmpty() && queue->HasWorkLeft())
        {
            monitor->Wait();
        }
        if(!queue->IsEmpty())
        {
            int c = queue->Dequeue();
            count = count + c;
        }
    }
    monitor->Exit();
    return NULL;
}

void populateQueue()
{
    for(int i = 0; i < countPerThread*2; i++)
    {
        monitor->Enter();
        queue->Enqueue(1);
        monitor->Pulse();
        monitor->Exit();
    }
    monitor->Enter();
    queue->DoneAdding();
    // One extra pulse per worker, so a worker blocked in Wait() wakes up,
    // re-checks HasWorkLeft(), and exits instead of waiting forever.
    monitor->Pulse();
    monitor->Pulse();
    monitor->Exit();
}

int main( int argc, char* argv[])
{
    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %d\n", 2*countPerThread);

    pthread_create(&thread1, &threadAttribute, increment, NULL);
    pthread_create(&thread2, &threadAttribute, increment, NULL);

    populateQueue();

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * countPerThread )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    return count == 2 * countPerThread ? 0 : 1;
}
Key points:
Worker threads enter the monitor before inspecting the shared queue.
A worker waits while the queue is empty and more work may still arrive.
populateQueue() pulses the monitor after it enqueues a new item.
DoneAdding() lets waiting workers eventually stop waiting for more work.
The monitor keeps the waiting condition close to the protected queue operations.
#include "monitor.hh"
#include <stdio.h>
#include <pthread.h>
#include <sched.h>

Monitor::Monitor()
{
    _lock = new Mutex();
    _locked = false;
    _pulse = 0;         // pulses not yet consumed by Wait()
}

Monitor::~Monitor()
{
    delete _lock;
}

void Monitor::Enter()
{
    _lock->Lock();
    _locked = true;
}

void Monitor::Exit()
{
    _locked = false;
    _lock->Unlock();
}

void Monitor::Wait()
{
    if(!_locked)
    {
        fprintf(stderr, "wait called without a lock\n");
    }
    while(_pulse == 0)
    {
        // Leave the monitor so another thread can enter and Pulse(),
        // then re-enter and check again.
        Exit();
        sched_yield();
        Enter();
    }
    _pulse -= 1;        // consume one pulse
}

void Monitor::Pulse()
{
    if(!_locked)
    {
        fprintf(stderr, "pulse called without a lock\n");
    }
    _pulse += 1;
}
Key points:
Enter() obtains the internal mutex and marks the monitor locked.
Exit() releases the internal mutex.
Wait() repeatedly releases and reacquires the monitor while waiting for a pulse.
Pulse() records that one waiter may proceed.
This implementation is intentionally small and shows the idea rather than a production condition-variable implementation.
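Wait() above busy-waits: a waiting thread keeps releasing the monitor, yielding, and reacquiring it until a pulse arrives. The following is a minimal sketch of the same counting-pulse semantics built on a pthread condition variable, so that waiters block instead of polling. The pulse_monitor type and the pm_* functions are hypothetical names and are not part of monitor.hh.

```c
#include <pthread.h>

/* Hypothetical counterpart of Monitor with the same counting-pulse semantics. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t pulsed;
    int pulses;               /* pulses not yet consumed, like _pulse */
} pulse_monitor;

void pm_init(pulse_monitor *m) {
    pthread_mutex_init(&m->lock, NULL);
    pthread_cond_init(&m->pulsed, NULL);
    m->pulses = 0;
}

void pm_destroy(pulse_monitor *m) {
    pthread_cond_destroy(&m->pulsed);
    pthread_mutex_destroy(&m->lock);
}

void pm_enter(pulse_monitor *m) { pthread_mutex_lock(&m->lock); }
void pm_exit(pulse_monitor *m) { pthread_mutex_unlock(&m->lock); }

/* Caller must be inside the monitor. Blocks without polling until a pulse
   is available; pthread_cond_wait releases the mutex while blocked. */
void pm_wait(pulse_monitor *m) {
    while (m->pulses == 0)
        pthread_cond_wait(&m->pulsed, &m->lock);
    m->pulses -= 1;
}

/* Caller must be inside the monitor. Records one pulse and wakes a waiter;
   if nobody is waiting, the pulse is kept for the next pm_wait(). */
void pm_pulse(pulse_monitor *m) {
    m->pulses += 1;
    pthread_cond_signal(&m->pulsed);
}

/* Demo: n threads each wait for one pulse while the caller pulses n times. */
static pulse_monitor demo;
static int woken = 0;

static void *demo_waiter(void *arg) {
    (void)arg;
    pm_enter(&demo);
    pm_wait(&demo);
    woken += 1;          /* protected by the monitor */
    pm_exit(&demo);
    return NULL;
}

int pm_demo(int n) {
    pthread_t threads[8];
    if (n > 8)
        n = 8;
    pm_init(&demo);
    woken = 0;
    for (int i = 0; i < n; i++)
        pthread_create(&threads[i], NULL, demo_waiter, NULL);
    for (int i = 0; i < n; i++) {
        pm_enter(&demo);
        pm_pulse(&demo);
        pm_exit(&demo);
    }
    for (int i = 0; i < n; i++)
        pthread_join(threads[i], NULL);
    pm_destroy(&demo);
    return woken;
}
```

Like the Monitor class, the counter means a pulse sent before any thread waits is not lost. A bare pthread_cond_signal() with no waiter has no effect.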
Pthreads Condition Variables
Pthreads condition variables implement the monitor pattern with an explicit mutex and condition variable.
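pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_mutex_init(&mutex, NULL);   /* NULL selects default attributes */
pthread_cond_init(&cond, NULL);
pthread_mutex_lock(&mutex);
while(/* condition is not true */) {
    /* atomically unlocks mutex and blocks; relocks before returning */
    pthread_cond_wait(&cond, &mutex);
}
/* critical section: the condition holds and the mutex is held */
pthread_cond_signal(&cond);   /* wake one thread waiting on cond */
pthread_mutex_unlock(&mutex);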
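pthread_cond_wait() atomically releases the mutex and blocks the calling
thread, then reacquires the mutex before returning. This lets another thread
enter the critical section and change the condition. The call sits in a
while loop because pthread_cond_wait() may return spuriously, and because
another thread may have changed the condition again by the time the waiter
reacquires the mutex.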
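The following self-contained sketch applies the pattern to a one-shot handoff. A producer thread publishes a value and sets a ready flag under the mutex. The waiting thread loops on pthread_cond_wait() until the flag is set. The names wait_for_value(), ready, and value are hypothetical.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical shared state, guarded by mutex. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static bool ready = false;   /* the condition the waiter checks */
static int value = 0;        /* data published by the producer */

static void *producer(void *arg) {
    pthread_mutex_lock(&mutex);
    value = *(int *)arg;     /* change shared state under the mutex */
    ready = true;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Starts a producer thread and blocks until it has published v. */
int wait_for_value(int v) {
    pthread_t t;
    ready = false;           /* no other thread is running yet */
    pthread_create(&t, NULL, producer, &v);
    pthread_mutex_lock(&mutex);
    while (!ready)                        /* re-check after every wakeup */
        pthread_cond_wait(&cond, &mutex); /* releases mutex while blocked */
    int result = value;
    pthread_mutex_unlock(&mutex);
    pthread_join(t, NULL);   /* v stays valid until the producer is joined */
    return result;
}
```

The sketch works whichever thread runs first. If the producer signals before the waiter reaches the loop, ready is already true and the waiter never blocks.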
Windows Condition Variables
Windows condition variables provide the same basic pattern as pthread condition variables.
CONDITION_VARIABLE cond;
CRITICAL_SECTION lock;
InitializeConditionVariable(&cond);
InitializeCriticalSection(&lock);
EnterCriticalSection(&lock);
while(/* condition is not true */) {
SleepConditionVariableCS(&cond, &lock, INFINITE);
}
/* critical section */
WakeConditionVariable(&cond);
LeaveCriticalSection(&lock);
The condition variable works with a CRITICAL_SECTION so the waiting
thread can atomically release the lock and block.
C# Bounded Buffer
The systems-code-examples/threads_csharp directory includes a C#
bounded buffer implemented with .NET semaphores.
using System.Collections.Generic;
using System.Threading;

public class BoundBuffer<T> {
    private readonly Semaphore _full;    // items available to consumers
    private readonly Semaphore _empty;   // open slots in the buffer
    private readonly Semaphore _lock;    // binary semaphore guarding _queue
    private readonly Queue<T> _queue;

    public BoundBuffer (int maxCount) {
        _empty = new Semaphore (maxCount, maxCount);
        _full = new Semaphore (0, maxCount);
        _lock = new Semaphore (1, 1);
        _queue = new Queue<T> ();
    }

    public void Enqueue(T item) {
        _empty.WaitOne ();        // wait for space
        _lock.WaitOne ();
        _queue.Enqueue (item);
        _lock.Release (1);
        _full.Release (1);        // one more item available
    }

    public T Dequeue() {
        _full.WaitOne ();         // wait for an item
        _lock.WaitOne ();
        var item = _queue.Dequeue ();
        _lock.Release(1);
        _empty.Release(1);        // one more open slot
        return item;
    }
}
Key points:
_empty counts open slots in the buffer.
_full counts items available to consumers.
_lock is a binary semaphore that protects the queue.
Enqueue() waits for space before adding an item.
Dequeue() waits for an item before removing one.
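Here is a rough C counterpart of BoundBuffer<T> that keeps the same three-semaphore structure for a fixed-size ring buffer of ints. It assumes Linux-style unnamed POSIX semaphores (sem_init()). The bound_buffer type, the bb_* functions, and CAPACITY are hypothetical. Error handling, including EINTR from sem_wait(), is omitted.

```c
#include <pthread.h>
#include <semaphore.h>

#define CAPACITY 8

/* Hypothetical C counterpart of BoundBuffer<T>, specialized to int. */
typedef struct {
    int items[CAPACITY];
    int head, tail;      /* ring-buffer indices */
    sem_t empty;         /* open slots, like _empty */
    sem_t full;          /* available items, like _full */
    sem_t lock;          /* binary semaphore, like _lock */
} bound_buffer;

void bb_init(bound_buffer *b) {
    b->head = b->tail = 0;
    sem_init(&b->empty, 0, CAPACITY);
    sem_init(&b->full, 0, 0);
    sem_init(&b->lock, 0, 1);
}

void bb_destroy(bound_buffer *b) {
    sem_destroy(&b->empty);
    sem_destroy(&b->full);
    sem_destroy(&b->lock);
}

void bb_enqueue(bound_buffer *b, int item) {
    sem_wait(&b->empty);             /* wait for space */
    sem_wait(&b->lock);
    b->items[b->tail] = item;
    b->tail = (b->tail + 1) % CAPACITY;
    sem_post(&b->lock);
    sem_post(&b->full);              /* one more item available */
}

int bb_dequeue(bound_buffer *b) {
    sem_wait(&b->full);              /* wait for an item */
    sem_wait(&b->lock);
    int item = b->items[b->head];
    b->head = (b->head + 1) % CAPACITY;
    sem_post(&b->lock);
    sem_post(&b->empty);             /* one more open slot */
    return item;
}

static bound_buffer demo_buf;

static void *bb_producer(void *arg) {
    int n = *(int *)arg;
    for (int i = 1; i <= n; i++)
        bb_enqueue(&demo_buf, i);
    return NULL;
}

/* A producer thread enqueues 1..n; the caller dequeues n items and sums them. */
long bb_demo(int n) {
    pthread_t t;
    long sum = 0;
    bb_init(&demo_buf);
    pthread_create(&t, NULL, bb_producer, &n);
    for (int i = 0; i < n; i++)
        sum += bb_dequeue(&demo_buf);
    pthread_join(t, NULL);
    bb_destroy(&demo_buf);
    return sum;
}
```

With CAPACITY at 8, bb_demo(1000) forces the producer to block on empty many times. The sum still comes out to 1000 * 1001 / 2.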
C# Monitor Pipeline
The C# pipeline examples use lock, Monitor.Wait(), and
Monitor.Pulse() to coordinate staged work.
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading;

public class PipelineComputation {
    private readonly Queue<byte[]> _readData;
    private readonly Queue<byte[]> _compressionData;
    private volatile bool _reading = true;
    private volatile bool _compressing = true;

    public PipelineComputation () {
        _readData = new Queue<byte[]>();
        _compressionData = new Queue<byte[]>();
    }

    public void PerformCompression() {
        var readerThread = new Thread (FileReader);
        var compressThread = new Thread (Compression);
        var writerThread = new Thread (FileWriter);
        readerThread.Start ();
        compressThread.Start ();
        writerThread.Start ();
        readerThread.Join ();
        compressThread.Join ();
        writerThread.Join ();
    }

    private void FileReader() {
        using (var stream = new FileStream("file.txt", FileMode.Open, FileAccess.Read)) {
            while (true) {
                // A fresh array per block: the queued array is still owned by
                // the compressor, so the next Read() must not overwrite it.
                var buffer = new byte[1024];
                int len = stream.Read(buffer, 0, buffer.Length);
                if (len <= 0) {
                    break;
                }
                if (len != buffer.Length) {
                    Array.Resize (ref buffer, len);
                }
                lock (_readData) {
                    while (_readData.Count > 10) {
                        Monitor.Wait (_readData);
                    }
                    _readData.Enqueue(buffer);
                    Monitor.Pulse (_readData);
                }
            }
        }
        lock (_readData) {
            _reading = false;
            Monitor.Pulse (_readData);   // wake the compressor if it is waiting
        }
    }

    private void Compression() {
        while (true) {
            byte[] dataToCompress;
            lock (_readData) {
                while (_reading && _readData.Count == 0) {
                    Monitor.Wait (_readData, 100);
                }
                // Deciding to stop inside the lock means a block enqueued just
                // before _reading became false cannot be skipped.
                if (_readData.Count == 0) {
                    break;
                }
                dataToCompress = _readData.Dequeue ();
                Monitor.Pulse (_readData);   // the reader may be waiting for space
            }
            var compressed = Compress(dataToCompress);
            lock (_compressionData) {
                while (_compressionData.Count > 10) {
                    Monitor.Wait (_compressionData, 100);
                }
                _compressionData.Enqueue (compressed);
                Monitor.Pulse (_compressionData);
            }
        }
        lock (_compressionData) {
            _compressing = false;
            Monitor.Pulse (_compressionData);   // wake the writer if it is waiting
        }
    }

    private static byte[] Compress(byte[] data) {
        var memStream = new MemoryStream ();
        using(var compressionStream = new GZipStream(memStream, CompressionMode.Compress)) {
            compressionStream.Write(data, 0, data.Length);
        }
        return memStream.ToArray ();
    }

    private void FileWriter() {
        // FileMode.Create truncates an existing file.gz; OpenOrCreate would
        // leave stale bytes behind if the old file was longer.
        using (var stream = new FileStream("file.gz", FileMode.Create, FileAccess.Write)) {
            while (true) {
                byte[] compressedData;
                lock (_compressionData) {
                    while (_compressing && _compressionData.Count == 0) {
                        Monitor.Wait (_compressionData, 100);
                    }
                    if (_compressionData.Count == 0) {
                        break;   // compressor finished and the queue is drained
                    }
                    compressedData = _compressionData.Dequeue ();
                    Monitor.Pulse (_compressionData);   // the compressor may be waiting for space
                }
                stream.Write (compressedData, 0, compressedData.Length);
            }
        }
    }
}
Key points:
The reader, compressor, and writer run on separate threads.
Each stage communicates through a queue protected by lock.
A stage waits when its input queue is empty or its output queue is too large.
Monitor.Pulse() wakes a later stage when new work is available.
The volatile flags communicate when earlier stages have finished.
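The same three-stage shape can be sketched in C with pthreads. This is an illustration under assumptions rather than a port. Integers stand in for file blocks and squaring stands in for compression. Each queue carries a closed flag that is set under the queue's own lock, which takes the place of the volatile flags. Names such as bqueue and pipeline_sum_of_squares() are hypothetical.

```c
#include <pthread.h>
#include <stdbool.h>

#define QCAP 10   /* bound on each queue, like the "> 10" checks above */

/* Hypothetical bounded queue of longs with a closed flag. */
typedef struct {
    long items[QCAP];
    int head, count;
    bool closed;
    pthread_mutex_t lock;
    pthread_cond_t changed;   /* broadcast on every put, get, and close */
} bqueue;

static void bq_init(bqueue *q) {
    q->head = q->count = 0;
    q->closed = false;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->changed, NULL);
}

static void bq_destroy(bqueue *q) {
    pthread_cond_destroy(&q->changed);
    pthread_mutex_destroy(&q->lock);
}

static void bq_put(bqueue *q, long v) {
    pthread_mutex_lock(&q->lock);
    while (q->count == QCAP)                  /* output full: wait for space */
        pthread_cond_wait(&q->changed, &q->lock);
    q->items[(q->head + q->count) % QCAP] = v;
    q->count += 1;
    pthread_cond_broadcast(&q->changed);      /* wake the next stage */
    pthread_mutex_unlock(&q->lock);
}

/* Returns false only after the queue is closed and fully drained. */
static bool bq_get(bqueue *q, long *out) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0 && !q->closed)       /* input empty: wait for work */
        pthread_cond_wait(&q->changed, &q->lock);
    bool ok = q->count > 0;
    if (ok) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % QCAP;
        q->count -= 1;
        pthread_cond_broadcast(&q->changed);  /* wake the previous stage */
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

static void bq_close(bqueue *q) {
    pthread_mutex_lock(&q->lock);
    q->closed = true;
    pthread_cond_broadcast(&q->changed);
    pthread_mutex_unlock(&q->lock);
}

static bqueue raw_blocks, processed_blocks;
static long input_size;

static void *reader_stage(void *arg) {        /* stands in for FileReader */
    (void)arg;
    for (long i = 1; i <= input_size; i++)
        bq_put(&raw_blocks, i);
    bq_close(&raw_blocks);
    return NULL;
}

static void *transform_stage(void *arg) {     /* stands in for Compression */
    (void)arg;
    long v;
    while (bq_get(&raw_blocks, &v))
        bq_put(&processed_blocks, v * v);
    bq_close(&processed_blocks);
    return NULL;
}

/* Runs reader -> transform -> writer; the caller's thread is the writer. */
long pipeline_sum_of_squares(long n) {
    pthread_t reader, transformer;
    long v, sum = 0;
    input_size = n;
    bq_init(&raw_blocks);
    bq_init(&processed_blocks);
    pthread_create(&reader, NULL, reader_stage, NULL);
    pthread_create(&transformer, NULL, transform_stage, NULL);
    while (bq_get(&processed_blocks, &v))     /* stands in for FileWriter */
        sum += v;
    pthread_join(reader, NULL);
    pthread_join(transformer, NULL);
    bq_destroy(&raw_blocks);
    bq_destroy(&processed_blocks);
    return sum;
}
```

Because the closed flag is checked under the same lock as the item count, a stage cannot decide to stop while an item it has not yet seen is still in its input queue.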
C# Threaded Enumeration
The ThreadedList helper turns an ordinary enumerable into a
producer-consumer pipeline.
using System;
using System.Collections.Generic;
using System.Threading;

public class ThreadedList<T> : IEnumerable<T>
{
    private readonly IEnumerable<T> _list;

    public ThreadedList (IEnumerable<T> list){
        _list = list;
    }

    public IEnumerator<T> GetEnumerator ()
    {
        return new ThreadedEnumerator<T>(_list.GetEnumerator ());
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator ()
    {
        return GetEnumerator ();
    }

    private class ThreadedEnumerator<S> : IEnumerator<S> {

        private readonly IEnumerator<S> _enumerator;
        private readonly Queue<S> _queue;
        private const int _maxQueueSize = 10;
        private readonly Thread _thread;
        private volatile bool _keepGoing = true;
        private volatile bool _finishedEnumerating = false;
        private S _current;

        public ThreadedEnumerator(IEnumerator<S> enumerator) {
            _enumerator = enumerator;
            _queue = new Queue<S>();   // must exist before the thread locks on it
            _thread = new Thread(Enumerate);
            _thread.Start();
        }

        // Background thread: advances the wrapped enumerator and buffers
        // each value in the bounded queue.
        private void Enumerate() {
            while (_keepGoing) {
                if (_enumerator.MoveNext ()) {
                    var current = _enumerator.Current;
                    lock (_queue) {
                        while (_queue.Count > _maxQueueSize && _keepGoing) {
                            Monitor.Wait (_queue, 100);
                        }
                        if (_keepGoing) {
                            _queue.Enqueue (current);
                            Monitor.Pulse (_queue);
                        }
                    }
                } else {
                    break;
                }
            }
            _finishedEnumerating = true;
        }

        public bool MoveNext ()
        {
            lock (_queue) {
                while (!_finishedEnumerating && _queue.Count == 0) {
                    Monitor.Wait (_queue, 100);
                }
                if (_queue.Count > 0) {
                    _current = _queue.Dequeue ();
                    Monitor.Pulse (_queue);
                    return true;
                } else {
                    _current = default(S);
                    return false;
                }
            }
        }

        // The background thread advances _enumerator without holding a lock,
        // so it cannot be reset safely; iterator blocks do not support Reset either.
        public void Reset () {
            throw new NotSupportedException ();
        }

        object System.Collections.IEnumerator.Current {
            get { return _current; }
        }

        public void Dispose () {
            _keepGoing = false;
            _thread.Join ();
            _enumerator.Dispose ();   // e.g. closes a file opened by an iterator block
        }

        public S Current {
            get { return _current; }
        }
    }
}
Key points:
A background thread advances the wrapped enumerator.
Produced values are placed in a bounded queue.
MoveNext() waits until an item is available or enumeration is finished.
Monitor.Wait() and Monitor.Pulse() coordinate the producer and consumer.
Dispose() stops the worker thread and joins it.
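Here is a minimal C sketch of the same idea. A background thread calls a generator function and buffers its results. iter_next() plays the role of MoveNext(). iter_close() plays the role of Dispose(): it clears a flag, broadcasts, and joins. Everything here (threaded_iter, generator_fn, count_up, sum_first) is hypothetical. iter_close() is assumed to be called exactly once, after the consumer is done.

```c
#include <pthread.h>
#include <stdbool.h>

#define PREFETCH 10   /* buffer bound, like _maxQueueSize */

/* Hypothetical generator interface: writes the next value to *out and
   returns true, or returns false when exhausted. */
typedef bool (*generator_fn)(void *state, long *out);

typedef struct {
    generator_fn gen;
    void *state;            /* used only by the background thread */
    long buf[PREFETCH];
    int head, count;
    bool finished;          /* like _finishedEnumerating */
    bool keep_going;        /* like _keepGoing, but guarded by lock */
    pthread_mutex_t lock;
    pthread_cond_t changed; /* broadcast whenever the buffer or flags change */
    pthread_t thread;
} threaded_iter;

static void *iter_run(void *arg) {
    threaded_iter *it = arg;
    long v;
    for (;;) {
        bool more = it->gen(it->state, &v);   /* produce outside the lock */
        pthread_mutex_lock(&it->lock);
        while (more && it->keep_going && it->count == PREFETCH)
            pthread_cond_wait(&it->changed, &it->lock);
        if (!more || !it->keep_going) {
            it->finished = true;
            pthread_cond_broadcast(&it->changed);
            pthread_mutex_unlock(&it->lock);
            return NULL;
        }
        it->buf[(it->head + it->count) % PREFETCH] = v;
        it->count += 1;
        pthread_cond_broadcast(&it->changed);
        pthread_mutex_unlock(&it->lock);
    }
}

void iter_start(threaded_iter *it, generator_fn gen, void *state) {
    it->gen = gen;
    it->state = state;
    it->head = it->count = 0;
    it->finished = false;
    it->keep_going = true;
    pthread_mutex_init(&it->lock, NULL);
    pthread_cond_init(&it->changed, NULL);
    pthread_create(&it->thread, NULL, iter_run, it);
}

/* Like MoveNext(): false once the generator is done and the buffer is empty. */
bool iter_next(threaded_iter *it, long *out) {
    pthread_mutex_lock(&it->lock);
    while (it->count == 0 && !it->finished)
        pthread_cond_wait(&it->changed, &it->lock);
    bool ok = it->count > 0;
    if (ok) {
        *out = it->buf[it->head];
        it->head = (it->head + 1) % PREFETCH;
        it->count -= 1;
        pthread_cond_broadcast(&it->changed);   /* room for the producer */
    }
    pthread_mutex_unlock(&it->lock);
    return ok;
}

/* Like Dispose(): stop the producer (waking it if it waits), then join. */
void iter_close(threaded_iter *it) {
    pthread_mutex_lock(&it->lock);
    it->keep_going = false;
    pthread_cond_broadcast(&it->changed);
    pthread_mutex_unlock(&it->lock);
    pthread_join(it->thread, NULL);
    pthread_cond_destroy(&it->changed);
    pthread_mutex_destroy(&it->lock);
}

/* Example generator: yields 1, 2, 3, ... up to state[1]. */
static bool count_up(void *state, long *out) {
    long *s = state;        /* s[0] = next value, s[1] = limit */
    if (s[0] > s[1])
        return false;
    *out = s[0]++;
    return true;
}

/* Sums at most k values from count_up(1..limit), then closes early. */
long sum_first(long k, long limit) {
    long state[2] = {1, limit};
    threaded_iter it;
    long v, sum = 0;
    iter_start(&it, count_up, state);
    for (long i = 0; i < k && iter_next(&it, &v); i++)
        sum += v;
    iter_close(&it);
    return sum;
}
```

Unlike the 100 ms timeouts in the C# version, iter_close() broadcasts on the same condition variable the producer waits on. A producer blocked on a full buffer therefore notices the stop request immediately.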
Concise Pipeline with ThreadedList
The concise pipeline example shows how the synchronization details can be
hidden behind ThreadedList.
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

public class ConcisePipelineComputation
{
    public ConcisePipelineComputation () {
    }

    public void PerformCompression() {
        var fileBlocks = new ThreadedList<byte[]>(FileReader());
        var compressedBlocks = new ThreadedList<byte[]> (Compression(fileBlocks));
        FileWriter (compressedBlocks);
    }

    private IEnumerable<byte[]> FileReader() {
        using (var stream = new FileStream("file.txt", FileMode.Open, FileAccess.Read)) {
            while (true) {
                // A fresh array per block: ThreadedList may still be buffering
                // earlier blocks when the next Read() happens.
                var buffer = new byte[1024];
                int len = stream.Read(buffer, 0, buffer.Length);
                if (len <= 0) {
                    yield break;
                }
                if (len != buffer.Length) {
                    Array.Resize (ref buffer, len);
                }
                yield return buffer;
            }
        }
    }

    private IEnumerable<byte[]> Compression(IEnumerable<byte[]> readBuffer) {
        foreach (var buffer in readBuffer) {
            yield return Compress (buffer);
        }
    }

    private static byte[] Compress(byte[] data) {
        var memStream = new MemoryStream ();
        using(var compressionStream = new GZipStream(memStream, CompressionMode.Compress)) {
            compressionStream.Write(data, 0, data.Length);
        }
        return memStream.ToArray ();
    }

    private void FileWriter(IEnumerable<byte[]> compressedBuffer) {
        // FileMode.Create truncates any existing file.gz.
        using (var stream = new FileStream("file.gz", FileMode.Create, FileAccess.Write)) {
            foreach (var buffer in compressedBuffer) {
                stream.Write (buffer, 0, buffer.Length);
            }
        }
    }
}
Key points:
FileReader() yields byte arrays from the input file.
Compression() consumes blocks and yields compressed blocks.
FileWriter() consumes the compressed stream.
ThreadedList lets adjacent stages overlap execution.
The pipeline reads like ordinary iterator code while still using threaded buffering internally.
Dining Philosophers
The dining philosophers problem is a classic example of lock ordering, resource contention, and deadlock.
The complete systems-code-examples/dining-philosophers example fits
best in the deadlock chapter. It will be used there as the main example
for circular wait and deadlock avoidance.
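As a brief preview of the lock-ordering idea, here is a minimal sketch that assumes five philosophers and one pthread mutex per fork. Each philosopher always locks the lower-numbered of its two forks first, so the circular wait that leads to deadlock cannot form. The dine() function and its parameter are hypothetical and are not taken from the dining-philosophers example.

```c
#include <pthread.h>
#include <stdint.h>

#define PHILOSOPHERS 5

static pthread_mutex_t forks[PHILOSOPHERS];
static int meals[PHILOSOPHERS];   /* meals[i] is written only by philosopher i */
static int rounds;

static void *philosopher(void *arg) {
    int id = (int)(intptr_t)arg;
    int left = id;
    int right = (id + 1) % PHILOSOPHERS;
    /* Lock ordering: always lock the lower-numbered fork first. */
    int first = left < right ? left : right;
    int second = left < right ? right : left;
    for (int i = 0; i < rounds; i++) {
        pthread_mutex_lock(&forks[first]);
        pthread_mutex_lock(&forks[second]);
        meals[id] += 1;                       /* eat while holding both forks */
        pthread_mutex_unlock(&forks[second]);
        pthread_mutex_unlock(&forks[first]);
    }
    return NULL;
}

/* Runs every philosopher for n meals and returns the total eaten. */
int dine(int n) {
    pthread_t threads[PHILOSOPHERS];
    int total = 0;
    rounds = n;
    for (int i = 0; i < PHILOSOPHERS; i++) {
        pthread_mutex_init(&forks[i], NULL);
        meals[i] = 0;
    }
    for (int i = 0; i < PHILOSOPHERS; i++)
        pthread_create(&threads[i], NULL, philosopher, (void *)(intptr_t)i);
    for (int i = 0; i < PHILOSOPHERS; i++) {
        pthread_join(threads[i], NULL);
        total += meals[i];
    }
    for (int i = 0; i < PHILOSOPHERS; i++)
        pthread_mutex_destroy(&forks[i]);
    return total;
}
```

Only philosopher 4 differs from the naive "left fork first" rule: it reaches for fork 0 before fork 4. That single asymmetry is enough to break the cycle.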