A Priority Scheduler for Coroutines

In this post, I will extend the straightforward scheduler from Dian-Lun with priorities.

This is the third post in my mini-series about schedulers for C++ coroutines. The first two posts were guest posts by Dian-Lun Lin.

Dian-Lun’s schedulers were based on the container adaptors std::stack and std::queue. The std::stack-based scheduler runs its tasks last-in, first-out, whereas the std::queue-based scheduler runs them first-in, first-out.

The following code snippet shows the queue-based scheduler:

class Scheduler {

  std::queue<std::coroutine_handle<>> _tasks;

  public: 

    void emplace(std::coroutine_handle<> task) {
      _tasks.push(task);
    }

    void schedule() {
      while(!_tasks.empty()) {
        auto task = _tasks.front();
        _tasks.pop();
        task.resume();

        if(!task.done()) { 
          _tasks.push(task);
        }
        else {
          task.destroy();
        }
      }
    }

    auto suspend() {
      return std::suspend_always{};
    }
};

Extending this scheduler with priorities is pretty straightforward.

A Priority-Queue based Scheduler

Besides std::stack and std::queue, std::priority_queue is the third container adaptor in C++98.

The std::priority_queue is similar to a std::queue. The main difference is that its greatest element is always at the top of the priority queue. By default, std::priority_queue uses the comparison function object std::less. Looking up the top element takes constant time, but insertion and extraction are logarithmic.

 

Let me replace the std::queue in the previous scheduler with a std::priority_queue:

    // priority_queueScheduler.cpp
    
    #include <coroutine>
    #include <iostream>
    #include <queue>
    #include <utility>
    
    
    struct Task {
    
      struct promise_type {
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
    
        Task get_return_object() { 
            return std::coroutine_handle<promise_type>::from_promise(*this); 
        }
        void return_void() {}
        void unhandled_exception() {}
      };
    
      Task(std::coroutine_handle<promise_type> handle): handle{handle}{}
    
      auto get_handle() { return handle; }
    
      std::coroutine_handle<promise_type> handle;
    };
    
    class Scheduler {
                                                                // (1)
      std::priority_queue<std::pair<int, std::coroutine_handle<>>> _prioTasks;
    
      public: 
    
        void emplace(int prio, std::coroutine_handle<> task) {   // (2)
          _prioTasks.push(std::make_pair(prio, task));
        }
    
        void schedule() {
          while(!_prioTasks.empty()) {                           // (3)
            auto [prio, task] = _prioTasks.top();
            _prioTasks.pop();
            task.resume();
    
            if(!task.done()) { 
              _prioTasks.push(std::make_pair(prio, task));        // (4)
            }
            else {
              task.destroy();
            }
          }
        }
    
        auto suspend() {
          return std::suspend_always{};
        }
    };
    
    
    Task TaskA(Scheduler& sch) {
      std::cout << "Hello from TaskA\n";
      co_await sch.suspend();
      std::cout << "Executing the TaskA\n";
      co_await sch.suspend();
      std::cout << "TaskA is finished\n";
    }
    
    Task TaskB(Scheduler& sch) {
      std::cout << "Hello from TaskB\n";
      co_await sch.suspend();
      std::cout << "Executing the TaskB\n";
      co_await sch.suspend();
      std::cout << "TaskB is finished\n";
    }
    
    
    int main() {
    
      std::cout << '\n';
    
      Scheduler scheduler1;
    
      scheduler1.emplace(0, TaskA(scheduler1).get_handle());       // (5)   
      scheduler1.emplace(1, TaskB(scheduler1).get_handle());
    
      scheduler1.schedule();
    
      std::cout << '\n';
    
      Scheduler scheduler2;
    
      scheduler2.emplace(1, TaskA(scheduler2).get_handle());      // (6)
      scheduler2.emplace(0, TaskB(scheduler2).get_handle());
    
      scheduler2.schedule();
    
      std::cout << '\n';
    
    }
    

First, the std::priority_queue stores pairs (priority, handle) (line 1). The emplace member function pushes such a pair onto _prioTasks (line 2). When the scheduler runs, it checks whether _prioTasks is empty (line 3). If not, the top task is accessed, removed, and resumed. If the task is not done afterward, it is pushed back onto _prioTasks with its original priority (line 4).

Using a std::priority_queue<std::pair<int, std::coroutine_handle<>>> has the nice side effect that tasks with higher priority run first, because std::pair compares its first element, the priority, before its second. It makes no difference in which order the tasks are placed on the scheduler (lines 5 and 6); the task with priority 1 runs first. Because an unfinished task is pushed back with the same priority, it also runs to completion before the task with priority 0 starts.

Let me simplify the coroutine before I improve its priority handling in my next post.

The Simplified Coroutine

Here are the previous coroutines TaskA and TaskB:

    Task TaskA(Scheduler& sch) {
      std::cout << "Hello from TaskA\n";
      co_await sch.suspend();
      std::cout << "Executing the TaskA\n";
      co_await sch.suspend();
      std::cout << "TaskA is finished\n";
    }
    
    Task TaskB(Scheduler& sch) {
      std::cout << "Hello from TaskB\n";
      co_await sch.suspend();
      std::cout << "Executing the TaskB\n";
      co_await sch.suspend();
      std::cout << "TaskB is finished\n";
    }
    

First, instead of calling co_await on the awaitable returned by the scheduler's suspend member function, I use the predefined awaitable std::suspend_always directly. This allows me to remove the suspend member function from the scheduler. Second, the coroutine gets the name of its task as a parameter:

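    // name is taken by value: the coroutine starts suspended, so a reference
    // to a temporary std::string would dangle when the coroutine is resumed.
    Task createTask(std::string name) {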
      std::cout << name << " start\n";
      co_await std::suspend_always();
      std::cout << name << " execute\n";
      co_await std::suspend_always();
      std::cout << name << " finish\n";
    }
    

Finally, here is the complete simplified program.

    // priority_queueSchedulerSimplified.cpp
    
    #include <coroutine>
    #include <iostream>
    #include <queue>
    #include <string>
    #include <utility>
    
    
    struct Task {
    
      struct promise_type {
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
    
        Task get_return_object() { 
            return std::coroutine_handle<promise_type>::from_promise(*this); 
        }
        void return_void() {}
        void unhandled_exception() {}
      };
    
      Task(std::coroutine_handle<promise_type> handle): handle{handle}{}
    
      auto get_handle() { return handle; }
    
      std::coroutine_handle<promise_type> handle;
    };
    
    class Scheduler {
    
      std::priority_queue<std::pair<int, std::coroutine_handle<>>> _prioTasks;
    
      public: 
    
        void emplace(int prio, std::coroutine_handle<> task) {
          _prioTasks.push(std::make_pair(prio, task));
        }
    
        void schedule() {
          while(!_prioTasks.empty()) {
            auto [prio, task] = _prioTasks.top();
            _prioTasks.pop();
            task.resume();
    
            if(!task.done()) { 
              _prioTasks.push(std::make_pair(prio, task));
            }
            else {
              task.destroy();
            }
          }
        }
    
    };
    
    
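    // name is taken by value: the coroutine starts suspended, so a reference
    // to a temporary std::string would dangle when the coroutine is resumed.
    Task createTask(std::string name) {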
      std::cout << name << " start\n";
      co_await std::suspend_always();
      std::cout << name << " execute\n";
      co_await std::suspend_always();
      std::cout << name << " finish\n";
    }
    
    
    int main() {
    
      std::cout << '\n';
    
      Scheduler scheduler1;
    
      scheduler1.emplace(0, createTask("TaskA").get_handle());
      scheduler1.emplace(1, createTask("  TaskB").get_handle());
    
      scheduler1.schedule();
    
      std::cout << '\n';
    
      Scheduler scheduler2;
    
      scheduler2.emplace(1, createTask("TaskA").get_handle());
      scheduler2.emplace(0, createTask("  TaskB").get_handle());
    
      scheduler2.schedule();
    
      std::cout << '\n';
    
    }
    

What’s Next?

In my next post, I will improve the priority handling of the tasks.
