C++20: Thread Pools with cppcoro


This post is the third and final post in my miniseries about cppcoro. cppcoro is a library of coroutine abstractions by Lewis Baker. Today, I introduce thread pools.


To get the most out of this post, you should have read my two previous posts about cppcoro.

In addition to the cppcoro::sync_wait function, which waits until the specified Awaitable completes, cppcoro offers the quite interesting cppcoro::when_all function.

when_all

  • when_all: creates an Awaitable that waits for all its input Awaitables and returns an aggregate of their individual results.

I simplified the definition of cppcoro::when_all. The following example should give you a first impression.

 

// cppcoroWhenAll.cpp

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>

using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "Third";
}


cppcoro::task<> runAll() {
                                                          // (2)
    auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());
    
    std::cout << fir << " " << sec << " " << thi << std::endl;
    
}

int main() {
    
    std::cout << std::endl;
    
    auto start = std::chrono::steady_clock::now();
    
    cppcoro::sync_wait(runAll());                          // (1)
    
    std::cout << std::endl;
    
    auto end = std::chrono::steady_clock::now();           // same clock as start
    std::chrono::duration<double> elapsed = end - start;   // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
    
    std::cout << std::endl;

}

The call cppcoro::sync_wait(runAll()) (line 1) blocks until the Awaitable runAll completes. runAll awaits the Awaitables getFirst, getSecond, and getThird (line 2). The Awaitables runAll, getFirst, getSecond, and getThird are coroutines. Each of the get functions sleeps for one second (line 3). Because all of them run on the same thread and std::this_thread::sleep_for blocks that thread, they execute one after another: three times one second makes three seconds. This is how long the call cppcoro::sync_wait(runAll()) waits for the coroutines. Line 4 displays the elapsed time.

[Screenshot: output of cppcoroWhenAll.cpp]
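The three-second result follows from blocking work on a single thread. The following standard-library sketch is not cppcoro code; it only imitates the situation with plain functions. The names work, runSequentially, and timedSequentialRun are hypothetical. runSequentially calls each callable in order on the calling thread and aggregates the results in a std::tuple, roughly what when_all amounts to when every task blocks the same thread. The sketch also measures start and end with the same clock, std::chrono::steady_clock.

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <thread>
#include <tuple>
#include <utility>

using namespace std::chrono_literals;

// Hypothetical blocking worker imitating getFirst(), getSecond(), getThird():
// sleep_for blocks the calling thread for at least `delay`.
std::string work(std::string name, std::chrono::milliseconds delay) {
    std::this_thread::sleep_for(delay);
    return name;
}

// Hypothetical helper (not a cppcoro function): call every callable in order
// on the calling thread and aggregate the results in a std::tuple.
// A braced initializer list guarantees left-to-right evaluation.
template <typename... Funcs>
auto runSequentially(Funcs... funcs) {
    return std::tuple{funcs()...};
}

// Returns the joined results and the elapsed time in milliseconds.
std::pair<std::string, long long> timedSequentialRun() {
    auto start = std::chrono::steady_clock::now();
    auto [fir, sec, thi] = runSequentially(
        [] { return work("First", 100ms); },
        [] { return work("Second", 100ms); },
        [] { return work("Third", 100ms); });
    auto end = std::chrono::steady_clock::now();   // same clock as `start`
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    return {fir + " " + sec + " " + thi, elapsed.count()};
}
```

Because the three 100 ms sleeps run back to back on one thread, the measured time is at least 300 ms, the same effect as the three seconds in cppcoroWhenAll.cpp.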

Now that you know the basics of cppcoro::when_all, let me add thread pools to the mix.

static_thread_pool

  • static_thread_pool: schedules work on a fixed-size pool of threads

cppcoro::static_thread_pool can be constructed with or without a number. The number specifies how many threads are created. If you don't specify a number, the C++11 function std::thread::hardware_concurrency() is used. std::thread::hardware_concurrency gives you a hint about the number of hardware threads supported by your system. This may be the number of processors or cores you have.
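The sizing rule can be sketched with the standard library alone. poolSize is a hypothetical helper, not part of cppcoro: an explicit count wins, otherwise it uses the hardware_concurrency hint. Since std::thread::hardware_concurrency() may return 0 when the value is not computable, the sketch assumes a fallback of 1; how cppcoro itself handles a 0 hint is not covered here.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: mirror the two ways of constructing a pool.
// requested == 0 means "no number given".
unsigned int poolSize(unsigned int requested = 0) {
    if (requested != 0) {
        return requested;                                // explicit thread count wins
    }
    unsigned int hint = std::thread::hardware_concurrency();  // only a hint, may be 0
    return hint != 0 ? hint : 1u;                        // assumed fallback of 1
}
```

With cppcoro itself, the two variants correspond to cppcoro::static_thread_pool tp; and cppcoro::static_thread_pool tp{4};.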

Let me try it out. The following example, based on the previous one, executes the coroutines getFirst, getSecond, and getThird concurrently.

 

// cppcoroWhenAllOnThreadPool.cpp

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>


using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
    std::this_thread::sleep_for(1s);
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
    std::this_thread::sleep_for(1s);
    co_return "Third";
}

template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
    co_await tp.schedule();
    auto res = co_await func();
    co_return res;
}

cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {
    
    auto[fir, sec, thi] = co_await cppcoro::when_all(    // (3)
        runOnThreadPool(tp, getFirst),
        runOnThreadPool(tp, getSecond), 
        runOnThreadPool(tp, getThird));
    
    std::cout << fir << " " << sec << " " << thi << std::endl;
    
}
    
int main() {
    
    std::cout << std::endl;
    
    auto start = std::chrono::steady_clock::now();

    cppcoro::static_thread_pool tp;                         // (1)
    cppcoro::sync_wait(runAll(tp));                         // (2)
    
    std::cout << std::endl;
    
    auto end = std::chrono::steady_clock::now();            // same clock as start
    std::chrono::duration<double> elapsed = end - start;    // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
    
    std::cout << std::endl;

}

Here are the crucial differences from the previous program cppcoroWhenAll.cpp. In line (1), I create a thread pool tp and pass it as an argument to the function runAll(tp) (line 2). runAll wraps each coroutine in runOnThreadPool, which first executes co_await tp.schedule() and therefore continues on a thread of the pool; this is why the three coroutines run concurrently. cppcoro::when_all aggregates their results, and structured binding (line 3) assigns each result to its own variable. In the end, the program takes about one second instead of three, provided the pool has at least three threads.

[Screenshot: output of cppcoroWhenAllOnThreadPool.cpp]
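To make the idea of a fixed-size pool concrete without cppcoro, here is a minimal sketch using only the standard library. FixedThreadPool and runThreeJobs are hypothetical names, and this is not how cppcoro::static_thread_pool is implemented: cppcoro resumes suspended coroutines via co_await tp.schedule(), whereas this pool accepts ordinary callables and hands back std::future objects. Waiting on all three futures plays the role of when_all.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, minimal fixed-size thread pool (not cppcoro's implementation).
class FixedThreadPool {
public:
    explicit FixedThreadPool(unsigned int threadCount) {
        for (unsigned int i = 0; i < threadCount; ++i) {
            workers_.emplace_back([this] { workerLoop(); });
        }
    }

    ~FixedThreadPool() {
        {
            std::lock_guard lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& worker : workers_) worker.join();  // workers drain the queue first
    }

    // Enqueue a callable and return a future for its result.
    template <typename Func>
    auto submit(Func func) -> std::future<decltype(func())> {
        using Result = decltype(func());
        auto task = std::make_shared<std::packaged_task<Result()>>(std::move(func));
        std::future<Result> result = task->get_future();
        {
            std::lock_guard lock(mutex_);
            jobs_.emplace([task] { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }

private:
    void workerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (stopping_ && jobs_.empty()) return;
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock so other workers can proceed
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> jobs_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stopping_ = false;
};

// Three 100 ms jobs on a pool with three threads can overlap, so the results
// typically arrive after roughly 100 ms instead of 300 ms (timings vary).
std::string runThreeJobs(FixedThreadPool& pool) {
    auto makeJob = [](std::string name) {
        return [name] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            return name;
        };
    };
    auto fir = pool.submit(makeJob("First"));
    auto sec = pool.submit(makeJob("Second"));
    auto thi = pool.submit(makeJob("Third"));
    return fir.get() + " " + sec.get() + " " + thi.get();  // wait for all results
}
```

Usage: FixedThreadPool pool(3); followed by runThreeJobs(pool) yields "First Second Third".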

You may know that C++20 gives us latches and barriers. Latches and barriers are thread synchronization mechanisms that enable some threads to block until a counter becomes zero. cppcoro also supports latches and barriers.

async_latch

  • async_latch: allows coroutines to asynchronously wait until a counter becomes zero

The following program cppcoroLatch.cpp shows thread synchronization with a cppcoro::async_latch.

 

// cppcoroLatch.cpp

#include <chrono>
#include <iostream>
#include <future>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>

using namespace std::chrono_literals; 

cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
    std::cout << "Before co_await" << std::endl;
    co_await latch;                              // (3)
    std::cout << "After co_await" << std::endl;
}

int main() {
    
    std::cout << std::endl;

    cppcoro::async_latch latch(3);              // (1)

    auto waiter = std::async(std::launch::async, [&latch]{          // (2)
        cppcoro::sync_wait(waitFor(latch));
    });

    auto counter1 = std::async(std::launch::async, [&latch] {       // (2)
        std::this_thread::sleep_for(2s);
        std::cout << "counter1: latch.count_down() " << std::endl;
        latch.count_down();                      // counter decreases by 1
    });

    auto counter2 = std::async(std::launch::async, [&latch] {       // (2)
        std::this_thread::sleep_for(1s);
        std::cout << "counter2: latch.count_down(2) " << std::endl;
        latch.count_down(2);                     // counter decreases by 2
    });

    waiter.get(), counter1.get(), counter2.get();

    std::cout << std::endl;

}

 

I create the cppcoro::async_latch in line (1) and initialize its counter to 3. This time, I use std::async (line 2) to run the waiting coroutine and the two counting functions concurrently. Each lambda captures the latch by reference. The waitFor coroutine waits in line (3) until the counter becomes zero. counter1 sleeps for 2 seconds before it counts down by 1. In contrast, counter2 sleeps for 1 second and counts down by 2. The screenshot shows the interleaving of the threads.

[Screenshot: output of cppcoroLatch.cpp]

What's next?

So far, I have written about three of the big four of C++20: concepts, ranges, and coroutines. Modules are still missing in my tour through the big four and are the topic of my next posts.

By the way, if anyone wants to write a post about a C++20 feature I'm going to write about, please contact me. I'm happy to publish it and translate it into English or German if necessary.
