Skip to content

Chapter 15: Concurrency

15.1

The standard library directly supports concurrent execution of multiple threads in a single address space. The atomic operations allow lock-free programming. The main standard-library concurrency support facilities: thread, mutex, lock() operations, packaged_task and future. These features are built directly upon what operating systems offer and do no incur performance penalities compared with those.

15.2

We call a computation that can potentially be executed concurrently with other computations a task. A thread is the system-level representation of a task in a program. A task to be executed concurrently with other tasks is launched by constructing a std::thread (in <thread>) with the task as its arguments. A task is a function or a function object:

void f();
struct F{ operator()();};

void run_tasks(){
    thread t1{f};
    thread t2{F()};
    t1.join();
    t2.join();
}

The join()s ensure that we don't exit run_tasks() until the threads have completed. To "join" a thread means to "wait for the thread to terminate".

One big challenge in concurrent program design is to make sure tasks are synchronized when they use shared resources or objects, such as std::cout, with some form of synnchronization.

When defining tasks of a concurrent program, our aim is to keep tasks completely separate except where they communicate in simple and obvious way. The simplest way of thinking of a concurrent task is as a function that happens to run concurrently with its caller.

15.3

how to pass argument to tasks?

void f(vector<double>& v);
struct F{
    vector<double>& v;
    F(vector<double>& vv): v{vv} {};
    void operator()();
};

void run_tasks(){
    thread t1{f, std::ref(some_vec)};
    thread t2{F{vec2}};
    t1.join();
    t2.join();
}

This initialization with {f, std::ref{some_vec}} uses a thread variadic template constructor that can accept an arbitrary sequence of arguments. The std::ref() is a type function from <functional> that unfortunately is needed to tell the variadic template to treat some_vec as a reference, rather than as an object. Without that std::ref(), some_vec would be passed by value. The compiler checks that the first argument can be invoked given the following arguments and builds the necessary function object to pass to the thread.

15.4

after we know how to pass arugments, how to get returns from tasks?

one way of course is to pass non-const reference, but that is sneaky. A less obscure technique is to pass the input data by const reference and to pass the location of a place to deposit the thresult as a separate argument (i.e. output parameter).

void run_task(){
    double res1, res2, res3;
    thread t1{f, std::cref(vect1), &res1};
    thread t2{F{vec2, &res2}};
    thread t3{[&]() { res3 = g(vec3); }}; 
}
However, all of these are not elegant. One possible alternative is to use future

15.5

Sometimes tasks need to share data. In that case, the access has to be synchronized so that at most one task at a time has access. The fundamental element of the solution is a mutex, a "mutual exclusion object." A thread acquires a mutex using a lock() operation.

The type of lck can be deduced to be scoped_lock<mutex>. The scoped_lock's constructor acquires the mutex (through a call m.lock()). If another thread has already acquired the mutex, the thread waits ("blocks") until the other thread completes its access. Once a thread has completed its access to the shared data, the scoped_lock releases the mutex (with a call m.unlock()). When a mutex is released, threads waiting for it resume executing. All these types can be found in <mutex>

The correspondence between the shared data and a mutex is conventional: the programmer simply has to know which mutex is supposed to correspond to which data. Obviously, this is error-prone, and equally obviously we try to make the correspondence clear through various language means, such as setting mutex as a class member.

It is not uncommon to need to simultaneously access several resources to perform some action. This can lead to deadlock. For example., if thread1 acquires mutex1and then tries to acquire mutex2 while thread2 acquires mutex2 and then tries to acquire mutex1, then neither task will ever proceed further. The scoped_lock helps by enabling us to acquire several locks simultaneously

void f(){
    scoped_lock lck{mutex1, mutex2, mutex3};  // acquire all three locks.
    //...
}
This scoped_lock will proceed only after acquiring all its mutexes arguments and will never block while holding a mutex.

