A Coroutines-Based Single Consumer – Single Producer Workflow by Ljubic Damir

I’m happy to present a guest post by Ljubic Damir that shows a typical use case for coroutines: a producer-consumer workflow.

Although this producer-consumer workflow is challenging, it is a nice starting point for your coroutine experiments.

Intro

Coroutines provide a more intuitive and structured way of writing asynchronous code by allowing you to write asynchronous operations in a procedural style. They are a feature introduced in C++20 to simplify asynchronous programming.

Pre-existing mechanisms such as std::async tasks, std::packaged_task, or events (std::condition_variable & std::mutex) synchronize two or more threads on the result of a task by establishing a communication channel. This communication channel has two ends:

std::promise that writes into the shared state either the result or the exception, and

std::future (std::shared_future) – a receiving end that waits on the result of the task (or the exception).
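
The two ends of this channel can be sketched in a few lines. The helper names computeSum and sumOnWorkerThread are made up for this illustration; only std::promise, std::future, and std::thread come from the standard library.

```cpp
#include <future>
#include <thread>
#include <utility>

// Sending end: the worker writes the result into the shared state.
void computeSum(std::promise<int> sender, int a, int b)
{
    sender.set_value(a + b);   // a failure would be reported with set_exception()
}

// Receiving end: the caller blocks on the future until the value arrives.
int sumOnWorkerThread(int a, int b)
{
    std::promise<int> sender;
    std::future<int> receiver = sender.get_future();

    std::thread worker(computeSum, std::move(sender), a, b);
    const int result = receiver.get();   // waits until set_value() was called
    worker.join();
    return result;
}
```

Calling sumOnWorkerThread(2, 3) from main() blocks the calling thread until the worker has stored the sum in the shared state.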

Unlike these pre-existing mechanisms, coroutines don’t directly involve threads or other OS synchronization primitives. They are a pure software abstraction based on the coroutine’s control object and the state-machine logic built around it.

Coroutines are stackless, which means that the coroutine state lives outside the call stack and is typically allocated on the heap. The control object, std::coroutine_handle<promise_type>, is a library wrapper that gives access to this state and to its promise_type. Despite the similar name, promise_type has nothing in common with std::promise.

The promise_type is an interface (a customization point) that describes the predefined transition states in a coroutine’s state machine.

Coroutines are highly versatile and can be used in various scenarios where you must manage asynchronous message flow. One common example is socket-based communication.

 


     

    Today, I’ll try to shed some light on coroutines with another example: a single producer – single consumer workflow.

    Implementation

    First, we need to define the result type of the coroutine

    class [[nodiscard]] AudioDataResult final
    {
        public:
            class promise_type;
            using handle_type = std::coroutine_handle<promise_type>;

            class promise_type
            {
                ...
            };
    };
    

    as a wrapper around the inner promise_type. We mark the enclosing class with the [[nodiscard]] attribute, since the result type is the control object of the coroutine that we return to the client code, so that the client can manage the coroutine’s suspension and resumption.

    @note The destructor of the class releases the resources (dynamic memory) in RAII fashion, so strictly speaking, the returned object could be ignored if there is no need to manage the coroutine state.

    ~AudioDataResult() { if(handle_) { handle_.destroy(); } }
    

    The result type is move-only: the copy operations are deleted to prevent the control object from being duplicated.

    // Make the result type move-only, due to exclusive ownership over the handle
    AudioDataResult(const AudioDataResult&) = delete;
    AudioDataResult& operator=(const AudioDataResult&) = delete;

    AudioDataResult(AudioDataResult&& other) noexcept:
        handle_(std::exchange(other.handle_, nullptr))
    {}

    AudioDataResult& operator=(AudioDataResult&& other) noexcept
    {
        AudioDataResult tmp = std::move(other);
        // Swap only the handles: std::swap(*this, tmp) would call this
        // move assignment operator again and recurse endlessly
        std::swap(handle_, tmp.handle_);
        return *this;
    }
    

    Let’s define the promise_type interface itself:

    // Predefined interface that has to be specified in order to implement
    // the coroutine's state-machine transitions
    class promise_type
    {
        public:
            using value_type = std::vector<int>;

            AudioDataResult get_return_object()
            {
                return AudioDataResult{ handle_type::from_promise(*this) };
            }
            std::suspend_never initial_suspend() noexcept { return {}; }
            std::suspend_always final_suspend() noexcept { return {}; }

            void return_void() {}
            void unhandled_exception()
            {
                std::rethrow_exception(std::current_exception());
            }

            // Generates the value and suspends the "producer"
            template <typename Data>
            requires std::convertible_to<std::decay_t<Data>, value_type>
            std::suspend_always yield_value(Data&& value)
            {
                data_ = std::forward<Data>(value);
                data_ready_.store(true, std::memory_order::relaxed);
                return {};
            }

        private:
            value_type data_;
            std::atomic<bool> data_ready_;
    };//promise_type interface
    

    The promise_type defines the necessary infrastructure of the coroutine. In addition, any coroutine that wants to act as a generator (the “producer”) and yield values needs a promise_type with the yield_value method (co_yield value ≡ co_await promise.yield_value(value)). Finally, to resume the producer once the provided data are consumed, the result type needs to expose the appropriate wrapper method resume():

    void resume() { if( not handle_.done()) { handle_.resume();} }
    

    Now we need to extend the coroutine to support the consumer’s requirement: to be synchronized on the data readiness. In other words, the consumer will be suspended until the producer signals that the data are available. For that, we need to implement the Awaiter interface:

    class promise_type
    {
        // Awaiter interface: for consumer waiting on data being ready
        struct AudioDataAwaiter
        {
            explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

            bool await_ready() const
            {
                return promise_.data_ready_.load(std::memory_order::relaxed);
            }

            void await_suspend(handle_type) const
            {
                while( not promise_.data_ready_.exchange(false))
                {
                    std::this_thread::yield();
                }
            }

            // move construction at client invocation side:
            //        const auto data = co_await audioDataResult;
            // This requires that coroutine's result type provides the co_await unary operator
            value_type&& await_resume() const
            {
                return std::move(promise_.data_);
            }

            private:
                promise_type& promise_;
        };//Awaiter interface
    };//promise_type
    

    In the awaiter’s state machine, await_ready() is the first transition: it checks whether the data are ready. If they are not, await_suspend() is called next. Here we wait until the matching flag is set. Finally, await_resume() is called: we “move” the value out of the promise_type by unconditionally casting it to an rvalue reference. On the client invocation side, this causes the move constructor of the returned value, data, to be called.

    const auto data = co_await audioDataResult;
    

    For that to work, the result type must provide the co_await unary operator, which returns our Awaiter interface.

    class AudioDataResult
    {
        auto operator co_await() noexcept
        {
            return promise_type::AudioDataAwaiter{handle_.promise()};
        }
    };
    

    <Example 1>:  https://godbolt.org/z/MvYfbEP8r

    The following program producerConsumer.cpp shows a simplified version of example 1:

    // producerConsumer.cpp
    
    #include <algorithm>
    #include <atomic>
    #include <chrono>
    #include <concepts>
    #include <coroutine>
    #include <cstddef>
    #include <exception>
    #include <functional>
    #include <iostream>
    #include <iterator>
    #include <memory>
    #include <source_location>
    #include <thread>
    #include <type_traits>
    #include <utility>
    #include <vector>
    
    
    void funcName(const std::source_location location = std::source_location::current()) {
        std::cout << location.function_name() << '\n';
    }
    
    
    template <typename Container>
    void printContainer(const Container& container)
    {
        typedef typename Container::value_type value_type;
        auto first = std::cbegin(container);
        auto last = std::cend(container);
    
        std::cout << " [";
        std::copy(first, std::prev(last), std::ostream_iterator<value_type>(std::cout, ", "));
        std::cout << *std::prev(last) << "]\n";
    }
    
    
    
    
    class [[nodiscard]] AudioDataResult final
    {
        public:
            class promise_type;
            using handle_type = std::coroutine_handle<promise_type>;
            
            // Predefined interface that has to be specified in order to implement
            // the coroutine's state-machine transitions
            class promise_type 
            {
                
                public:
                    
                    using value_type = std::vector<int>;
    
                    AudioDataResult get_return_object() 
                    {
                        return AudioDataResult{handle_type::from_promise(*this)};
                    }
                    std::suspend_never initial_suspend() noexcept { return {}; }
                    std::suspend_always final_suspend() noexcept { return {}; }
                    void return_void() {}
                    void unhandled_exception() 
                    {
                        std::rethrow_exception(std::current_exception());
                    }
    
                    // Generates the value and suspends the "producer"
                    template <typename Data>
                    requires std::convertible_to<std::decay_t<Data>, value_type>
                    std::suspend_always yield_value(Data&& value) 
                    {
                        data_ = std::forward<Data>(value);
                        data_ready_.store(true);
                        return {};
                    }
    
                    // Awaiter interface: for consumer waiting on data being ready
                    struct AudioDataAwaiter 
                    {
                        explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}
    
                        bool await_ready() const { return promise_.data_ready_.load();}
                        
                        void await_suspend(handle_type) const
                        {
                            while(not promise_.data_ready_.exchange(false)) {
                                 std::this_thread::yield(); 
                            }
                        }
                        // move construction at client invocation side: const auto data = co_await audioDataResult;
                        // This requires that coroutine's result type provides the co_await unary operator
                        value_type&& await_resume() const 
                        {
                            return std::move(promise_.data_);
                        }
    
                        private: 
                            promise_type& promise_;
                    };//Awaiter interface
    
            
                private:
                    value_type data_;
                    std::atomic<bool> data_ready_;
            }; //promise_type interface
    
            
            auto operator co_await() noexcept   
            {
                return promise_type::AudioDataAwaiter{handle_.promise()};
            }
    
            // Make the result type move-only, due to ownership over the handle
            AudioDataResult(const AudioDataResult&) = delete;
            AudioDataResult& operator=(const AudioDataResult&) = delete;
    
            AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}
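            AudioDataResult& operator=(AudioDataResult&& other) noexcept 
            {
                AudioDataResult tmp = std::move(other);
                // Swap only the handles: std::swap(*this, tmp) would call this
                // move assignment operator again and recurse endlessly
                std::swap(handle_, tmp.handle_);
                return *this;
            }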
    
            // d-tor: RAII
            ~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}
    
            // For resuming the producer - at the point when the data are consumed
            void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}
        
        private:
            AudioDataResult(handle_type handle) noexcept : handle_(handle) {}
    
            handle_type handle_;
    };
    
    
    using data_type = std::vector<int>;
    AudioDataResult producer(const data_type& data) 
    {
        for (std::size_t i = 0; i < 5; ++i) {
            funcName();
            co_yield data;
        }
        co_yield data_type{}; // exit criteria
    }
    
    AudioDataResult consumer(AudioDataResult& audioDataResult) 
    {
        while(true)
        {
            funcName();
            const auto data = co_await audioDataResult;
            if (data.empty()) {std::cout << "No data - exit!\n"; break;}
            std::cout << "Data received:";
            printContainer(data);
            audioDataResult.resume(); // resume producer
        }
    }
    
    int main() 
    {
        {
            const data_type data = {1, 2, 3, 4};
            auto audioDataProducer = producer(data);
            std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});
            t.join();
        }
    
        std::cout << "bye-bye!\n";
    }
    

    Finally, here is the output of the program:

    Another option is to use promise_type::await_transform() to wait on the value stored in the promise_type instance used by the producer.

    class promise_type
    {
        auto await_transform(handle_type other)
        {
            // Awaiter interface: remains the same
            struct AudioDataAwaiter
            {
                explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}
                ...
            };

            return AudioDataAwaiter{other.promise()};
        }
    };
    

    This way, we no longer need to specify the co_await unary operator of the result type, but rather an (explicit) conversion operator

    explicit operator handle_type() const {return handle_;}
    

    so that we can pass it at the point when the consumer calls co_await, which will internally be translated to the await_transform() call.

    const auto data = co_await static_cast<AudioDataResult::handle_type>(audioDataResult);
    

    We can illustrate this as:   me.handle_.promise().await_transform(other.handle_)

    <Example 2>:  https://godbolt.org/z/57zsK9rEn

    Conclusion

    In this simple example, the producer can be suspended without any penalty, since after being resumed it provides the very same sequence of data, which is known upfront. In a real scenario, that is likely not the case: the producer will probably be some kind of mediator, a receiver of asynchronously emitted data that are retransmitted to the consumer. While the producer is suspended, waiting for the consumer to resume it, newly arriving data would be lost. To avoid that, some queuing logic needs to be implemented on the producer side to compensate for the difference between the rate at which data arrive at the producer and the rate at which the consumer consumes them.
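
What such queuing logic might look like is sketched below. ChunkQueue and burstIsBuffered are hypothetical names, the capacity limit and the reject-when-full policy are assumptions made for this illustration, and the sketch leaves out how the queue would be wired into the coroutine.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical buffer between the data source and the consumer.
class ChunkQueue
{
public:
    using chunk_type = std::vector<int>;

    explicit ChunkQueue(std::size_t capacity) : capacity_(capacity) {}

    // Called on the producer side; returns false if the chunk had to be rejected.
    bool push(chunk_type chunk)
    {
        std::lock_guard lock(mutex_);
        if (chunks_.size() >= capacity_) { return false; }
        chunks_.push_back(std::move(chunk));
        return true;
    }

    // Called on the consumer side; an empty optional means nothing is buffered.
    std::optional<chunk_type> tryPop()
    {
        std::lock_guard lock(mutex_);
        if (chunks_.empty()) { return std::nullopt; }
        chunk_type chunk = std::move(chunks_.front());
        chunks_.pop_front();
        return chunk;
    }

private:
    std::size_t capacity_;
    std::mutex mutex_;
    std::deque<chunk_type> chunks_;
};

// Two chunks arrive before the consumer runs; both survive, in arrival order.
bool burstIsBuffered()
{
    ChunkQueue queue{2};
    const bool accepted = queue.push({1, 2}) && queue.push({3, 4});
    const bool overflowRejected = !queue.push({5, 6});
    const auto first = queue.tryPop();
    const auto second = queue.tryPop();
    return accepted && overflowRejected
        && first == ChunkQueue::chunk_type{1, 2}
        && second == ChunkQueue::chunk_type{3, 4}
        && !queue.tryPop().has_value();
}
```

Whether a full queue should reject new chunks, drop the oldest ones, or block the data source depends on the application.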

    What’s Next?

    In C++20, you can define or default the three-way comparison operator. This gives you all six comparison operators: ==, !=, <, <=, >, and >=. Did you know that you can also define or default the equality operator (==)?

    A Short Christmas Break

    My blog takes a short Christmas break. The next post will be published on the 8th of January 2024. Have a good time.

