C++20: Thread Synchronization with Coroutines


Synchronizing threads is a typical requirement of thread management. In the typical case, one thread prepares a work package for which another thread is waiting.

 


I assume most of you use condition variables for this sender/receiver or producer/consumer workflow. Condition variables have many inherent risks, such as spurious wakeups and lost wakeups. Before I implement thread synchronization with coroutines, let me recap the inherent challenges of condition variables from a previous post.

The Challenges of Condition Variables 

Here is the pattern for the correct usage of condition variables.

 

// conditionVariables.cpp

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar; 

bool dataReady{false};

void waitingForWork(){
    std::cout << "Waiting " << std::endl;
    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck, []{ return dataReady; });   // (4)
    std::cout << "Running " << std::endl;
}

void setDataReady(){
    {
        std::lock_guard<std::mutex> lck(mutex_);
        dataReady = true;
    }
    std::cout << "Data prepared" << std::endl;
    condVar.notify_one();                        // (3)
}

int main(){
    
  std::cout << std::endl;

  std::thread t1(waitingForWork);               // (1)
  std::thread t2(setDataReady);                 // (2)

  t1.join();
  t2.join();
  
  std::cout << std::endl;
  
}

 

How does the synchronization work? The program has two child threads: t1 and t2. They get their work packages waitingForWork and setDataReady in lines (1) and (2). setDataReady notifies, using the condition variable condVar, that it is done with the preparation of the work: condVar.notify_one() (line 3). While holding the lock, thread t1 waits for its notification: condVar.wait(lck, []{ return dataReady; }) (line 4). Both the sender and the receiver need a lock. For the sender, a std::lock_guard is sufficient because it locks and unlocks the mutex only once. For the receiver, a std::unique_lock is necessary because wait unlocks the mutex while blocking and relocks it on every wakeup, possibly several times.

Finally, the output of the program.

[Screenshot: output of conditionVariables.cpp]

Maybe you are wondering: Why do you need a predicate for the wait call when you can invoke wait without one? This workflow seems far too complicated for such a simple synchronization of threads.

This brings us back to the fact that condition variables have no memory, and to the two phenomena called lost wakeup and spurious wakeup.

Lost Wakeup and Spurious Wakeup

  • Lost wakeup: The sender sends its notification before the receiver is in the wait state. The consequence is that the notification is lost.
  • Spurious wakeup: The receiver may wake up although no notification happened.

To avoid falling victim to these two issues, you have to use an additional predicate as memory. If you don't, this example has roughly a 50/50 chance of a lost wakeup, because it depends on whether t2 notifies before t1 starts waiting. A lost wakeup is essentially a deadlock because a thread waits for something that never happens.

This is not the end of the traps you can fall into with condition variables. Read the details in the previous post: C++ Core Guidelines: Be Aware of the Traps of Condition Variables.

Thanks to coroutines, thread synchronization becomes quite easy and avoids the inherent risks of condition variables, such as spurious wakeups and lost wakeups.

Thread Synchronization with co_await

 

// senderReceiver.cpp

#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

class Event {
 public:

    Event() = default;

    Event(const Event&) = delete;
    Event(Event&&) = delete;
    Event& operator=(const Event&) = delete;
    Event& operator=(Event&&) = delete;

    class Awaiter;
    Awaiter operator co_await() const noexcept;

    void notify() noexcept;

 private:

    friend class Awaiter;
  
    mutable std::atomic<void*> suspendedWaiter{nullptr};
    mutable std::atomic<bool> notified{false};

};

class Event::Awaiter {
 public:
  Awaiter(const Event& eve): event(eve) {}

  bool await_ready() const;
  bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
  void await_resume() noexcept {}

 private:
    friend class Event;

    const Event& event;
    std::coroutine_handle<> coroutineHandle;
};

bool Event::Awaiter::await_ready() const {                            // (7)
  
    // allow at most one waiter
    if (event.suspendedWaiter.load() != nullptr){
        throw std::runtime_error("More than one waiter is not valid");
    }
  
    // event.notified == false; suspends the coroutine
    // event.notified == true; the coroutine is executed such as a usual function
    return event.notified;
}
                                                                     // (8)
bool Event::Awaiter::await_suspend(std::coroutine_handle<> corHandle) noexcept {

    coroutineHandle = corHandle;                                    
  
    if (event.notified) return false;
  
    // store the waiter for later notification
    event.suspendedWaiter.store(this);

    return true;
}

void Event::notify() noexcept {                                        // (6)
    notified = true;
  
    // try to load the waiter
    auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());
 
    // check if a waiter is available
    if (waiter != nullptr) {
        // resume the coroutine => await_resume
        waiter->coroutineHandle.resume();
    }
}

