C++20: Powerful Coroutines with cppcoro


In my last post, "C++20: Coroutines with cppcoro", I gave a basic introduction to the coroutines library from Lewis Baker. This introduction covered the elementary coroutine types task and generator. Today, I add threads to tasks and get powerful abstractions.


Do you remember the previous post "C++20: Thread Synchronization with Coroutines"? If not: I presented the challenges of condition variables. A condition variable is a classical thread synchronization mechanism, used for example in a sender/receiver or a producer/consumer workflow. Condition variables have a big design flaw: the waiting thread may wake up without a notification (spurious wakeup), or the notification may be sent before the thread starts waiting and therefore be missed (lost wakeup). A spurious wakeup lets the receiver continue before the data is ready, and a lost wakeup can leave it waiting forever. My subsequent example on thread synchronization based on coroutines didn't have the inherent risks of condition variables, such as spurious or lost wakeups, but it had another issue: it was too complicated.
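To make the two wakeup problems concrete, here is a minimal sketch that uses only the standard library. The function name runSenderReceiver and the flag dataReady are hypothetical and chosen for illustration. The sketch shows the usual remedy: the condition variable is paired with a predicate that is checked under the mutex.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper for illustration; not part of the original example.
// Returns true if the receiver saw the sender's notification.
bool runSenderReceiver() {
    std::mutex mut;
    std::condition_variable condVar;
    bool dataReady = false;   // the predicate, protected by mut
    bool received = false;

    std::thread receiver([&] {
        std::unique_lock<std::mutex> lck(mut);
        // wait() checks the predicate before blocking (guards against a
        // lost wakeup) and again after every wakeup (guards against a
        // spurious wakeup).
        condVar.wait(lck, [&] { return dataReady; });
        received = dataReady;
    });

    std::thread sender([&] {
        {
            std::lock_guard<std::mutex> lck(mut);
            dataReady = true;   // modify the predicate under the mutex
        }
        condVar.notify_one();
    });

    sender.join();
    receiver.join();
    return received;
}
```

Without the predicate, the outcome depends on which thread runs first. With it, the receiver returns as soon as dataReady is true, regardless of the scheduling order.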

Thanks to cppcoro, we can have the best of both worlds: a straightforward event mechanism that does not have the design flaws of condition variables.

single_consumer_event

 

single_consumer_event is, according to the documentation, a simple manual-reset event type that supports only a single coroutine awaiting it at a time. This is precisely what I need:

 

// cppcoroProducerConsumer.cpp

#include <cppcoro/single_consumer_event.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>

cppcoro::single_consumer_event event;  

cppcoro::task<> consumer() {
    
    auto start = std::chrono::high_resolution_clock::now();
    
    co_await event;  // suspended until some thread calls event.set()
    
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Consumer waited " << elapsed.count() << " seconds." << std::endl;
  
    co_return;
}

void producer() {

    using namespace std::chrono_literals;
    std::this_thread::sleep_for(2s);
    
    event.set();  // resumes the consumer  
    
}

int main() {
    
    std::cout << std::endl;
    
    auto con = std::async([]{ cppcoro::sync_wait(consumer()); });  // (1)
    auto prod = std::async(producer);                              // (2)
    
    con.get(), prod.get();
    
    std::cout << std::endl;
    
}

 

The code should be self-explanatory. The consumer (line 1) and the producer (line 2) each run in their own thread. The call cppcoro::sync_wait(consumer()) (line 1) serves as a top-level task because the main function cannot be a coroutine. The call waits until the coroutine consumer is done. The coroutine consumer waits in the expression co_await event until someone calls event.set(). The function producer sets this event after a sleep of two seconds.


Thanks to the cppcoro library, work running in another thread can also be canceled cooperatively.

 


Cancellation

The caller and the callee communicate via a cppcoro::cancellation_token. The callee, which receives the cancellation request, can respond in two ways.

  1. Poll at regular intervals for the request to cancel. The cppcoro::cancellation_token supports two member functions: is_cancellation_requested() and throw_if_cancellation_requested().
  2. Register a callback that is executed in case of a cancellation request.

The following example demonstrates the first approach.

 

// cppcoroCancellation.cpp

#include <chrono>
#include <iostream>
#include <future>
#include <thread>

#include <cppcoro/cancellation_token.hpp>
#include <cppcoro/cancellation_source.hpp>

using namespace std::chrono_literals; 

int main() {
    
    std::cout << std::endl;
    
    cppcoro::cancellation_source canSource;
    cppcoro::cancellation_token canToken = canSource.token();  // (1)

    auto cancelSender = std::async([&canSource] { 
        std::this_thread::sleep_for(2s);
        canSource.request_cancellation();                      // (3)
        std::cout << "canSource.request_cancellation() " << std::endl;
    });
        
    auto cancelReceiver = std::async([&canToken] { 
        while(true) {
            std::cout << "Wait for cancellation request" << std::endl;
            std::this_thread::sleep_for(200ms);
            if (canToken.is_cancellation_requested()) return;  // (2)
        }
    });

    cancelSender.get(), cancelReceiver.get();
    
    std::cout << std::endl;

}

 

Line (1) shows the cancellation_token created from the cancellation_source. The caller cancelSender captures the cancellation source canSource, and the callee cancelReceiver captures the cancellation token canToken. The callee polls every 200 milliseconds for the cancellation request (line 2), which the caller sends via the call canSource.request_cancellation() (line 3) after two seconds.


I want to make two interesting observations.

  1. The cancellation is cooperative. If the callee ignores that cancellation request, nothing happens. 
  2. With C++20, we get an improved std::thread: std::jthread. std::jthread joins automatically in its destructor and can be interrupted cooperatively via a stop token (std::stop_token). Read more details about the improved std::thread in my previous post: "A new Thread with C++20: std::jthread".

cppcoro even supports a mutex. 

async_mutex

 

A mutex such as cppcoro::async_mutex is a synchronization mechanism to protect shared data from being accessed by multiple threads simultaneously.

 

// cppcoroMutex.cpp

#include <cppcoro/async_mutex.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <iostream>
#include <thread>
#include <vector>


cppcoro::async_mutex mutex;

int sum{};                                                                  // (2)

cppcoro::task<> addToSum(int num) {
    cppcoro::async_mutex_lock lockSum = co_await mutex.scoped_lock_async(); // (3)  
    sum += num;
  
}                                                                           // (4)

int main() {
    
    std::cout << std::endl;
    
    std::vector<std::thread> vec(10);                                       // (1)
    
    for(auto& thr: vec) {
        thr = std::thread([]{ for(int n = 0; n < 10; ++n) cppcoro::sync_wait(addToSum(n)); } );
    }
    
    for(auto& thr: vec) thr.join();
    
    std::cout << "sum: " << sum << std::endl;
    
    std::cout << std::endl;
    
}

 

Line (1) creates ten threads. Each thread adds the numbers 0 to 9 to the shared sum (line 2), so the expected result is 10 * 45 = 450. The function addToSum is the coroutine. The coroutine waits in the expression co_await mutex.scoped_lock_async() (line 3) until the mutex is acquired. The coroutine that waits for the mutex is not blocked but suspended. The previous lock holder resumes the waiting coroutine in its unlock call. As the name scoped_lock_async suggests, the mutex stays locked until the end of the scope (line 4).
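For contrast, here is the same summation as a sketch with a plain std::mutex. The function name sumWithStdMutex is hypothetical. The difference from cppcoro::async_mutex is that a thread that cannot acquire the lock blocks here, whereas the coroutine above is suspended.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical comparison function; uses only the standard library.
int sumWithStdMutex() {
    std::mutex mut;
    int sum{};

    std::vector<std::thread> vec;
    for (int t = 0; t < 10; ++t) {
        vec.emplace_back([&] {
            for (int n = 0; n < 10; ++n) {
                // Blocks the calling thread until the mutex is acquired;
                // the lock is released at the end of each iteration.
                std::lock_guard<std::mutex> lock(mut);
                sum += n;
            }
        });
    }

    for (auto& thr : vec) thr.join();

    return sum;   // 10 threads * (0 + 1 + ... + 9) = 450
}
```

Both variants protect the shared sum. Which one fits better depends on whether the waiting code is an ordinary function or a coroutine that should give up its thread while it waits.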

 


What's next? 

Thanks to the function cppcoro::when_all, you can wait on one or more coroutines. In my next post, I use cppcoro::when_all together with cppcoro::static_thread_pool to compose powerful workflows.

 


 

 
