C++20: Thread Synchronization with Coroutines

It’s a typical requirement for thread management to synchronize them. One thread prepares, in this case, a work-package another thread is waiting for.

I assume most of you use condition variables for this sender/receiver or producer/consumer workflow. Condition variables have many inherent risks, such as spurious wakeup and lost wakeup. Before I implement thread synchronization with coroutines, let me rephrase from a previous post about the inherent challenges of condition variables.

The Challenges of Condition Variables

Here is the pattern for the correct usage of condition variables.

// conditionVariables.cpp

#include <condition_variable>
#include <iostream>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar; 

bool dataReady{false};

void waitingForWork(){
    std::cout << "Waiting " << std::endl;
    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck, []{ return dataReady; });   // (4)
    std::cout << "Running " << std::endl;
}

void setDataReady(){
    {
        std::lock_guard<std::mutex> lck(mutex_);
        dataReady = true;
    }
    std::cout << "Data prepared" << std::endl;
    condVar.notify_one();                        // (3)
}

int main(){
    
  std::cout << std::endl;

  std::thread t1(waitingForWork);               // (1)
  std::thread t2(setDataReady);                 // (2)

  t1.join();
  t2.join();
  
  std::cout << std::endl;
  
}

How does the synchronization work? The program has two child threads: t1 and t2. They get their work package waitingForWork and setDataRead in lines (1 and 2). setDataReady notifies – using the condition variable condVar – that it is done with preparing the work: condVar.notify_one()(line 3). While holding the lock, thread t1 waits for its notification: condVar.wait(lck, []{ return dataReady; })( line 4). The sender and receiver need a lock. In the case of the sender a std::lock_guard is sufficient because it calls lock and unlock only once. In the case of the receiver, a std::unique_lock is necessary because it usually frequently locks and unlocks its mutex.

Finally, the output of the program.

 

Rainer D 6 P2 500x500Modernes C++ Mentoring

