Mutual Exclusion

Mutual exclusion is the requirement that only one thread or process at a time may execute a particular part of a program. We need mutual exclusion when shared state can be read and written by more than one execution context.

Critical Sections

A critical section is a region of code that accesses shared state and must not be executed concurrently unless the program has been designed to permit that concurrency.

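#include <pthread.h>
#include <stdio.h>

int count = 0;   /* shared state */

void* increment(void* arg) {
   long n = (long) arg;
   for(long i = 0; i < n; i++) {
      int x = count;   /* read */
      x = x + 1;       /* modify */
      count = x;       /* write */
   }
   return NULL;
}

int main(int argc, char* argv[]) {
   pthread_t t1, t2;
   long n = 1000000;   /* increments per thread */
   pthread_create(&t1, NULL, increment, (void*) n);
   pthread_create(&t2, NULL, increment, (void*) n);
   pthread_join(t1, NULL);
   pthread_join(t2, NULL);
   printf("expected %ld, got %d\n", 2 * n, count);
   return 0;
}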

If two threads call increment() at the same time, both read and write the shared variable count. The expected final value is the total number of increments requested by both threads, but the actual value can be lower: when their read-modify-write sequences interleave, one thread can overwrite an update the other has just made.

Identifying the Critical Section

The critical section in increment() is the sequence that reads, modifies, and writes count.

int x = count;
x = x + 1;
count = x;

The program only behaves like a serial program if this sequence is treated as one indivisible operation. If two threads enter this section together, each thread can observe a stale value and overwrite the other thread’s update.

Execution Orders and Atomicity

Atomicity means that an operation behaves as though it happens all at once. Most source-level statements are not atomic at the machine level.

int x = count;
x = x + 1;
count = x;

One possible pseudo-assembly expansion is:

load &count, r0
set r0, r1
add 1, r1
set r1, r0
store &count, r0

The compiler and CPU are not required to preserve the simple mental model of one C statement becoming one machine instruction. Values may remain in registers longer than expected, instructions may be reordered within the rules of the language and architecture, and a store may occur later than the programmer assumes.
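When the shared state is a single integer, C11 atomics can make the read-modify-write indivisible and also stop the compiler and CPU from splitting or reordering it. The sketch below is an illustration rather than one of the chapter's examples; it assumes a C11 compiler with <stdatomic.h> and POSIX threads, and run_atomic_counter() is a hypothetical helper name.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

/* Shared counter declared atomic: each access is indivisible. */
static atomic_long atomic_count = 0;

static void* atomic_increment(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        /* One indivisible read-modify-write replaces the load, add, store. */
        atomic_fetch_add(&atomic_count, 1);
    }
    return NULL;
}

/* Hypothetical helper: two threads each add n; returns the final total. */
long run_atomic_counter(long n) {
    pthread_t t1, t2;
    atomic_store(&atomic_count, 0);
    pthread_create(&t1, NULL, atomic_increment, (void*) n);
    pthread_create(&t2, NULL, atomic_increment, (void*) n);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return atomic_load(&atomic_count);
}
```

An atomic instruction like this covers one variable at a time; when a critical section updates several related values, a lock is still needed.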

Interleavings

An interleaving is one possible ordering of instructions from two or more threads. Some interleavings are equivalent to serial execution, and some are not.

t0: load &count, r0
t0: set r0, r1
t1: load &count, r0
t0: add 1, r1
t1: set r0, r1
t0: set r1, r0
t0: store &count, r0
t1: add 1, r1
t1: set r1, r0
t1: store &count, r0

In this interleaving, t1 loads count before t0 stores its update, so both threads read the same old value. Assuming count starts at 0, each thread computes 1 and stores it, and the final result is 1 instead of 2. A correct locking design eliminates the interleavings that are not equivalent to some serial execution.
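The interleaving can be replayed deterministically, without relying on real thread timing. The sketch below models each thread's private registers and executes the pseudo-assembly steps in the listed order; the types and the replay() helper are hypothetical, written only to make the lost update visible.

```c
/* Each simulated thread has its own private registers r0 and r1. */
typedef struct { int r0, r1; } regs_t;

/* The five pseudo-assembly operations of one increment. */
typedef enum { LOAD, SET_R0_R1, ADD1_R1, SET_R1_R0, STORE } op_t;
typedef struct { int thread; op_t op; } step_t;

static void execute(regs_t* r, op_t op, int* count) {
    switch (op) {
    case LOAD:      r->r0 = *count; break;  /* load &count, r0  */
    case SET_R0_R1: r->r1 = r->r0;  break;  /* set r0, r1       */
    case ADD1_R1:   r->r1 += 1;     break;  /* add 1, r1        */
    case SET_R1_R0: r->r0 = r->r1;  break;  /* set r1, r0       */
    case STORE:     *count = r->r0; break;  /* store &count, r0 */
    }
}

/* Hypothetical helper: runs a schedule and returns the final count. */
int replay(const step_t* steps, int nsteps, int initial_count) {
    regs_t regs[2] = {{0, 0}, {0, 0}};
    int count = initial_count;
    for (int i = 0; i < nsteps; i++)
        execute(&regs[steps[i].thread], steps[i].op, &count);
    return count;
}

/* The interleaving from the text: t1 loads before t0 stores. */
const step_t lost_update[] = {
    {0, LOAD}, {0, SET_R0_R1}, {1, LOAD}, {0, ADD1_R1}, {1, SET_R0_R1},
    {0, SET_R1_R0}, {0, STORE}, {1, ADD1_R1}, {1, SET_R1_R0}, {1, STORE},
};

/* A serial schedule: all of t0, then all of t1. */
const step_t serial_order[] = {
    {0, LOAD}, {0, SET_R0_R1}, {0, ADD1_R1}, {0, SET_R1_R0}, {0, STORE},
    {1, LOAD}, {1, SET_R0_R1}, {1, ADD1_R1}, {1, SET_R1_R0}, {1, STORE},
};
```

Starting from 0, replay(lost_update, 10, 0) ends at 1, while replay(serial_order, 10, 0) ends at 2.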

Characteristics of a Good Locking Solution

A good locking solution protects correctness without adding unnecessary assumptions about scheduling, CPU speed, or the number of processors.

The main requirements are:

  • No two threads may be inside the same protected critical section at the same time.

  • A thread outside the critical section should not prevent another thread from entering it.

  • The solution should not depend on relative thread speeds.

  • A thread should not have to wait forever to enter the critical section.

Achieving Atomicity and Serializability

Serializability means that concurrent execution produces a result that is equivalent to some serial ordering of the same operations.

There are two common models:

  • Pessimistic locking obtains the necessary locks before entering the critical section, performs the operation, and then releases the locks.

  • Optimistic locking performs the operation against a snapshot or log and commits the result only if no conflicting operation occurred.

Most of the operating-systems material in this chapter uses pessimistic locking. Optimistic techniques are important in databases, transactional memory, and some high-concurrency data structures.

Types of Pessimistic Locks

Pessimistic locking assumes that a conflict may happen and prevents the conflict before it can corrupt shared state.

Common techniques include disabling interrupts, strict alternation, spin locks, mutexes, semaphores, condition variables, monitors, and reader-writer locks.

Types of Optimistic Locks

Optimistic locking assumes that conflicts are uncommon and checks for a conflict before committing the result.

Most non-database implementations use some kind of software transactional memory or compare-and-swap based retry loop. This approach can work well when conflicts are rare, communication is expensive, or the protected data structure is sparse.
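A compare-and-swap retry loop is the simplest form of this idea. In the sketch below, which assumes C11 <stdatomic.h> and POSIX threads (the helper names are hypothetical), each thread reads a snapshot, computes the new value, and commits only if the counter still holds the snapshot; otherwise it retries.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

static atomic_int cas_count = 0;

/* Optimistic update: read a snapshot, compute, commit only if unchanged. */
static void optimistic_increment(atomic_int* target) {
    int snapshot = atomic_load(target);
    /* If another thread changed *target since the snapshot, the exchange
       fails, stores the current value into snapshot, and we retry. */
    while (!atomic_compare_exchange_weak(target, &snapshot, snapshot + 1)) {
        /* retry with the refreshed snapshot */
    }
}

static void* cas_worker(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++)
        optimistic_increment(&cas_count);
    return NULL;
}

/* Hypothetical helper: two threads each perform n optimistic increments. */
int run_cas_counter(long n) {
    pthread_t t1, t2;
    atomic_store(&cas_count, 0);
    pthread_create(&t1, NULL, cas_worker, (void*) n);
    pthread_create(&t2, NULL, cas_worker, (void*) n);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return atomic_load(&cas_count);
}
```

No thread ever waits for a lock here; the cost of a conflict is a repeated computation, which is why the approach suits workloads where conflicts are rare.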

Software Transactional Memory Example

Software transactional memory lets code make tentative changes and then commit them only if the original data has not changed.

The systems-code-examples/stm example implements a small optimistic transaction around a shared counter.

#include "trans.hh"
#include <pthread.h>
#include <stdio.h>

const int countPerThread = 10000000;
volatile int count = 0;
Transaction *trans = new Transaction(&count);

void* increment(void*)
{
    const int chunkSize = 500;
    for(int i = 0; i < countPerThread; i += chunkSize)
    {
        MemHandle* handle;
        do
        {
            // Work on a private snapshot, not on the shared counter.
            handle = trans->Begin();
            for(int j = 0; j < chunkSize; j++)
            {
                handle->Value += 1;
            }
        }
        // Redo the whole chunk if another thread committed first.
        while(!trans->TryCommit(handle));
    }
    return NULL;
}

int main( int argc, char* argv[])
{

    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %d\n", 2*countPerThread);

    pthread_create(&thread1, &threadAttribute, increment, NULL);
    pthread_create(&thread2, &threadAttribute, increment, NULL);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * countPerThread )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    printf("rollback count = %d\n", trans->GetRollbackCount());

    return count == 2 * countPerThread ? 0 : 1;
}

Key points:

  • Each thread increments the counter in chunks.

  • Begin() returns a handle containing a snapshot of the current value.

  • The thread updates the handle instead of updating the shared counter directly.

  • TryCommit() commits the new value only if no other transaction has committed since the snapshot was taken.

  • A failed commit causes the loop to retry.

#include "trans.hh"
#include <stdio.h>

MemHandle::MemHandle(int value, int event)
{
    Value = value;
    _event = event;
}

int MemHandle::getEvent()
{
    return _event;
}

Transaction::Transaction(volatile int *value)
{
    _value = value;
    _event = 0;
    _rollbacks = 0;
    _lock = new Mutex();
}

Transaction::~Transaction()
{
    delete _lock;
}

int Transaction::GetRollbackCount()
{
    return _rollbacks;
}

MemHandle* Transaction::Begin()
{
    MemHandle* handle;
    _lock->Lock();
    handle = new MemHandle(*_value, _event);
    _lock->Unlock();
    return handle;
}

bool Transaction::TryCommit(MemHandle* value)
{
    bool success = false;
    _lock->Lock();
    if(_event == value->getEvent())
    {
        *_value = value->Value;
        _event += 1;
        success = true;
    }
    else
    {
        _rollbacks += 1;
    }
    _lock->Unlock();
    delete value;
    return success;
}

Key points:

  • _event is the version number for the shared value.

  • Begin() copies the current value and event while holding the internal mutex.

  • TryCommit() compares the saved event with the current event.

  • A successful commit writes the new value and advances the event.

  • A failed commit increments the rollback count.
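To see the version check in isolation, the following C sketch mirrors the roles of Begin() and TryCommit() with a plain struct and a pthread mutex. The names (versioned_t, versioned_begin(), versioned_try_commit()) are hypothetical and are not part of systems-code-examples.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Shared value plus a version number, guarded by a short internal lock. */
typedef struct {
    int value;
    int version;
    int rollbacks;
    pthread_mutex_t lock;
} versioned_t;

/* A private copy of the value and the version it was taken at. */
typedef struct { int value; int version; } snapshot_t;

void versioned_init(versioned_t* v, int initial) {
    v->value = initial;
    v->version = 0;
    v->rollbacks = 0;
    pthread_mutex_init(&v->lock, NULL);
}

/* Like Begin(): copy value and version while holding the lock. */
snapshot_t versioned_begin(versioned_t* v) {
    pthread_mutex_lock(&v->lock);
    snapshot_t s = { v->value, v->version };
    pthread_mutex_unlock(&v->lock);
    return s;
}

/* Like TryCommit(): publish only if nobody committed since the snapshot. */
bool versioned_try_commit(versioned_t* v, const snapshot_t* s) {
    bool ok = false;
    pthread_mutex_lock(&v->lock);
    if (v->version == s->version) {
        v->value = s->value;
        v->version += 1;     /* later snapshots of the old version go stale */
        ok = true;
    } else {
        v->rollbacks += 1;   /* caller must take a fresh snapshot and retry */
    }
    pthread_mutex_unlock(&v->lock);
    return ok;
}
```

If two snapshots are taken from the same version, only the first commit succeeds; the second is rejected and counted as a rollback, and a retry from a fresh snapshot then succeeds.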

Disabling Interrupts

Disabling interrupts prevents timer interrupts from invoking the scheduler on the current CPU. This can make a short kernel critical section atomic on that CPU.

This technique is not suitable for ordinary application code. Disabling interrupts is privileged, can hurt resource utilization, and can stop the system if interrupts are not restored. On multiprocessor systems it also does not automatically protect the same data on other CPUs.

Strict Alternation

Strict alternation lets threads take turns entering a critical section, usually through a shared turn variable.

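int turn = 0;   /* shared: which thread may enter next */

/* thread 0 */
while(TRUE) {
   while(turn != 0) {}   /* busy-wait until it is our turn */
   critical_section();
   turn = 1;             /* hand the turn to thread 1 */
   non_critical_section();
}

/* thread 1 */
while(TRUE) {
   while(turn != 1) {}   /* busy-wait until it is our turn */
   critical_section();
   turn = 0;             /* hand the turn to thread 0 */
   non_critical_section();
}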

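Strict alternation is usually a poor solution. If thread 0 is slow in its non-critical section, thread 1 cannot enter the critical section a second time even though it is free, because turn still says it is thread 0's turn. This violates the requirement that a thread outside the critical section must not prevent another thread from entering it.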
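A runnable version of the turn-taking scheme needs the turn variable to be read and written atomically, or the compiler may keep it in a register. The sketch below assumes C11 atomics and POSIX threads; the names are hypothetical, and sched_yield() replaces the empty busy-wait body only so the demo stays fast on a single CPU.

```c
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stddef.h>

static atomic_int alt_turn = 0;   /* whose turn it is: 0 or 1 */
static long alt_total = 0;        /* protected only by the turn-taking protocol */
static long alt_rounds;           /* critical-section entries per thread */

static void* alternate(void* arg) {
    int me = (int) (long) arg;
    for (long i = 0; i < alt_rounds; i++) {
        while (atomic_load(&alt_turn) != me)
            sched_yield();                 /* wait for our turn */
        alt_total += 1;                    /* critical section */
        atomic_store(&alt_turn, 1 - me);   /* give the turn to the other thread */
    }
    return NULL;
}

/* Hypothetical helper: each thread enters the critical section `rounds` times. */
long run_alternation(long rounds) {
    pthread_t t0, t1;
    alt_rounds = rounds;
    alt_total = 0;
    atomic_store(&alt_turn, 0);
    pthread_create(&t0, NULL, alternate, (void*) 0L);
    pthread_create(&t1, NULL, alternate, (void*) 1L);
    pthread_join(t0, NULL);
    pthread_join(t1, NULL);
    return alt_total;
}
```

This only terminates because both threads make the same number of passes. If thread 0 stopped after fewer rounds, thread 1 would wait forever for a turn that never comes.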

Spin Locks

A spin lock is a lock where a waiting thread repeatedly checks whether the lock is available. The waiting thread stays runnable while it waits.

int lock = 0;

void increment(int n) {
   for(int i = 0; i < n; i++) {
      spin_lock(&lock);
      int x = count;
      x = x + 1;
      count = x;
      spin_unlock(&lock);
   }
}

The lock and unlock calls make the update to count behave as a single protected operation. Spin locks are useful for very short waits, especially inside kernels, but they waste CPU time if the wait is long.

Spin Lock Implementation

A spin lock needs an atomic operation that tests the old value and sets a new value as one indivisible operation.

void spin_lock(int* lock) {
   while(*lock == 1) { }   /* wait until the lock looks free */
   *lock = 1;              /* then claim it: NOT atomic with the test */
}

void spin_unlock(int* lock) {
   *lock = 0;
}

This pseudo-code is not correct. Two threads can both observe lock as 0 and then both set it to 1. The test and the store must be one atomic operation.

void spin_lock(int* lock) {
   /* test_and_set atomically stores 1 and returns the previous value */
   while(test_and_set(lock) == 1) {}
}

On x86 systems, this kind of operation can be implemented with an atomic instruction such as xchg or cmpxchg.
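C11 exposes this primitive portably as atomic_flag_test_and_set(). The sketch below assumes a C11 compiler and POSIX threads (the flag_spin_* names are hypothetical) and uses it to build a spin lock around the increment from earlier in the chapter. The acquire and release orderings keep the critical section's loads and stores between the lock and unlock calls.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

static atomic_flag count_lock = ATOMIC_FLAG_INIT;   /* clear means unlocked */
static long guarded_count = 0;

static void flag_spin_lock(atomic_flag* lock) {
    /* Set the flag and get its old value in one atomic step.
       An old value of true means another thread holds the lock. */
    while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire)) {
        /* spin */
    }
}

static void flag_spin_unlock(atomic_flag* lock) {
    atomic_flag_clear_explicit(lock, memory_order_release);
}

static void* locked_increment(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        flag_spin_lock(&count_lock);
        long x = guarded_count;   /* read */
        x = x + 1;                /* modify */
        guarded_count = x;        /* write */
        flag_spin_unlock(&count_lock);
    }
    return NULL;
}

/* Hypothetical helper: two threads each perform n locked increments. */
long run_spin_counter(long n) {
    pthread_t t1, t2;
    guarded_count = 0;
    pthread_create(&t1, NULL, locked_increment, (void*) n);
    pthread_create(&t2, NULL, locked_increment, (void*) n);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return guarded_count;
}
```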

Spin Lock Example

The systems-code-examples/spin_lock example demonstrates the race in the increment program and provides a spin lock implementation.

#include "lock.h"
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

long int count;
unsigned long lock = 0;

void* increment( void* vp_ntimes )
{
    long ntimes = (long) vp_ntimes;
    for(long i = 0; i < ntimes; i++)
    {
        long c;
        // Uncomment the lock calls to protect the read-modify-write.
        //spin_lock(&lock);
        c = count;
        c = c+1;
        count = c;
        //spin_unlock(&lock);
    }
    return NULL;
}

int main( int argc, char* argv[])
{
    const long int n = 100000000;

    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %ld\n", 2*n);

    pthread_create(&thread1, &threadAttribute, increment, (void *) n);
    pthread_create(&thread2, &threadAttribute, increment, (void *) n);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * n )
    {
        printf("****** Error. Final count is %ld\n", count);
    }
    else
    {
        printf("****** OK. Final count is %ld\n", count);
    }

    return count == 2 * n ? 0 : 1;
}

Key points:

  • Two pthreads both call increment() and update the shared global count.

  • The program checks whether the final value is 2 * n.

  • The calls to spin_lock() and spin_unlock() are present but commented out, which leaves the race visible.

  • Uncommenting the lock calls protects the read-modify-write sequence.

#include "lock.h"
#include <sched.h>

/* Atomically: if *lock == 0, store 1. Returns the previous value of *lock. */
unsigned long test_and_set(unsigned long *lock)
{
    unsigned long newval = 0;
    asm volatile ("lock; cmpxchgl %2, %0"
                  : "+m" (*lock), "+a" (newval)	/* output operands */
                  : "r" (1)			/* input operands  */
                  : "cc", "memory");		/* clobbers: flags and memory */
    return newval;
}

void spin_lock(unsigned long *lock)
{
    while(test_and_set(lock) == 1)
    {
        sched_yield();
    }
}

void spin_unlock(unsigned long *lock)
{
    asm volatile ("" : : : "memory");	/* compiler barrier before the release */
    *lock = 0;
}

Key points:

  • test_and_set() uses inline assembly to perform an atomic compare-and-exchange operation and returns the lock's previous value.

  • The "memory" clobber stops the compiler from moving reads and writes of shared data across the lock operation.

  • spin_lock() loops while another thread owns the lock.

  • sched_yield() gives the scheduler a chance to run another thread while this thread waits.

  • spin_unlock() releases the lock by storing 0, after a compiler barrier that keeps the critical section's memory accesses ahead of the release.

Mutexes

A mutex is a mutual exclusion lock that allows only one owner at a time. Unlike a simple spin lock, a system mutex can put a waiting thread to sleep instead of making it burn CPU cycles.

The pthreads library provides mutexes on POSIX systems:

pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);

pthread_mutex_lock(&mutex);
/* critical section */
pthread_mutex_unlock(&mutex);

Windows provides the same idea through mutex handles:

HANDLE mutex = CreateMutex(NULL, FALSE, NULL);
WaitForSingleObject(mutex, INFINITE);
/* critical section */
ReleaseMutex(mutex);
CloseHandle(mutex);
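Putting the pthreads calls together, a complete two-thread counter might look like the sketch below. It assumes a POSIX system; the helper names are hypothetical, and it uses PTHREAD_MUTEX_INITIALIZER, the static alternative to calling pthread_mutex_init().

```c
#include <pthread.h>
#include <stddef.h>

/* Statically initialized mutex; no pthread_mutex_init() call needed. */
static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
static long mutex_count = 0;

static void* mutex_increment(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        pthread_mutex_lock(&count_mutex);    /* may block (sleep) if held */
        long x = mutex_count;                /* critical section: read */
        mutex_count = x + 1;                 /* modify and write */
        pthread_mutex_unlock(&count_mutex);
    }
    return NULL;
}

/* Hypothetical helper: two threads each perform n protected increments. */
long run_mutex_counter(long n) {
    pthread_t t1, t2;
    mutex_count = 0;
    pthread_create(&t1, NULL, mutex_increment, (void*) n);
    pthread_create(&t2, NULL, mutex_increment, (void*) n);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return mutex_count;
}
```

Whether a contended pthread_mutex_lock() spins briefly before sleeping is left to the implementation; the interface only promises mutual exclusion.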

Mutex Example

The systems-code-examples/mutex example wraps a low-level lock in a small C++ Mutex class and uses it to protect the shared counter.

#include "mutex.hh"
#include <pthread.h>
#include <stdio.h>

int count;
Mutex *lock = new Mutex();

void* increment( void* vp_ntimes )
{
    long ntimes = (long) vp_ntimes;
    for(long i = 0; i < ntimes; i++)
    {
        int c;
        lock->Lock();      // enter the critical section
        c = count;
        c = c+1;
        count = c;
        lock->Unlock();    // leave the critical section
    }
    return NULL;
}

int main( int argc, char* argv[])
{
    const long n = 100000000;

    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %ld\n", 2*n);

    pthread_create(&thread1, &threadAttribute, increment, (void *) n);
    pthread_create(&thread2, &threadAttribute, increment, (void *) n);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * n )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    return count == 2 * n ? 0 : 1;
}

Key points:

  • lock->Lock() is called before the shared counter is read.

  • lock->Unlock() is called after the updated value is stored.

  • The critical section is the read, increment, and write of count.

  • The two worker threads can run concurrently, but only one may update count at a time.

#include "mutex.hh"
#include <sched.h>

Mutex::Mutex()
{
    this->_lock = 0;
}

Mutex::~Mutex()
{
}

void Mutex::Lock()
{
    while(test_and_set(&this->_lock) == 1)
    {
        sched_yield();
    }
}

unsigned long Mutex::test_and_set(unsigned long *lock)
{
    unsigned long newval = 0;
    asm volatile ("lock; cmpxchgl %2, %0"
                  : "+m" (*lock), "+a" (newval)	/* output operands */
                  : "r" (1)			/* input operands  */
                  : "cc", "memory");		/* clobbers: flags and memory */
    return newval;
}

void Mutex::Unlock()
{
    asm volatile ("" : : : "memory");	/* compiler barrier before the release */
    this->_lock = 0;
}

Key points:

  • The class stores the lock state in _lock.

  • Lock() uses the same atomic test-and-set idea as the spin lock.

  • Unlock() releases ownership by setting _lock back to 0.

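  • Despite its name, this Mutex spins (yielding between attempts) instead of putting the waiting thread to sleep, so it mainly shows how a lock can be wrapped behind a small Lock()/Unlock() interface.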

Semaphores

A semaphore is a synchronization object with an integer count. A thread decrements the count before proceeding, waiting if the count is already zero, and increments it to release a unit of the resource or to signal an event.

Binary semaphores have only two useful states and can emulate mutexes. Counting semaphores allow up to N units of a resource to be used at the same time. Event semaphores often begin at 0 and are used to signal that something has happened.

Semaphore Implementation

A semaphore can be built from a lock that protects the count and operations that adjust the count.

int lock = 0;
int semaphore = 0;

void up() {
   spin_lock(&lock);
   semaphore += 1;
   spin_unlock(&lock);
}

void down() {
   spin_lock(&lock);
   while(semaphore == 0) {
      spin_unlock(&lock);
      spin_lock(&lock);
   }
   semaphore -= 1;
   spin_unlock(&lock);
}

This simple version shows the idea but still spins while waiting. System semaphores normally block the waiting thread.
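A blocking version can replace the spin with a condition variable. The sketch below assumes POSIX threads; bsem_t and its functions are hypothetical names, not a system API. A thread in bsem_down() sleeps in pthread_cond_wait(), which releases the mutex while it waits, instead of repeatedly unlocking and relocking.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical blocking semaphore: a count guarded by a mutex, plus a
   condition variable that waiting threads sleep on. */
typedef struct {
    int count;
    pthread_mutex_t lock;
    pthread_cond_t nonzero;
} bsem_t;

void bsem_init(bsem_t* s, int initial) {
    s->count = initial;
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->nonzero, NULL);
}

void bsem_up(bsem_t* s) {
    pthread_mutex_lock(&s->lock);
    s->count += 1;
    pthread_cond_signal(&s->nonzero);   /* wake one sleeper, if any */
    pthread_mutex_unlock(&s->lock);
}

void bsem_down(bsem_t* s) {
    pthread_mutex_lock(&s->lock);
    while (s->count == 0)                          /* re-check after every wakeup */
        pthread_cond_wait(&s->nonzero, &s->lock);  /* unlock, sleep, relock */
    s->count -= 1;
    pthread_mutex_unlock(&s->lock);
}

/* Event-semaphore usage: a waiter sleeps until the event is signaled. */
static bsem_t event_sem;
static int event_seen = 0;

static void* event_waiter(void* arg) {
    (void) arg;
    bsem_down(&event_sem);   /* sleeps until bsem_up() is called */
    event_seen = 1;
    return NULL;
}

int run_event_demo(void) {
    pthread_t t;
    bsem_init(&event_sem, 0);   /* event semaphores start at 0 */
    pthread_create(&t, NULL, event_waiter, NULL);
    bsem_up(&event_sem);        /* signal the event */
    pthread_join(t, NULL);
    return event_seen;
}
```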

Semaphore Example

The systems-code-examples/semaphore example uses a semaphore to coordinate worker threads that consume work from a queue.

#include "mutex.hh"
#include "queue.hh"
#include "semaphore.hh"
#include "debug.hh"
#include <pthread.h>
#include <stdio.h>

const int countPerThread = 10000000;
int count;
Mutex *lock = new Mutex();
Semaphore *semaphore = new Semaphore(0);
Queue *queue = new Queue();

void* increment(void*)
{
    bool workLeft = true;
    while(workLeft)
    {
        lock->Lock();
        if(!queue->HasWorkLeft())
        {
            workLeft = false;
        }
        else if(semaphore->Down())   // take a unit only if an item is available
        {
            int c = queue->Dequeue();
            count = count + c;
        }
        lock->Unlock();
    }
    return NULL;
}

void populateQueue()
{
    for(int i = 0; i < countPerThread*2; i++)
    {
        lock->Lock();
        queue->Enqueue(1);
        semaphore->Up();             // one more item is available
        lock->Unlock();
    }
    lock->Lock();
    queue->DoneAdding();
    lock->Unlock();
}

int main( int argc, char* argv[])
{

    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %d\n", 2*countPerThread);

    pthread_create(&thread1, &threadAttribute, increment, NULL);
    pthread_create(&thread2, &threadAttribute, increment, NULL);

    populateQueue();

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * countPerThread )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    return count == 2 * countPerThread ? 0 : 1;
}

Key points:

  • populateQueue() adds work items and calls semaphore->Up() for each new item.

  • Worker threads call semaphore->Down() before removing an item from the queue.

  • The mutex protects the queue while a thread checks for work, removes work, or marks the queue as done.

  • The final count should equal the number of queued work items.

#include "semaphore.hh"
#include "debug.hh"
#include <stdio.h>
#include <pthread.h>

Semaphore::Semaphore(int initialValue)
{
    _lock = new Mutex();
    _count = initialValue;
}

Semaphore::~Semaphore()
{
    delete _lock;
}

void Semaphore::Up()
{
    _lock->Lock();
    _count += 1;
    _lock->Unlock();
}

bool Semaphore::Down()
{
    bool success = false;
    _lock->Lock();
    if(_count > 0)
    {
        _count -= 1;
        success = true;
    }
    _lock->Unlock();
    return success;
}

Key points:

  • Semaphore stores the count in _count.

  • Up() increments the count while holding the internal mutex.

  • Down() decrements the count only when a unit is available.

  • This version returns false instead of blocking when the count is zero, so the caller remains responsible for retry behavior.

Producer-Consumer with Semaphores

The producer-consumer problem uses one or more producers to place items in a bounded queue and one or more consumers to remove them.

Queue *queue = new Queue(5);
Semaphore *mutex = new Semaphore(1);
Semaphore *empty = new Semaphore(5);
Semaphore *full = new Semaphore(0);

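void producer() {
   while(1) {
      int item = produce_item();   /* build the item outside the lock */
      empty->down();               /* wait for an open slot */
      mutex->down();               /* enter: protect the queue */
      queue->Enqueue(item);
      mutex->up();                 /* leave */
      full->up();                  /* one more item available */
   }
}

void consumer() {
   while(1) {
      full->down();                /* wait for an item */
      mutex->down();
      int item = queue->Dequeue();
      mutex->up();
      empty->up();                 /* one more open slot */
      consume_item(item);          /* process the item outside the lock */
   }
}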

The empty semaphore counts open slots. The full semaphore counts available items. The mutex semaphore protects the queue itself.
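The same three-semaphore scheme can be run with POSIX unnamed semaphores. This sketch assumes a system where sem_init() is supported (Linux, for example); the ring buffer, the item values, and the function names are hypothetical. queue_mutex is a semaphore initialized to 1, playing the role of mutex in the pseudo-code.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define SLOTS 5      /* buffer capacity, as in Queue(5) */
#define ITEMS 1000   /* items passed from producer to consumer */

static int ring[SLOTS];
static int ring_head = 0, ring_tail = 0;
static sem_t queue_mutex;   /* binary semaphore protecting the ring */
static sem_t empty_slots;   /* counts open slots */
static sem_t full_slots;    /* counts available items */
static long consumed_sum = 0;

static void* pc_producer(void* arg) {
    (void) arg;
    for (int item = 1; item <= ITEMS; item++) {   /* "produce" 1..ITEMS */
        sem_wait(&empty_slots);
        sem_wait(&queue_mutex);
        ring[ring_tail] = item;
        ring_tail = (ring_tail + 1) % SLOTS;
        sem_post(&queue_mutex);
        sem_post(&full_slots);
    }
    return NULL;
}

static void* pc_consumer(void* arg) {
    (void) arg;
    for (int i = 0; i < ITEMS; i++) {
        sem_wait(&full_slots);
        sem_wait(&queue_mutex);
        int item = ring[ring_head];
        ring_head = (ring_head + 1) % SLOTS;
        sem_post(&queue_mutex);
        sem_post(&empty_slots);
        consumed_sum += item;   /* "consume" outside the critical section */
    }
    return NULL;
}

/* Hypothetical helper: runs one producer and one consumer to completion. */
long run_producer_consumer(void) {
    pthread_t p, c;
    sem_init(&queue_mutex, 0, 1);
    sem_init(&empty_slots, 0, SLOTS);
    sem_init(&full_slots, 0, 0);
    consumed_sum = 0;
    pthread_create(&p, NULL, pc_producer, NULL);
    pthread_create(&c, NULL, pc_consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    sem_destroy(&queue_mutex);
    sem_destroy(&empty_slots);
    sem_destroy(&full_slots);
    return consumed_sum;
}
```

With one consumer, the sum of the consumed items is 1 + 2 + ... + 1000 = 500500, which confirms that every item crossed the buffer exactly once.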

Bounded Buffer Example

The systems-code-examples/bounded-buffer example implements the producer-consumer pattern with pthread mutexes and condition variables.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>

#include "bboptions.h"
#include "bbuffer.h"

#include "millisleep.h"



/*
 * bounded buffer example where there can be any number of suppliers and consumers
 *
 * in this design:
 * - each supplier and consumer runs as a separate thread
 *   thread-specific data allows each to share the bounded_buffer_t* instance and command-line options/config
 *   each supplier and consumer knows its own id.
 * - suppliers each generate options->gen_count values
 *   consumers split the (options->gen_count * options->no_suppliers) entries: each takes that total
 *   divided by options->no_consumers, and the first (total % options->no_consumers) take one extra
 * - because consumers will stop after they have gotten their share of values, others are allowed to consume their fair share
 */

typedef struct
{
    bb_options_t* options;
    bounded_buffer_t* bb;
    int id;
} bb_tsd_t;

void* supplier(void *tsd)
{
    bb_tsd_t* bb_tsd = (bb_tsd_t*) tsd;
    bounded_buffer_t* bb = bb_tsd->bb;
    bb_options_t* options = bb_tsd->options;
    int my_id = bb_tsd->id;

    lwlog_info("supplier { id: %d, state : \"running\", gen: %d }\n", my_id, options->gen_count);
    for (int i=0; i < options->gen_count; i++)
    {
        millisecond_sleep( rand() % options->supplier_max_delay_ms);
        entry_t* new_entry = (entry_t*) malloc(sizeof(entry_t));
        int value = my_id * options->gen_count + i;
        new_entry->value = value;
        bounded_buffer_put(bb, new_entry);
        /* a consumer may already have freed new_entry; log the saved copy */
        lwlog_info("supplier { id: %d,  entry: %d }\n", my_id, value);
        bounded_buffer_print_info(bb);
    }
    lwlog_info("supplier { id: %d, state: \"exit\" }\n", my_id);
    pthread_exit(NULL);
}


/* note: each consumer takes options->no_suppliers * options->gen_count / options->no_consumers entries;
   the first (total % options->no_consumers) consumers take one extra, so every entry is consumed */

void* consumer(void *tsd)
{
    bb_tsd_t* bb_tsd = (bb_tsd_t*) tsd;
    bounded_buffer_t* bb = bb_tsd->bb;
    bb_options_t* options = bb_tsd->options;
    int my_id = bb_tsd->id;
    int max_to_consume = options->no_suppliers * options->gen_count / options->no_consumers;
    int not_consumed = options->no_suppliers * options->gen_count % options->no_consumers;
    if (my_id < not_consumed)
        max_to_consume++;
    lwlog_info("consumer { id: %d, state: \"running\", messages: %d }\n", my_id, max_to_consume);
    for (int i=0; i < max_to_consume; i++)
    {
        entry_t* entry = bounded_buffer_get(bb);
        millisecond_sleep( rand() % options->consumer_max_delay_ms);
        lwlog_info("consumer { id: %d, entry: %d }\n", my_id, entry->value);
        bounded_buffer_print_info(bb);
        free(entry);
    }
    lwlog_info("consumer { id: %d, state: \"exit\" }\n", my_id);
    pthread_exit(NULL);
}

int main (int argc, char *argv[])
{
    pthread_attr_t attr;
    bounded_buffer_t bb;
    bb_options_t options;

    bb_options_get(&options, argc, argv);
    int no_threads = options.no_suppliers + options.no_consumers;
    pthread_t* threads = (pthread_t*) malloc(no_threads * sizeof(pthread_t));
    bb_tsd_t* bb_tsd = (bb_tsd_t*) malloc(no_threads * sizeof(bb_tsd_t));

    bb_options_print(&options);

    bounded_buffer_init(&bb, options.bsize);

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* create supplier threads */
    for (int i=0; i < options.no_suppliers; i++)
    {
        int thread_number = i;
        bb_tsd[thread_number] = (bb_tsd_t)
        {
            .options = &options, .bb = &bb, .id = i
        };
        lwlog_info("pthread_create { thread: %d, type: \"supplier\", id: %d }\n", thread_number, i);
        pthread_create(&threads[thread_number], &attr, supplier, (void *)&bb_tsd[thread_number]);
    }

    /* create consumer threads */
    for (int i=0; i < options.no_consumers; i++)
    {
        int thread_number = i+options.no_suppliers;
        bb_tsd[thread_number] = (bb_tsd_t)
        {
            .options = &options, .bb = &bb, .id = i
        };
        lwlog_info("pthread_create { thread: %d, type: \"consumer\", id: %d }\n", thread_number, i);
        pthread_create(&threads[thread_number], &attr, consumer, (void *)&bb_tsd[thread_number]);
    }

    lwlog_info("main { threads:  %d, state: \"started\" }\n", no_threads);

    for (int i=0; i < no_threads; i++)
    {
        pthread_join(threads[i], NULL);
    }

    lwlog_info("main { threads:  %d,  state: \"joined\" }\n", no_threads);

    lwlog_info("main { extra_entries:  %d }\n", bounded_buffer_count(&bb));

    int bb_size = bounded_buffer_count(&bb);
    for (int i=0; i < bb_size; i++)
    {
        entry_t* e = bounded_buffer_get(&bb);
        lwlog_info("main { removed: %d }\n", e->value);
        free(e);
    }
    pthread_attr_destroy(&attr);
    bounded_buffer_cleanup(&bb);
    free(threads);
    free(bb_tsd);

    pthread_exit(NULL);
}

Key points:

  • The program creates supplier threads and consumer threads.

  • Each thread receives a pointer to its own bb_tsd_t struct (called thread-specific data in the code) containing the shared bounded buffer, the command-line options, and the thread's id.

  • Suppliers allocate entries and place them into the bounded buffer.

  • Consumers remove entries, process them, and free their memory.

  • The main thread joins all worker threads before cleaning up the buffer.

#include "bbuffer.h"

#include <stdio.h>
#include <stdlib.h>

void bounded_buffer_init(bounded_buffer_t* bb, int size)
{
    bb->entries = (entry_t**) malloc(size * sizeof(entry_t*));
    for (int i=0; i < size; i++)
        bb->entries[i] = NULL;
    bb->size = size;
    bb->head = bb->tail = 0;
    pthread_mutex_init(&bb->lock, NULL);
    pthread_cond_init (&bb->has_space, NULL);
    pthread_cond_init (&bb->has_items, NULL);
}

void bounded_buffer_put(bounded_buffer_t* bb, entry_t* item)
{
    pthread_mutex_lock(&bb->lock);

    /* wait while the buffer is full; re-check after every wakeup */
    while (bb->tail - bb->head >= bb->size)
    {
        pthread_cond_wait(&bb->has_space, &bb->lock);
    }

    bb->entries[bb->tail++ % bb->size] = item;
    pthread_cond_signal(&bb->has_items);

    pthread_mutex_unlock(&bb->lock);
}

entry_t* bounded_buffer_get(bounded_buffer_t* bb)
{
    pthread_mutex_lock(&bb->lock);

    /* wait while the buffer is empty */
    while (bb->tail == bb->head)
    {
        pthread_cond_wait(&bb->has_items, &bb->lock);
    }

    entry_t* entry = bb->entries[bb->head % bb->size];
    bb->entries[bb->head++ % bb->size] = NULL;
    pthread_cond_signal(&bb->has_space);

    pthread_mutex_unlock(&bb->lock);
    return entry;
}

int bounded_buffer_count(bounded_buffer_t* bb)
{
    pthread_mutex_lock(&bb->lock);
    int count = bb->tail - bb->head;
    pthread_mutex_unlock(&bb->lock);
    return count;
}

void bounded_buffer_cleanup(bounded_buffer_t* bb)
{
    int unfreed_count = 0;
    for (int i=0; i < bb->size; i++)
        if (bb->entries[i] != NULL)
            unfreed_count++;
    if (unfreed_count > 0)
    {
        lwlog_info("Warning: %d entries in bounded buffer not freed\n", unfreed_count);
    }
    free(bb->entries);
    pthread_mutex_destroy(&bb->lock);
    pthread_cond_destroy(&bb->has_space);
    pthread_cond_destroy(&bb->has_items);
}

void bounded_buffer_print_info(bounded_buffer_t* bb)
{
    /* lock so head, tail, and entries are read as one consistent snapshot */
    pthread_mutex_lock(&bb->lock);
    printf("buffer { size: %d, length: %d, head: %d, tail: %d, ", bb->size, bb->tail - bb->head, bb->head, bb->tail);
    printf("entries : [");
    int add_comma = 0;
    for (int i=bb->head; i < bb->tail; i++)
    {
        if (add_comma)
        {
            printf(", %d", bb->entries[i % bb->size]->value);
        }
        else
        {
            printf("%d", bb->entries[i % bb->size]->value);
        }
        add_comma = 1;
    }
    printf("]");
    printf(" }\n");
    pthread_mutex_unlock(&bb->lock);
}

Key points:

  • bounded_buffer_init() initializes the circular buffer, mutex, and two condition variables.

  • bounded_buffer_put() waits while the buffer is full.

  • bounded_buffer_get() waits while the buffer is empty.

  • has_space wakes a producer when a consumer removes an item.

  • has_items wakes a consumer when a producer adds an item.

Reader-Writer Locks

A reader-writer lock allows many readers to hold the lock at the same time but gives writers exclusive access.

Reader-writer locks can improve concurrency when reads are common and writes are less frequent. Their implementation is more complex than a plain mutex because the lock must track active readers, active writers, waiting readers, and waiting writers.

Semaphore *read = new Semaphore(0);
Semaphore *write = new Semaphore(0);
Semaphore *lock = new Semaphore(1);
int readWait = 0, writeWait = 0;
int activeRead = 0, activeWrite = 0;

void StartWrite() {
   lock->down();
   if(activeWrite+activeRead+writeWait == 0) {
      write->up();
      activeWrite += 1;
   } else {
      writeWait += 1;
   }
   lock->up();
   write->down();
}

void EndWrite() {
   lock->down();
   activeWrite -= 1;
   if(writeWait > 0) {
      write->up();
      activeWrite += 1;
      writeWait -= 1;
   } else {
      while(readWait>0) {
         read->up();
         activeRead += 1;
         readWait -= 1;
      }
   }
   lock->up();
}

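Only the writer operations are shown. EndWrite() hands the lock to the next waiting writer if there is one and otherwise admits all waiting readers at once. Reader-writer locks force design choices: a lock may give priority to readers, priority to writers, or attempt to balance the two, and each choice creates different starvation and deadlock risks.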
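POSIX threads provide a ready-made reader-writer lock, pthread_rwlock_t, so application code rarely needs to assemble one from semaphores. In the sketch below (hypothetical names; assumes a POSIX system), two writers update a shared value under pthread_rwlock_wrlock() while two readers use pthread_rwlock_rdlock(), which other readers may hold at the same time. Whether waiting writers take precedence over newly arriving readers is largely left to the implementation.

```c
#define _POSIX_C_SOURCE 200809L   /* expose pthread_rwlock_t under strict C modes */
#include <pthread.h>
#include <stddef.h>

static pthread_rwlock_t value_lock = PTHREAD_RWLOCK_INITIALIZER;
static long shared_value = 0;   /* read often, written rarely */

static void* rw_writer(void* arg) {
    long n = (long) arg;
    for (long i = 0; i < n; i++) {
        pthread_rwlock_wrlock(&value_lock);   /* exclusive access */
        shared_value += 1;
        pthread_rwlock_unlock(&value_lock);
    }
    return NULL;
}

static void* rw_reader(void* arg) {
    long n = (long) arg;
    long last = 0, ok = 1;
    for (long i = 0; i < n; i++) {
        pthread_rwlock_rdlock(&value_lock);   /* shared with other readers */
        long v = shared_value;
        pthread_rwlock_unlock(&value_lock);
        if (v < last) ok = 0;                 /* writers only ever increase it */
        last = v;
    }
    return (void*) ok;
}

/* Hypothetical helper: returns the final value; *readers_ok reports
   whether every reader saw a non-decreasing sequence. */
long run_rwlock_demo(long n, int* readers_ok) {
    pthread_t w1, w2, r1, r2;
    void *res1, *res2;
    shared_value = 0;
    pthread_create(&w1, NULL, rw_writer, (void*) n);
    pthread_create(&w2, NULL, rw_writer, (void*) n);
    pthread_create(&r1, NULL, rw_reader, (void*) n);
    pthread_create(&r2, NULL, rw_reader, (void*) n);
    pthread_join(w1, NULL);
    pthread_join(w2, NULL);
    pthread_join(r1, &res1);
    pthread_join(r2, &res2);
    *readers_ok = (res1 == (void*) 1L) && (res2 == (void*) 1L);
    return shared_value;
}
```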

POSIX Semaphores

POSIX semaphores provide a system interface for counting semaphores.

The basic initialization call is:

int sem_init(sem_t *sem, int pshared, unsigned int value);

The sem argument points to the semaphore object. If pshared is 0, the semaphore is shared only among the threads of the calling process; if it is nonzero, the semaphore can be shared between processes, provided it is placed in shared memory. The value argument sets the initial count.

sem_t sema;
sem_init(&sema, 0, 1);
sem_wait(&sema);
/* critical section */
sem_post(&sema);
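With a starting value greater than 1, the same calls limit how many threads may be inside a region at once. The sketch below (hypothetical names; assumes Linux or another system that supports unnamed POSIX semaphores) starts eight threads on a semaphore initialized to a chosen number of units and records the largest number of threads that held a unit simultaneously, which can never exceed that number.

```c
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stddef.h>

#define WORKERS 8

static sem_t slots;                 /* counting semaphore: free units */
static atomic_int active = 0;       /* threads currently holding a unit */
static atomic_int max_active = 0;   /* most holders seen at once */

static void* use_slot(void* arg) {
    (void) arg;
    sem_wait(&slots);                             /* take a unit; blocks at 0 */
    int now = atomic_fetch_add(&active, 1) + 1;
    int seen = atomic_load(&max_active);
    /* Record a new maximum; on failure, seen is refreshed and rechecked. */
    while (now > seen && !atomic_compare_exchange_weak(&max_active, &seen, now)) { }
    sched_yield();                                /* let other workers overlap */
    atomic_fetch_sub(&active, 1);
    sem_post(&slots);                             /* return the unit */
    return NULL;
}

/* Hypothetical helper: WORKERS threads share `units` units; returns the peak. */
int run_counting_demo(unsigned int units) {
    pthread_t threads[WORKERS];
    atomic_store(&max_active, 0);
    sem_init(&slots, 0, units);
    for (int i = 0; i < WORKERS; i++)
        pthread_create(&threads[i], NULL, use_slot, NULL);
    for (int i = 0; i < WORKERS; i++)
        pthread_join(threads[i], NULL);
    sem_destroy(&slots);
    return atomic_load(&max_active);
}
```

The exact peak depends on scheduling, but it is always between 1 and the number of units.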

Windows Semaphores

Windows also provides semaphore objects through the Win32 API.

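const LONG initialCount = 20;   /* units available when the semaphore is created */
const LONG maxCount = 20;       /* upper limit on the count */
HANDLE sema = CreateSemaphore(NULL, initialCount, maxCount, NULL);

WaitForSingleObject(sema, INFINITE);   /* take one unit, blocking at 0 */
/* use the resource: at most 20 threads are here at once */
ReleaseSemaphore(sema, 1, NULL);       /* give the unit back */
CloseHandle(sema);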

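Named semaphores can be shared by separate processes. One process creates a named semaphore by passing a name as the last argument to CreateSemaphore(), another process opens it with OpenSemaphore() (or by calling CreateSemaphore() with the same name), and each then waits on it with WaitForSingleObject().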

Monitors and Condition Variables

A monitor combines mutual exclusion with a way for threads to wait until a condition becomes true.

Monitor-style synchronization usually has four operations:

  • enter locks the monitor.

  • exit unlocks the monitor.

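  • wait atomically releases the lock, blocks until another thread signals, and reacquires the lock before returning.

  • pulse or signal wakes one waiting thread. Many APIs also provide a broadcast form that wakes every waiter, such as pthread_cond_broadcast() or Monitor.PulseAll() in .NET.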

The usual pattern is:

enter();
while(condition_to_proceed_is_not_true) {
   wait();
}
do_operation();
if(condition_has_changed) {
   pulse();
}
exit();

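The condition is checked in a loop because waking up does not guarantee that the condition is true. Another thread may acquire the lock between the signal and the moment the woken thread reacquires it, and change the state again. Some implementations, including pthreads, also allow spurious wakeups that happen without any signal.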

Monitor Example#

The systems-code-examples/monitor example rewrites the queue coordination problem with a monitor abstraction.

#include "queue.hh"
#include "monitor.hh"
#include "debug.hh"
#include <pthread.h>
#include <stdio.h>

const int countPerThread = 10000000;
const int workerThreads = 2;
int count;
Monitor *monitor = new Monitor();
Queue *queue = new Queue();

// Thread entry point: uses the signature pthread_create() expects.
void *increment(void *)
{
    monitor->Enter();
    while(queue->HasWorkLeft())
    {
        // Wait only while the queue is empty and the producer may still add work.
        while(queue->IsEmpty() && queue->HasWorkLeft())
        {
            monitor->Wait();
        }
        if(!queue->IsEmpty())
        {
            int c = queue->Dequeue();
            count = count + c;
        }
    }
    monitor->Exit();
    return NULL;
}

void populateQueue()
{
    for(int i = 0; i < countPerThread*2; i++)
    {
        monitor->Enter();
        queue->Enqueue(1);
        monitor->Pulse();
        monitor->Exit();
    }
    monitor->Enter();
    queue->DoneAdding();
    // Monitor::Wait() returns only after a pulse, so pulse once per worker
    // to release any worker that is still waiting for more work.
    for(int i = 0; i < workerThreads; i++)
    {
        monitor->Pulse();
    }
    monitor->Exit();
}

int main( int argc, char* argv[])
{
    pthread_t thread1, thread2;
    pthread_attr_t threadAttribute;

    pthread_attr_init(&threadAttribute);
    pthread_attr_setscope(&threadAttribute, PTHREAD_SCOPE_SYSTEM);

    printf("starting test. final count should be %d\n", 2*countPerThread);

    pthread_create(&thread1, &threadAttribute, increment, NULL);
    pthread_create(&thread2, &threadAttribute, increment, NULL);

    populateQueue();

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    if( count != 2 * countPerThread )
    {
        printf("****** Error. Final count is %d\n", count);
    }
    else
    {
        printf("****** OK. Final count is %d\n", count);
    }

    return (count == 2 * countPerThread) ? 0 : 1;
}

Key points:

  • Worker threads enter the monitor before inspecting the shared queue.

  • A worker waits while the queue is empty and more work may still arrive.

  • populateQueue() pulses the monitor after it enqueues a new item.

  • After DoneAdding(), populateQueue() pulses once per worker. Monitor::Wait() returns only after consuming a pulse, so without these extra pulses a worker that is waiting when the last item is taken would never wake up to see that no work is left.

  • The monitor keeps the waiting condition close to the protected queue operations.
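For comparison, the same coordination can be sketched directly with a pthread mutex and condition variable instead of the Monitor and Queue classes. To keep it short, the queue is modeled as a counter of pending items that are each worth 1, and all names here (run_work_queue(), worker(), pending, done) are hypothetical. The main difference is that done is part of the waited-on condition and is announced with pthread_cond_broadcast(), so every idle worker wakes and rechecks it.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_changed = PTHREAD_COND_INITIALIZER;
static long pending = 0;    /* items added but not yet taken */
static bool done = false;   /* producer has finished adding */
static long total = 0;      /* sum of processed items */

static void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&queue_lock);
    for (;;) {
        /* Wait while there is nothing to take but more may still arrive. */
        while (pending == 0 && !done)
            pthread_cond_wait(&work_changed, &queue_lock);
        if (pending == 0)       /* done and drained: stop */
            break;
        pending -= 1;           /* "dequeue" one item */
        total += 1;             /* process it while holding the lock */
    }
    pthread_mutex_unlock(&queue_lock);
    return NULL;
}

/* Feeds `items` items to `nworkers` workers and returns how many were processed. */
long run_work_queue(long items, int nworkers) {
    pthread_t threads[16];
    assert(nworkers > 0 && nworkers <= 16);
    pending = 0;
    done = false;
    total = 0;
    for (int i = 0; i < nworkers; i++)
        pthread_create(&threads[i], NULL, worker, NULL);
    for (long i = 0; i < items; i++) {
        pthread_mutex_lock(&queue_lock);
        pending += 1;
        pthread_cond_signal(&work_changed);      /* one new item: wake one worker */
        pthread_mutex_unlock(&queue_lock);
    }
    pthread_mutex_lock(&queue_lock);
    done = true;
    pthread_cond_broadcast(&work_changed);       /* wake every idle worker */
    pthread_mutex_unlock(&queue_lock);
    for (int i = 0; i < nworkers; i++)
        pthread_join(threads[i], NULL);
    return total;
}
```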

#include "monitor.hh"
#include <stdio.h>
#include <pthread.h>
#include <sched.h>

Monitor::Monitor()
{
    _lock = new Mutex();
    _locked = false;
    _pulse = 0;
}

Monitor::~Monitor()
{
    delete _lock;
}

void Monitor::Enter()
{
    _lock->Lock();
    _locked = true;
}

void Monitor::Exit()
{
    _locked = false;
    _lock->Unlock();
}

void Monitor::Wait()
{
    if(!_locked)
    {
        fprintf(stderr, "wait called without a lock\n");
    }
    // Busy-wait: give up the monitor, let other threads run, then retake it.
    while(_pulse == 0)
    {
        Exit();
        sched_yield();   // POSIX; pthread_yield() is a deprecated, nonstandard alias
        Enter();
    }
    _pulse -= 1;
}

void Monitor::Pulse()
{
    if(!_locked)
    {
        fprintf(stderr, "pulse called without a lock\n");
    }
    _pulse += 1;
}

Key points:

  • Enter() obtains the internal mutex and marks the monitor locked.

  • Exit() releases the internal mutex.

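  • Wait() busy-waits: it repeatedly releases the monitor, yields the processor, and reacquires the monitor until a pulse is available, so a waiting thread keeps consuming CPU time instead of blocking.

  • Pulse() increments a counter that lets one call to Wait() proceed. Unlike pthread_cond_signal(), a pulse issued while no thread is waiting is not lost; the next Wait() consumes it.

  • The _locked checks only detect that some thread holds the monitor, not that the calling thread does.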

  • This implementation is intentionally small and shows the idea rather than a production condition-variable implementation.
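A blocking version of this Monitor can be sketched by pairing a mutex with a pthread condition variable. The sketch keeps the counted-pulse behavior described above, so a pulse issued before anyone waits is remembered, but a waiting thread sleeps instead of spinning. The monitor_t type and monitor_* functions are hypothetical and are not part of systems-code-examples.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical blocking monitor with Enter/Exit/Wait/Pulse operations. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int pulses;              /* pulses not yet consumed by monitor_wait() */
} monitor_t;

void monitor_init(monitor_t *mon) {
    pthread_mutex_init(&mon->lock, NULL);
    pthread_cond_init(&mon->cond, NULL);
    mon->pulses = 0;
}

void monitor_destroy(monitor_t *mon) {
    pthread_cond_destroy(&mon->cond);
    pthread_mutex_destroy(&mon->lock);
}

void monitor_enter(monitor_t *mon) { pthread_mutex_lock(&mon->lock); }
void monitor_exit(monitor_t *mon) { pthread_mutex_unlock(&mon->lock); }

/* Caller must hold the monitor. Sleeps until a pulse is available, then
   consumes it. pthread_cond_wait() releases the mutex while the thread
   sleeps and reacquires it before returning. */
void monitor_wait(monitor_t *mon) {
    while (mon->pulses == 0)
        pthread_cond_wait(&mon->cond, &mon->lock);
    mon->pulses -= 1;
}

/* Caller must hold the monitor. Records a pulse and wakes one sleeper. */
void monitor_pulse(monitor_t *mon) {
    mon->pulses += 1;
    pthread_cond_signal(&mon->cond);
}

/* Usage: n threads each wait once; the caller pulses n times. */
static monitor_t demo_mon;
static int demo_released = 0;

static void *demo_waiter(void *arg) {
    (void)arg;
    monitor_enter(&demo_mon);
    monitor_wait(&demo_mon);
    demo_released += 1;      /* still inside the monitor */
    monitor_exit(&demo_mon);
    return NULL;
}

int monitor_demo(int n) {
    pthread_t threads[8];
    assert(n > 0 && n <= 8);
    monitor_init(&demo_mon);
    demo_released = 0;
    for (int i = 0; i < n; i++)
        pthread_create(&threads[i], NULL, demo_waiter, NULL);
    for (int i = 0; i < n; i++) {
        monitor_enter(&demo_mon);
        monitor_pulse(&demo_mon);
        monitor_exit(&demo_mon);
    }
    for (int i = 0; i < n; i++)
        pthread_join(threads[i], NULL);
    monitor_destroy(&demo_mon);
    return demo_released;
}
```

Because pulses are counted, it does not matter whether the caller pulses before or after a waiter reaches monitor_wait(); each waiter consumes exactly one pulse.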

Pthreads Condition Variables#

Pthreads condition variables implement the monitor pattern with an explicit mutex and condition variable.

#include <pthread.h>

pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_mutex_init(&mutex, NULL);   /* NULL selects default attributes */
pthread_cond_init(&cond, NULL);

pthread_mutex_lock(&mutex);
while(/* condition is not true */) {
   pthread_cond_wait(&cond, &mutex);
}
/* critical section */
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

pthread_cond_wait() releases the mutex while the thread waits and reacquires it before returning. This lets another thread enter the critical section and change the condition.
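A small, self-contained instance of this pattern is a countdown latch: threads block until a counter reaches zero. The sketch below is illustrative; latch_t and the latch_* functions are hypothetical names. It uses pthread_cond_broadcast() rather than pthread_cond_signal() because reaching zero should release every waiter at once.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical countdown latch: latch_wait() blocks until count is 0. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t reached_zero;
    int count;
} latch_t;

void latch_init(latch_t *l, int count) {
    pthread_mutex_init(&l->mutex, NULL);
    pthread_cond_init(&l->reached_zero, NULL);
    l->count = count;
}

void latch_destroy(latch_t *l) {
    pthread_cond_destroy(&l->reached_zero);
    pthread_mutex_destroy(&l->mutex);
}

void latch_count_down(latch_t *l) {
    pthread_mutex_lock(&l->mutex);
    if (l->count > 0) {
        l->count -= 1;
        if (l->count == 0)
            pthread_cond_broadcast(&l->reached_zero);   /* wake every waiter */
    }
    pthread_mutex_unlock(&l->mutex);
}

void latch_wait(latch_t *l) {
    pthread_mutex_lock(&l->mutex);
    while (l->count > 0)                     /* recheck after every wakeup */
        pthread_cond_wait(&l->reached_zero, &l->mutex);
    pthread_mutex_unlock(&l->mutex);
}

/* Usage: n workers mark that they ran, then count down; the caller
   waits on the latch and then counts the marks. */
static latch_t demo_latch;
static int ran[8];

static void *demo_worker(void *arg) {
    int id = *(const int *)arg;
    ran[id] = 1;                 /* made visible to the waiter through the latch's mutex */
    latch_count_down(&demo_latch);
    return NULL;
}

int latch_demo(int n) {
    pthread_t threads[8];
    int ids[8];
    assert(n > 0 && n <= 8);
    latch_init(&demo_latch, n);
    for (int i = 0; i < n; i++) {
        ran[i] = 0;
        ids[i] = i;
    }
    for (int i = 0; i < n; i++)
        pthread_create(&threads[i], NULL, demo_worker, &ids[i]);
    latch_wait(&demo_latch);     /* returns only after all n count-downs */
    int finished = 0;
    for (int i = 0; i < n; i++)
        finished += ran[i];
    for (int i = 0; i < n; i++)
        pthread_join(threads[i], NULL);
    latch_destroy(&demo_latch);
    return finished;
}
```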

Windows Condition Variables#

Windows condition variables provide the same basic pattern as pthread condition variables.

CONDITION_VARIABLE cond;
CRITICAL_SECTION lock;

InitializeConditionVariable(&cond);
InitializeCriticalSection(&lock);

EnterCriticalSection(&lock);
while(/* condition is not true */) {
   SleepConditionVariableCS(&cond, &lock, INFINITE);
}
/* critical section */
WakeConditionVariable(&cond);
LeaveCriticalSection(&lock);

The condition variable works with a CRITICAL_SECTION so the waiting thread can atomically release the lock and block.

C# Bounded Buffer#

The systems-code-examples/threads_csharp directory includes a C# bounded buffer implemented with .NET semaphores.

using System.Collections.Generic;
using System.Threading;

public class BoundBuffer<T> {
	private readonly Semaphore _full;
	private readonly Semaphore _empty;
	private readonly Semaphore _lock;
	private readonly Queue<T> _queue;

	public BoundBuffer (int maxCount) {
		_empty = new Semaphore (maxCount, maxCount);
		_full = new Semaphore (0, maxCount);
		_lock = new Semaphore (1, 1);
		_queue = new Queue<T> ();
	}

	public void Enqueue(T item) {
		_empty.WaitOne ();
		_lock.WaitOne ();
		_queue.Enqueue (item);
		_lock.Release (1);
		_full.Release (1);
	}

	public T Dequeue() {
		_full.WaitOne ();
		_lock.WaitOne ();
		var item = _queue.Dequeue ();
		_lock.Release(1);
		_empty.Release(1);
		return item;
	}
}

Key points:

  • _empty counts open slots in the buffer.

  • _full counts items available to consumers.

  • _lock is a binary semaphore that protects the queue.

  • Enqueue() waits for space before adding an item.

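  • Dequeue() waits for an item before removing one.

  • Both methods wait on the counting semaphore before taking _lock. Reversing that order could deadlock: a producer holding _lock while waiting for space in a full buffer would keep every consumer from taking _lock to remove an item.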
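The same three-semaphore design carries over directly to POSIX semaphores in C. This sketch uses a fixed-capacity ring buffer of ints; the bounded_buffer type, the bb_* functions, and BB_CAPACITY are assumptions for illustration, and, like the earlier sem_init() sketch, it assumes a platform such as Linux that supports unnamed semaphores.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define BB_CAPACITY 8

typedef struct {
    int items[BB_CAPACITY];
    int head, tail;     /* dequeue at head, enqueue at tail */
    sem_t empty;        /* counts open slots */
    sem_t full;         /* counts items available */
    sem_t lock;         /* binary semaphore protecting items/head/tail */
} bounded_buffer;

void bb_init(bounded_buffer *b) {
    b->head = b->tail = 0;
    sem_init(&b->empty, 0, BB_CAPACITY);
    sem_init(&b->full, 0, 0);
    sem_init(&b->lock, 0, 1);
}

void bb_destroy(bounded_buffer *b) {
    sem_destroy(&b->empty);
    sem_destroy(&b->full);
    sem_destroy(&b->lock);
}

void bb_enqueue(bounded_buffer *b, int item) {
    sem_wait(&b->empty);    /* wait for space first... */
    sem_wait(&b->lock);     /* ...then take the lock */
    b->items[b->tail] = item;
    b->tail = (b->tail + 1) % BB_CAPACITY;
    sem_post(&b->lock);
    sem_post(&b->full);     /* one more item for consumers */
}

int bb_dequeue(bounded_buffer *b) {
    sem_wait(&b->full);     /* wait for an item first */
    sem_wait(&b->lock);
    int item = b->items[b->head];
    b->head = (b->head + 1) % BB_CAPACITY;
    sem_post(&b->lock);
    sem_post(&b->empty);    /* one more open slot */
    return item;
}

/* Usage: a producer thread sends 1..n through the buffer while the
   caller consumes them; the returned sum should be n*(n+1)/2. */
static bounded_buffer demo_buf;

static void *demo_producer(void *arg) {
    int n = *(const int *)arg;
    for (int i = 1; i <= n; i++)
        bb_enqueue(&demo_buf, i);
    return NULL;
}

long bb_demo(int n) {
    pthread_t producer;
    long sum = 0;
    assert(n >= 0);
    bb_init(&demo_buf);
    pthread_create(&producer, NULL, demo_producer, &n);
    for (int i = 0; i < n; i++)
        sum += bb_dequeue(&demo_buf);
    pthread_join(producer, NULL);
    bb_destroy(&demo_buf);
    return sum;
}
```

With a capacity of 8 and n = 1000, the producer repeatedly blocks on empty until the consumer frees a slot, which exercises both waiting paths.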

C# Monitor Pipeline#

The C# pipeline examples use lock, Monitor.Wait(), and Monitor.Pulse() to coordinate staged work.

using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading;

public class PipelineComputation {
	private readonly Queue<byte[]> _readData;
	private readonly Queue<byte[]> _compressionData;
	private bool _reading = true;       // guarded by lock (_readData)
	private bool _compressing = true;   // guarded by lock (_compressionData)

	public PipelineComputation () {
		_readData = new Queue<byte[]>();
		_compressionData = new Queue<byte[]>();
	}

	public void PerformCompression() {
		var readerThread = new Thread (FileReader);
		var compressThread = new Thread (Compression);
		var writerThread = new Thread (FileWriter);
		readerThread.Start ();
		compressThread.Start ();
		writerThread.Start ();
		readerThread.Join ();
		compressThread.Join ();
		writerThread.Join ();
	}

	private void FileReader() {
		using (var stream = new FileStream("file.txt", FileMode.Open, FileAccess.Read)) {
			int len;
			var buffer = new byte[1024];
			while ((len = stream.Read(buffer, 0, buffer.Length)) > 0) {
				// Copy the bytes out: buffer is overwritten by the next Read.
				var block = new byte[len];
				Array.Copy (buffer, block, len);
				lock (_readData) {
					while (_readData.Count > 10) {
						Monitor.Wait (_readData);
					}
					_readData.Enqueue(block);
					Monitor.Pulse (_readData);   // wake the compressor if it is waiting for work
				}
			}
		}
		lock (_readData) {
			_reading = false;
			Monitor.Pulse (_readData);
		}
	}

	private void Compression() {
		while (true) {
			byte[] dataToCompress;
			lock (_readData) {
				while (_reading && _readData.Count == 0) {
					Monitor.Wait (_readData);
				}
				if (_readData.Count == 0) {
					break;   // reader is done and every block has been taken
				}
				dataToCompress = _readData.Dequeue ();
				Monitor.Pulse (_readData);   // wake the reader if it is waiting for space
			}
			var compressed = Compress(dataToCompress);
			lock (_compressionData) {
				while (_compressionData.Count > 10) {
					Monitor.Wait (_compressionData);
				}
				_compressionData.Enqueue (compressed);
				Monitor.Pulse (_compressionData);   // wake the writer if it is waiting for work
			}
		}
		lock (_compressionData) {
			_compressing = false;
			Monitor.Pulse (_compressionData);
		}
	}

	private static byte[] Compress(byte[] data) {
		var memStream = new MemoryStream ();
		using(var compressionStream = new GZipStream(memStream, CompressionMode.Compress)) {
			compressionStream.Write(data, 0, data.Length);
		}
		return memStream.ToArray ();
	}

	private void FileWriter() {
		// FileMode.Create truncates an existing file.gz instead of leaving stale bytes.
		using (var stream = new FileStream("file.gz", FileMode.Create, FileAccess.Write)) {
			while (true) {
				byte[] compressedData;
				lock (_compressionData) {
					while (_compressing && _compressionData.Count == 0) {
						Monitor.Wait (_compressionData);
					}
					if (_compressionData.Count == 0) {
						break;   // compressor is done and the queue is drained
					}
					compressedData = _compressionData.Dequeue ();
					Monitor.Pulse (_compressionData);   // wake the compressor if it is waiting for space
				}
				stream.Write (compressedData, 0, compressedData.Length);
			}
		}
	}
}

Key points:

  • The reader, compressor, and writer run on separate threads.

  • Each stage communicates through a queue protected by lock.

  • A stage waits when its input queue is empty or its output queue is too large.

  • Monitor.Pulse() is called after every enqueue and every dequeue, so both a stage waiting for new work and a stage waiting for free space are woken. Each queue has one producer and one consumer, and a queue cannot be both empty and full, so at most one thread is waiting on it and a single Pulse() reaches that thread.

  • The _reading and _compressing flags are set and read while holding the lock of the queue they describe. A stage stops only when, under that lock, it sees that the earlier stage has finished and the queue is empty, so no block is left behind.

  • FileReader() copies each block into a new array because the read buffer is reused for every Read().

C# Threaded Enumeration#

The ThreadedList helper turns an ordinary enumerable into a producer-consumer pipeline.

using System;
using System.Collections.Generic;
using System.Threading;

public class ThreadedList<T> : IEnumerable<T>
{
	private readonly IEnumerable<T> _list;

	public ThreadedList (IEnumerable<T> list){
		_list = list;
	}

	public IEnumerator<T> GetEnumerator ()
	{
		return new ThreadedEnumerator<T>(_list.GetEnumerator ());
	}

	System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator ()
	{
		return GetEnumerator ();
	}

	private class ThreadedEnumerator<S> : IEnumerator<S> {

		private readonly IEnumerator<S> _enumerator;
		private readonly Queue<S> _queue = new Queue<S> ();   // must exist before the thread starts
		private const int _maxQueueSize = 10;
		private readonly Thread _thread;
		private volatile bool _keepGoing = true;
		private volatile bool _finishedEnumerating = false;
		private S _current;

		public ThreadedEnumerator(IEnumerator<S> enumerator) {
			_enumerator = enumerator;
			_thread = new Thread(Enumerate);
			_thread.Start();
		}

		private void Enumerate() {
			while (_keepGoing) {
				if (_enumerator.MoveNext ()) {
					var current = _enumerator.Current;
					lock (_queue) {
						while (_queue.Count > _maxQueueSize && _keepGoing) {
							Monitor.Wait (_queue, 100);
						}
						if (_keepGoing) {
							_queue.Enqueue (current);
							Monitor.Pulse (_queue);
						}
					}
				} else {
					break;
				}
			}
			_finishedEnumerating = true;
		}

		public bool MoveNext ()
		{
			lock (_queue) {
				while (!_finishedEnumerating && _queue.Count == 0) {
					Monitor.Wait (_queue, 100);
				}
				if (_queue.Count > 0) {
					_current = _queue.Dequeue ();
					Monitor.Pulse (_queue);
					return true;
				} else {
					_current = default(S);
					return false;
				}
			}
		}

		public void Reset () {
			// The background thread cannot be rewound safely, so Reset is not
			// supported (compiler-generated iterators also throw here).
			throw new NotSupportedException ();
		}

		object System.Collections.IEnumerator.Current {
			get { return _current; }
		}

		public void Dispose () {
			_keepGoing = false;
			_thread.Join ();
			_enumerator.Dispose ();   // run the wrapped iterator's finally/using blocks
		}

		public S Current {
			get { return _current; }
		}
	}
}

Key points:

  • A background thread advances the wrapped enumerator.

  • Produced values are placed in a bounded queue.

  • MoveNext() waits until an item is available or enumeration is finished.

  • Monitor.Wait() and Monitor.Pulse() coordinate the producer and consumer.

  • Dispose() stops the worker thread and joins it.

Concise Pipeline with ThreadedList#

The concise pipeline example shows how the synchronization details can be hidden behind ThreadedList.

using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

public class ConcisePipelineComputation
{
	public ConcisePipelineComputation () {
	}

	public void PerformCompression() {
		var fileBlocks = new ThreadedList<byte[]>(FileReader());
		var compressedBlocks = new ThreadedList<byte[]> (Compression(fileBlocks));
		FileWriter (compressedBlocks);
	}

	private IEnumerable<byte[]> FileReader() {
		using (var stream = new FileStream("file.txt", FileMode.Open, FileAccess.Read)) {
			int len;
			var buffer = new byte[1024];
			while ((len = stream.Read(buffer, 0, buffer.Length)) > 0) {
				// Yield a fresh copy: the consumer runs on another thread,
				// and buffer is overwritten by the next Read.
				var block = new byte[len];
				Array.Copy (buffer, block, len);
				yield return block;
			}
		}
	}

	private IEnumerable<byte[]> Compression(IEnumerable<byte[]> readBuffer) {
		foreach (var buffer in readBuffer) {
			yield return Compress (buffer);
		}
	}

	private static byte[] Compress(byte[] data) {
		var memStream = new MemoryStream ();
		using(var compressionStream = new GZipStream(memStream, CompressionMode.Compress)) {
			compressionStream.Write(data, 0, data.Length);
		}
		return memStream.ToArray ();
	}

	private void FileWriter(IEnumerable<byte[]> compressedBuffer) {
		// FileMode.Create truncates an existing file.gz instead of leaving stale bytes.
		using (var stream = new FileStream("file.gz", FileMode.Create, FileAccess.Write)) {
			foreach (var buffer in compressedBuffer) {
				stream.Write (buffer, 0, buffer.Length);
			}
		}
	}
}

Key points:

  • FileReader() yields byte arrays from the input file.

  • Compression() consumes blocks and yields compressed blocks.

  • FileWriter() consumes the compressed stream.

  • ThreadedList lets adjacent stages overlap execution.

  • The pipeline reads like ordinary iterator code while still using threaded buffering internally.

Dining Philosophers#

The dining philosophers problem is a classic example of lock ordering, resource contention, and deadlock.

The complete systems-code-examples/dining-philosophers example fits best in the deadlock chapter. It will be used there as the main example for circular wait and deadlock avoidance.