
Concurrency in C++11

Syntactic sugar that makes it harder to shoot yourself in the foot
13th December, 2020

Until recently, I was under the impression that POSIX threads or OpenMP were the only ways in which you could write concurrent programs when using C++. I find the pthreads interface downright ugly, and you can see the ugliness in the compiler-generated lowering of OpenMP code into calls to the pthreads library, so it was refreshing to learn (thanks to Jeffrey Daily) that C++11 includes constructs that bring concurrency to the language. Although the language’s concurrency constructs are still lowered into calls to libpthreads (at least on OpenBSD v6.8), they do seem to make concurrent programming easier.

Starting and Joining Threads

Unlike OpenMP’s fork-join model, the C++11 constructs are similar to (while still being simpler than) the pthreads interface: you pass a function (or lambda), along with its arguments, to be executed in a separate thread. Here is a rudimentary example:

#include <cstdint>
#include <iostream>
#include <thread>

// code to be executed inside a thread; the return value, if any, is ignored
void thread_fn(uint16_t input_0, uint16_t input_1, uint16_t input_2) {
  uint16_t result = input_0 + input_1 + input_2;
  std::cout << "result: " << result << std::endl;
}

int main() {
  std::thread new_thread(thread_fn, 42, 86, 5);

  // block until the thread finishes executing
  new_thread.join();

  return 0;
}

While this is great, C++11 also offers a slightly higher-level mechanism for creating concurrent tasks, in the form of async/futures. As I understand, the key enhancement in async/futures is the syntactic sugar that allows the task function to return an arbitrary value, which the caller later retrieves through the future.

#include <chrono>
#include <cstdint>
#include <future>
#include <iostream>

// code to be executed inside a thread; note the return type
uint16_t thread_fn(uint16_t input_0, uint16_t input_1, uint16_t input_2) {
  return input_0 + input_1 + input_2;
}

int main() {
  std::future<uint16_t> future = std::async(std::launch::async, thread_fn, 42,
      86, 5);

  // peek at the task's completion without blocking; note that valid() only
  // reports whether the future has a shared state, so a zero-timeout
  // wait_for() is needed to check whether the result is ready
  if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {

    // do some other work, if any
  }

  // block until the async/future execution completes
  std::cout << "result: " << future.get() << std::endl;
  return 0;
}

Basic Synchronization Using Mutexes

C++11 offers a std::mutex type, which supports lock() and unlock() operations to guard access to shared data.

#include <cstdint>
#include <future>
#include <iostream>
#include <mutex>

uint16_t result;
std::mutex mutex;

// code to be executed inside a thread
uint16_t thread_fn(uint16_t input_0, uint16_t input_1, uint16_t input_2) {
  uint16_t partial_sum = input_0 + input_1 + input_2;

  mutex.lock();
  result += partial_sum;
  mutex.unlock();

  return partial_sum;
}

int main() {
  std::future<uint16_t> future_0 = std::async(std::launch::async, thread_fn,
      42, 86, 5);

  std::future<uint16_t> future_1 = std::async(std::launch::async, thread_fn,
      142, 286, 35);

  // block until the async/future execution completes
  uint16_t partial_0 = future_0.get();
  uint16_t partial_1 = future_1.get();

  std::cout << "sum of partial results (" << partial_0 << ", " << partial_1 <<
    ") = " << partial_0 + partial_1 << std::endl;

  // if the code executed correctly, the global result should match the sum
  std::cout << "global result: " << result << std::endl;

  return 0;
}

Eliding (Some) Mutexes Using std::atomic

C++11 also allows you to wrap a type in the std::atomic class template, which is typically lock-free for basic types like int and float; for larger types, the implementation may fall back to internal locks. The atomic class overloads certain common operators (such as += below), in addition to explicit load and store operations.

#include <atomic>

std::atomic<uint16_t> result;

// code to be executed inside a thread
uint16_t thread_fn(uint16_t input_0, uint16_t input_1, uint16_t input_2) {
  uint16_t partial_sum = input_0 + input_1 + input_2;

  result += partial_sum;
  return partial_sum;
}

Unlocking Mutexes When They Go Out of Scope

In complex control flow, it is easy to miss unlock() statements, so C++11 offers a lock_guard class, which acquires the mutex on construction and releases it whenever it goes out of scope, similar in spirit to Go’s defer statements.

uint16_t result;
std::mutex mutex;

// code to be executed inside a thread
uint16_t thread_fn(uint16_t input_0, uint16_t input_1, uint16_t input_2) {
  uint16_t partial_sum = input_0 + input_1 + input_2;

  // This holds the lock until the end of the current scope
  std::lock_guard<std::mutex> lock(mutex);

  result += partial_sum;
  return partial_sum;
}
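The benefit shows up once the critical section has multiple exit paths: with manual lock()/unlock(), every return needs its own unlock, while the lock_guard releases the mutex on all of them. A sketch, where the validation logic is made up for illustration:

```cpp
#include <cstdint>
#include <mutex>

uint16_t result;
std::mutex mutex;

// hypothetical: add a partial sum to the result, rejecting values above a
// threshold
bool try_accumulate(uint16_t partial_sum) {
  std::lock_guard<std::mutex> lock(mutex);

  if (partial_sum > 1000) {
    return false;  // early return: the lock_guard still unlocks the mutex
  }

  result += partial_sum;
  return true;  // normal return: unlocked here too
}
```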

Blocking Execution While Waiting

C++11 offers condition variables, which, although useful, have complicated semantics. A condition variable is useful when you want to block program execution until a certain condition (such as “the buffer is not empty”) is reached. Since execution blocks until the thread is awoken, CPU usage does not spike, as it would with a busy-wait loop.

Condition variables accept a mutex (actually, a std::unique_lock) and an optional predicate. As I understand, the predicate-taking overload of the condition variable’s wait() function works as follows:

  1. The caller must already hold the lock when invoking wait().
  2. wait() evaluates the predicate.
  3. If the predicate is false, wait() atomically releases the lock and puts the thread into a sleep state.
  4. When the thread is awoken from its sleep state, it reacquires the lock and goes back to step (2).
  5. If the predicate is true, wait() returns with the lock held; the lock is released when the unique_lock goes out of scope.

Calling notify_one() in a separate thread wakes up one of the threads blocked on the wait() execution. Calling notify_all() wakes up all blocked threads.

Despite the (supposed) simplicity, the use of condition variables becomes complicated because of spurious wakeups (a thread can wake without being notified) and lost wakeups (a notify_one() that fires before any thread is waiting is simply dropped).

Semaphores as an Alternative

I find semaphores much more intuitive, and after a couple of hours of fiddling, I think I have a bug-free (counting / binary) semaphore built using condition variables, mutexes, and atomic types. The comments and example usage should clarify the rationale behind specific parts of the implementation.

Both binary and counting semaphores (std::binary_semaphore and std::counting_semaphore) are offered in C++20, but since they don’t exist in C++11, here is my attempt at implementing them using C++11.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <iostream>
#include <mutex>
#include <vector>

class semaphore_t {
  private:
    // A constant to store the maximum value of the semaphore:
    // 1 => binary semaphore
    // 0 => counting semaphore, limit = max(uint64_t)
    // x => counting semaphore, limit = x
    const uint64_t limit;

    // An atomic variable to count the value of the semaphore
    std::atomic<uint64_t> count;

    // A mutex to guard predicate checks and updates to the count, and two
    // condition variables to block the wait() and post() methods
    std::mutex mutex;
    std::condition_variable not_empty_cond;
    std::condition_variable not_full_cond;

    // IMP: To use less memory, we could use just one condition variable to
    // block both post() and wait() invocations, but the implementation will
    // be inefficient, since a post() invocation could unblock another post()
    // invocation, which would be useless if the semaphore has already reached
    // its limit.  Both condition variables must share the one mutex, though:
    // if a notifier does not synchronize with the waiter's predicate check,
    // the notification can fire between the predicate evaluation and the
    // sleep, and the wakeup is lost.

  public:
    explicit semaphore_t(uint64_t limit_) : limit(limit_) {
      // Make sure that the semaphore starts out closed.
      count = 0;
    }

    // Use zero to indicate no limit on the value of the semaphore.
    semaphore_t() : semaphore_t(0) {}

    void wait() {
      // Create a unique_lock to be used with the condition variable.
      std::unique_lock<std::mutex> lock(mutex);

      // Wait until the count is greater than zero.  The predicate form of
      // wait() checks the predicate before sleeping, so a notify_one() that
      // executed before this call cannot cause a deadlock.  On a spurious
      // wakeup, the thread reacquires the lock, re-evaluates the predicate,
      // and goes back to sleep if the count is still zero.
      not_empty_cond.wait(lock, [this]() { return count > 0; });

      // Decrement the semaphore count.  Safe because the lock is held, so
      // only one thread modifies the count at a time.
      count -= 1;

      // Release the lock before notifying, so the woken thread does not
      // immediately block on the mutex we still hold.
      lock.unlock();
      not_full_cond.notify_one();
    }

    void post() {
      // Create a unique_lock to be used with the condition variable.
      std::unique_lock<std::mutex> lock(mutex);

      // Wait until the count is less than the limit.  A limit of zero means
      // the semaphore is unbounded, so skip the wait entirely; the limit is
      // immutable after construction, so this check needs no extra locking.
      if (limit > 0) {
        not_full_cond.wait(lock, [this]() { return count < limit; });
      }

      // Increment the count while the lock is held, so a concurrent wait()
      // cannot miss the update, then release the lock and wake one waiter.
      count += 1;
      lock.unlock();
      not_empty_cond.notify_one();
    }
};

// Create a binary semaphore
semaphore_t sem(1);

// Typedef to simplify creation of list of producers and consumers
typedef std::future<uint64_t> future_t;

// Counts of items produced and consumed
std::atomic<uint64_t> p_count, c_count;

// consumer function: wait for item, then consume it, and return a dummy value
uint64_t consumer() {
  sem.wait();
  c_count += 1;
  return c_count;
}

// producer function: produce item, signal consumer, and return a dummy value
uint64_t producer() {
  p_count += 1;
  sem.post();
  return p_count;
}

int main() {
  {
    // vector of 600 consumers
    std::vector<future_t> consumers;
    for (uint32_t idx = 0; idx < 600; idx += 1) {
      consumers.emplace_back(std::async(std::launch::async, consumer));
    }

    // vector of 600 producers
    std::vector<future_t> producers;
    for (uint32_t idx = 0; idx < 600; idx += 1) {
      producers.emplace_back(std::async(std::launch::async, producer));
    }

    // Since producers and consumers are about to go out of scope, the
    // futures' destructors block until the associated tasks finish (a
    // special property of futures returned by std::async), so execution
    // blocks here until all producers and consumers have terminated.
    std::cout << "processing started" << std::endl;
  }

  std::cout << "processed: " << p_count << ", consumed: " << c_count <<
    "." << std::endl;

  return 0;
}

There are, of course, a number of other useful concurrency constructs in C++11, but I imagine that the ones described above are sufficient to start writing concurrent programs.