Communicating through shared data is pretty low level. Use of shared data is inferior to the notion of call and return. Some people are convinced that sharing must be more efficient than copying arguments and returns. That can indeed be so when large amounts of data are involved, but locking and unlocking are relatively expensive operations. On the other hand, modern machines are very good at copying data, especially compact data, such as vector elements. So don't choose shared data for communication because of "efficiency" without thought and preferably not without measurements.

One of the most common ways of sharing data is among many readers and a single writer. This "reader-writer lock" idiom is supported by shared_mutex. A reader will acquire the mutex "shared" so that other readers can still gain access, whereas a writer will demand exclusive access.

shared_mutex mx;
void reader(){
    shared_lock lck{mx};
    //... read ...
}
void writer(){
    unique_lock lck{mx};
    //... write ...
}

15.6

Sometimes, a thread needs to wait for some kind of external event, such as another thread completing a task or a certain amount of time having passed. One common case is waiting on time.

using namespace std::chrono;
auto t0=high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1=high_resolution_clock::now();
cout << duration_cast<nanoseconds>(t1 - t0).count() << " nanoseconds passed.";

here, this_thread refers to the one and only thread, which could be the main.

The basic support for communicating using external events is provided by condition_variables found in <condition_variable>. A condition_variable is a mechanism allowing one thread to wait for another. In particular, it allows a thread to wait for some conditoin (often called an event) to occur as the result of work done by other threads.

Using condition_variables supports many forms of elegant and efficient sharing but can be rather tricky. Here is an example of queue

class Message {
    // ...
};

queue<Message> mqueue;
condition_variable mcond;
mutex mmutex;

void consumer(){
    while(true){
        unique_lock lck {mmutex};                           // acquire mmutex
        mcond.wait(lck, []{ return !mqueue.empty(); });     // release lck and wait;
                                                            // re-acquire lck upon wakeup, don't wake up unless mqueue is non-empty
        auto m = mqueue.front();
        mqueue.pop();
        lck.unlock();   // release lck
    }
}

void produce(){
    while (true){
        Message m;
        //... fill the message
        scoped_lock lck{mmutex};
        mqueue.push(m);
        mcond.notify_one();
    }
}

Waiting on condition_variable releases its lock argument until the wait is oover (so that the queue is non-empty) and then reacqurie it. THe explicit check of the condition, here !mqueue.empty(), protects against waking up just to find that some other task has "gotten there first so that the condition no longer holds. Also, unique_lock is used rather than scoped_lock because

  • we need to pass the lock to the condition_variable's wait(). A scoped_lock cannot be copied, but a unique_lock can.
  • we want to unlock the mutex protecting the condition variable before processing the message. A unique_lock() offers operations, such as lock() and unlock(), for low-level control of synchronization.
  • con: unique_lock can only hanlde a single mutex.

15.7

To operate at the conceptual level of task, instead of the low-level details of threads/locks, the standard library offers

  • future and promise for returning a value from a task spawned on a separate thread
  • packaged_task to help launch tasks and connect up the mechanisms for returning a result
  • async() for launching a task in a manner ver similar to calling a function.

All of these are found in <future>

15.7.1

The important point about future and promise is that they enable a transfer of a value between two tasks without explicit use of a lock; "the system" implements the transfer efficiently. The basic idea is simple: when a task wants to pass a value to another, it puts the value into a promise. Somehow, the implementation makes the value appear in the corresponding future, from which it can be read.

If we have a future<X> called fx, we can get() a value of type X from it:

X v = fx.get();

If the value isn't there yet, our thread is blocked until it arrives. If the value couldn't be computed, get() might throw an exception.

The main purpose of a promise is to provide simple "put" operations (called set_value() and set_exception()) to match future's get(): this pair can either pass a value or pass an exception.

void f(promise<X>& px){
    //...
    try {
        X res;
        // compute the results
        px.set_value(res);
    }
    catch(...){
        px.set_exception(current_exception());
    }
}

The current_exception refers to the caught exception.