Be part of my mentoring programs:

  • "Fundamentals for C++ Professionals" (open)
  • "Design Patterns and Architectural Patterns with C++" (open)
  • "C++20: Get the Details" (open)
  • "Concurrency with Modern C++" (starts March 2024)
  • Do you want to stay informed: Subscribe.

     

    conditionVariable

    Maybe you are wondering why you need a predicate for the wait call because you can invoke wait without a predicate. This workflow seems quite too complicated for such a simple synchronization of threads.

    Now we are back to the missing memory condition variables and the two phenomena called lost wakeup and spurious wakeup.

    Lost Wakeup and Spurious Wakeup

    • Lost wakeup: The sender sends its notification before the receiver is in the wait state. The consequence is that the notification is lost.
    • Spurious wakeup: The receiver may wake up although no notification has happened.

    To become not the victim of these two issues, you have to use an additional predicate as memory. If not, you have, in this example, a 50/50 chance of a lost wakeup. A lost wakeup is essentially a deadlock because a thread waits for something that never happens.

    This is not the end of the traps you can fall into with condition variables. Read the details in the previous post: C++ Core Guidelines: Be Aware of the Traps of Condition Variables.

    Thanks to coroutines, thread synchronization is relatively easy without the inherent risks of condition variables such as spurious wakeups and lost wakeups.

    Thread Synchronization with co_await

    // senderReceiver.cpp
    
    #include <coroutine>
    #include <chrono>
    #include <iostream>
    #include <functional>
    #include <string>
    #include <stdexcept>
    #include <atomic>
    #include <thread>
    
    class Event {
     public:
    
        Event() = default;
    
        Event(const Event&) = delete;
        Event(Event&&) = delete;
        Event& operator=(const Event&) = delete;
        Event& operator=(Event&&) = delete;
    
        class Awaiter;
        Awaiter operator co_await() const noexcept;
    
        void notify() noexcept;
    
     private:
    
        friend class Awaiter;
      
        mutable std::atomic<void*> suspendedWaiter{nullptr};
        mutable std::atomic<bool> notified{false};
    
    };
    
    class Event::Awaiter {
     public:
      Awaiter(const Event& eve): event(eve) {}
    
      bool await_ready() const;
      bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
      void await_resume() noexcept {}
    
     private:
        friend class Event;
    
        const Event& event;
        std::coroutine_handle<> coroutineHandle;
    };
    
    bool Event::Awaiter::await_ready() const {                            // (7)
      
        // allow at most one waiter
        if (event.suspendedWaiter.load() != nullptr){
            throw std::runtime_error("More than one waiter is not valid");
        }
      
        // event.notified == false; suspends the coroutine
        // event.notified == true; the coroutine is executed such as a usual function
        return event.notified;
    }
                                                                         // (8)
    bool Event::Awaiter::await_suspend(std::coroutine_handle<> corHandle) noexcept {
    
        coroutineHandle = corHandle;                                    
      
        if (event.notified) return false;
      
        // store the waiter for later notification
        event.suspendedWaiter.store(this);
    
        return true;
    }
    
    void Event::notify() noexcept {                                        // (6)
        notified = true;
      
        // try to load the waiter
        auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());
     
        // check if a waiter is available
        if (waiter != nullptr) {
            // resume the coroutine => await_resume
            waiter->coroutineHandle.resume();
        }
    }
    
    Event::Awaiter Event::operator co_await() const noexcept {
        return Awaiter{ *this };
    }
    
    struct Task {
        struct promise_type {
            Task get_return_object() { return {}; }
            std::suspend_never initial_suspend() { return {}; }
            std::suspend_never final_suspend() { return {}; }
            void return_void() {}
            void unhandled_exception() {}
        };
    };
    
    Task receiver(Event& event) {                                        // (3)
        auto start = std::chrono::high_resolution_clock::now();
        co_await event;                                                 
        std::cout << "Got the notification! " << std::endl;
        auto end = std::chrono::high_resolution_clock::now();
        std::chrono::duration<double> elapsed = end - start;
        std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
    }
    
    using namespace std::chrono_literals;
    
    int main(){
        
        std::cout << std::endl;
        
        std::cout << "Notification before waiting" << std::endl;
        Event event1{};
        auto senderThread1 = std::thread([&event1]{ event1.notify(); });  // (1)
        auto receiverThread1 = std::thread(receiver, std::ref(event1));   // (4)
        
        receiverThread1.join();
        senderThread1.join();
        
        std::cout << std::endl;
        
        std::cout << "Notification after 2 seconds waiting" << std::endl;
        Event event2{};
        auto receiverThread2 = std::thread(receiver, std::ref(event2));  // (5)
        auto senderThread2 = std::thread([&event2]{
          std::this_thread::sleep_for(2s);
          event2.notify();                                               // (2)
        });
        
        receiverThread2.join();
        senderThread2.join();
         
        std::cout << std::endl;
        
    }
    

    Thread synchronization with coroutines is straightforward from the user’s perspective. Let’s have a look at the program senderReiver.cpp. The threads senderThread1 (line 1) and senderThread2 (line 2) use an event to send its notification. The function receiver in line (3) is the coroutine which is executed in the thread receiverThread1 (line 4) and receiverThread2 (line 5). I measured the time between the coroutine’s beginning and end and displayed it. This number shows how long the coroutine waits. The following screenshot displays the output of the program with the Wandbox. The Compiler Explorer does not support thread creation, but Matt is “on it”.

    senderReceiver

    The output displays that the execution of the second coroutine takes about two seconds. The reason is that event1 sends its notification (line 1) before the coroutine is suspended, but event2 sends its notification after a time duration of 2 seconds (line 2).

    Now, I put the implementer’s hat on and give you a rough idea of the workflow of the coroutine framework.

    The simplified workflow

    There is a subtle difference if you compare the class Generator in the last post (C++20: An Infinite Data Stream with Coroutines) with the class Event in this example. In the first case, the Generator is the awaitable and the awaiter, in the second case, the Event uses the operator co_await to return the awaiter. This separation of concerns into the awaitable and the awaiter improves the structure of the code.

    In my explanation of both workflows, I assume in the first case, the event notification happens before the coroutine awaits the events. For the second case, I assume it is the other way around.

    Let’s first look at event1 and the first workflow. event1 send its notification before receiverThread1 is started. The call event1 (line 1) triggers the member function notify (line 6). First, the notification flag is set, and then, the call auto* waiter = static_cast<awaiter*>(suspendedWaiter.load()); loads the potential waiter. In this case, the waiter is a nullptr because it was not set before. This means the following resume call on the waiter is not executed. The subsequentially performed function await_ready (line 7) checks first if there is more than one waiter. In this case, I throw, for simplicity reasons, a std::runtime exception. The crucial part of this member function is the return value. event.notification was already set to true in the notify method. true means that the coroutine is not suspended and executes such as a usual function.

    In the second workflow, the co_await event2 call happens before event2 sends its notification. co_wait event2 triggers the call await_ready (line 7). The big difference to the first workflow is now that event.notified is false. This false value causes the suspension of the coroutine. Technically the member function await_suspend (lines 8) is executed. await_suspend gets the coroutine handle corHandle and stores it for later invocation in the variable coroutineHandle. Of course, later invocation means resumption. Secondly, the waiter is stored in the variable suspendedWaiter. When later event2.notify triggers its notification, the method notify (line 6) is executed. The difference to the first workflow is that the condition waiter != nullptr evaluates to true. The consequence is that the waiter uses the coroutineHandle to resume the coroutine.

    What’s next?

    If I have one conclusion to this and my last post (C++20: An Infinite Data Stream with Coroutines), then this one: Don’t implement your coroutines. Use existing coroutines such as the one available with cppcoro from Lewis Baker. I strictly follow this advice in my next post.

    Four Voucher for Educative

    There are four vouchers for educative to win: https://bit.ly/VoucherEducative. The vouchers allow you to access all educative.io courses for a quarter of a year.

    Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Kris Kafka, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Stephen Kelley, Kyle Dean, Tusar Palauri, Dmitry Farberov, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, Rob North, Bhavith C Achar, Marco Parri Empoli, moon, Philipp Lenk, Hobsbawm, and Charles-Jianye Chen.

    Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, Slavko Radman, and David Poole.

    My special thanks to Embarcadero
    My special thanks to PVS-Studio
    My special thanks to Tipi.build 
    My special thanks to Take Up Code
    My special thanks to SHAVEDYAKS

    Seminars

    I’m happy to give online seminars or face-to-face seminars worldwide. Please call me if you have any questions.

    Standard Seminars (English/German)

    Here is a compilation of my standard seminars. These seminars are only meant to give you a first orientation.

    • C++ – The Core Language
    • C++ – The Standard Library
    • C++ – Compact
    • C++11 and C++14
    • Concurrency with Modern C++
    • Design Pattern and Architectural Pattern with C++
    • Embedded Programming with Modern C++
    • Generic Programming (Templates) with C++
    • Clean Code with Modern C++
    • C++20

    Online Seminars (German)

    Contact Me

    Modernes C++ Mentoring,

     

     

    0 replies

    Leave a Reply

    Want to join the discussion?
    Feel free to contribute!

    Leave a Reply

    Your email address will not be published. Required fields are marked *