Event::Awaiter Event::operator co_await() const noexcept {
    return Awaiter{ *this };
}

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }  // C++20 requires final_suspend to be noexcept
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task receiver(Event& event) {                                        // (3)
    auto start = std::chrono::high_resolution_clock::now();
    co_await event;                                                 
    std::cout << "Got the notification! " << std::endl;
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

using namespace std::chrono_literals;

int main(){
    
    std::cout << std::endl;
    
    std::cout << "Notification before waiting" << std::endl;
    Event event1{};
    auto senderThread1 = std::thread([&event1]{ event1.notify(); });  // (1)
    auto receiverThread1 = std::thread(receiver, std::ref(event1));   // (4)
    
    receiverThread1.join();
    senderThread1.join();
    
    std::cout << std::endl;
    
    std::cout << "Notification after 2 seconds waiting" << std::endl;
    Event event2{};
    auto receiverThread2 = std::thread(receiver, std::ref(event2));  // (5)
    auto senderThread2 = std::thread([&event2]{
      std::this_thread::sleep_for(2s);
      event2.notify();                                               // (2)
    });
    
    receiverThread2.join();
    senderThread2.join();
     
    std::cout << std::endl;
    
}

 

Thread synchronization with coroutines is straightforward from the user's perspective. Let’s have a look at the program senderReceiver.cpp. The threads senderThread1 (line 1) and senderThread2 (line 2) each use an event to send their notification. The function receiver in line (3) is the coroutine, which is executed in the threads receiverThread1 (line 4) and receiverThread2 (line 5). I measure the time between the beginning and the end of the coroutine and display it. This number shows how long the coroutine waits. The following screenshot displays the output of the program on Wandbox. The Compiler Explorer does not support thread creation, but Matt is "on it".

[Screenshot: output of senderReceiver.cpp on Wandbox]


The output shows that the second coroutine takes about two seconds to complete. The reason is that, in this run, event1 sent its notification (line 1) before the coroutine awaited it, whereas event2 sends its notification only after two seconds (line 2).

Now, I put the implementer's hat on and give you a rough idea of the workflow of the coroutine framework. 

The simplified workflow

If you compare the class Generator in the last post (C++20: An Infinite Data Stream with Coroutines) with the class Event in this example, there is a subtle difference. In the first case, the Generator is both the awaitable and the awaiter; in the second case, the Event uses the operator co_await to return the awaiter. This separation of concerns into the awaitable and the awaiter improves the structure of the code.

In my explanation of both workflows, I assume that in the first case the event notification happens before the coroutine awaits the event. For the second case, I assume the opposite.

Let’s first look at event1 and the first workflow. event1 sends its notification before receiverThread1 is started. The call event1.notify() (line 1) triggers the member function notify (line 6). First, the notification flag is set; then, the call auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load()); loads the potential waiter. In this case, the waiter is a nullptr because it was not set before. This means the subsequent resume call on the waiter is not executed. The function await_ready (line 7), which runs afterward, first checks whether there is more than one waiter. In that case, I throw a std::runtime_error for simplicity. The crucial part of this member function is the return value. event.notified was already set to true in the notify method. true means in this case that the coroutine is not suspended and executes like a usual function.

In the second workflow, the co_await event2 call happens before event2 sends its notification. co_await event2 triggers the call await_ready (line 7). The big difference to the first workflow is that event.notified is now false. This false value causes the suspension of the coroutine. Technically, the member function await_suspend (line 8) is executed. await_suspend gets the coroutine handle corHandle and stores it in the variable coroutineHandle for later invocation. Of course, later invocation means resumption. Secondly, the waiter is stored in the variable suspendedWaiter. When event2.notify later triggers its notification, the method notify (line 6) is executed. The difference to the first workflow is that the condition waiter != nullptr evaluates to true. The consequence is that the waiter uses the coroutineHandle to resume the coroutine.

What's next?

If I draw one conclusion from this and my last post (C++20: An Infinite Data Stream with Coroutines), it is this: Don't implement your own coroutine abstractions. Use existing coroutine libraries such as cppcoro from Lewis Baker. I follow exactly this advice in my next post.



Comments   

0 #1 Marius 2020-04-13 11:49
One page program just became 3+ pages, really?
Also, cond.notify_one should be protected with the corresponding mutex lock (otherwise the notification will be lost).
Moreover, there is no reason to risk with notify_one, always use notify_all.
0 #2 Arne Kreutzmann 2020-04-14 04:55
I really like your blog posts and concise examples.
I have one question, why are you using

mutable std::atomic suspendedWaiter{nullptr};

instead of

mutable std::atomic suspendedWaiter{nullptr};

?
0 #3 Vaughn Cato 2020-04-14 15:59
Thanks for this article. In the senderReceiver.cpp example, if the receiver suspends before the sender notifies the event, isn't it true that the second part of the receiver will be executed in the sender's thread?
0 #4 Jacek 2020-04-15 06:09
Great post, thanks.

I have 2 questions, or remarks, though.

1. When you start two threads, one by one, as you did in your example, you actually don't have the guarantee that they will be really started in this order. It is up to the operating system, isn't it?

2. Could you please also explain what happens on the thread level? When the thread which runs the coroutine (receiver) executes the co_await instruction, the coroutine is suspended, but what happens to the thread itself? When the notification comes, is the resume method executed in the same thread in which it is called (thus executing the remaining part of a coroutine in different thread than it originally started in), or is the execution brought back to the same thread where the coroutine started?