void g(future<X>& fx){
    //...
    try{
        X v = fx.get();
    }
    catch (...){
        // handle error
    }
}

15.7.2

The packaged_task type is provided to simplify setting up tasks connected with futures and promises to be run on threads. A packged_task provides wrapper code to put the return value or exception from the task into a promise. If you ask it by calling get_future, a packaged_task will give you the future corresponding to its promise.

double accum(double* beg, double* end, double init){
    return std::accumulate(begin, end, init);
}
double comp2(vector<double>& v){
    using Task_type = double(double*, double*, double);
    packaged_task<Task_type> pt0 {accum};
    packaged_task<Task_type> pt1 {accum};
    future<double> f0{pt0.get_future()};
    future<double> f1{pt1.get_future()};

    double* first = &v[0];
    thread t1{move(pt0), first, first+V/size()/2, 0.0};
    thread t2{move{pt1}, first+v.size()/2, first+v.size(), 0.0};

    return f0.get() + f1.get();
}

The packaged_task template takes the type of the task as its tempalte argument, and the task as its constructor argument. The move() opeartions are needed because a packaged_task cannot be copied. The reason that a packaged_task cannot be copied is that it is a resource handle: it owns its promise and is (indirectly) responsible for whatever resources its task may own.

15.7.3

Treat a task as a function that may happen to run concurrently with other tasks. It is far from the only model supported by the C++ standard library, but it serves well for a wide range of needs.

To launch tasks to potentially run asynchronously, we can use async()

double comp4(vector<double>& v){
    if (v.size() < 1000) {
        return accum(v.begin(), v.end(), 0.0);
    }

    auto v0 = &v[0];
    auto sz = v.size();

    auto f0 = async(accum, v0, v0+sz/4, 0.0);
    auto f1 = async(accum, v0+sz/4, v0+sz/2, 0.0);
    auto f2 = async(accum, v0+sz/2, v0+sz*3/4, 0.0);
    auto f3 = async(accum, v0+sz*3/4, v0+sz, 0.0);

    return f0.get() + f1.get() + f2.get() + f3.get();
}

Basically async() separates the "call part" of a function call from the "get the result part and separates both from the actual execution of the task. Using async() you don't have to think about threads and locks. Instead, you think just in terms of tasks that potentially compute their results asynchronouslhy. There is an obvious limitation: don't even think of using async() for tasks that share resources needing locking. With async() you don't even know how many threads will be used because that's up to async() to decide based on what it knows about the system resources available at the time of a call. For example, async() may check whether any idel cores are available before deciding how many threads to use.

Using a guess about the cost of computation relative to the cost of launching a thread, such as v.size()<1000, is very primitive and pront to gross mistakes about performance.

18.5.4 Stopping a thread from the 3rd edition

Instead, the standard library provides a mechanism for politely requesting a thread to clean up and go away: a stop_token. A thread can be programmed to terminate if it has a stop_token and is requested to stop.

atmoic<int> result = -1;
template<class T>
struct Range{
    T* first;
    T* last;
};

void find(stop_token tok, const string* base, const Range<string> r, const string target)
{
    for (string* p = r.first; p!=r.last && !tok.stop_requested(); ++p)
    if (match(*p, target)){
        result = p - base;
        return;
    }
}

18.6 Coroutine from the 3rd edition

generator<long long> fib()
{
    long long a = 0;
    long long b = 1;
    while (a<b){
        auto next = a + b;
        co_yield next;
        a = b;
        b = next;
    }
    co_return 0;
}

void user(int max)
{
    for (int i =0; i++<max;)
        cout << fib()  << ' ';
}

18.6.1 Cooperative Multitasking

The keys to such designs are

  • Many different coroutines that maintain their state between calls
  • A form of polymorphism that allows us to keep lists of events containing different kinds of coroutines and invoke them independently of their types.
  • A scheduler that selects the next coroutine(s) to run from the list(s).

A thread requires a megabyte or two (mostly for its stack), a coroutine often only a couple of dozen byes.