An Advanced Priority Scheduler for Coroutines

Today, I use the straightforward scheduler from the post “A Priority Scheduler for Coroutines” and improve its priority handling.

This is the fourth post in my miniseries about schedulers for C++ coroutines. The first two posts were guest posts by Dian-Lun Lin.

Dian-Lun’s schedulers were based on the container adaptors std::stack and std::queue. The std::stack schedules its tasks last-in first-out, while the std::queue schedules them first-in first-out. Finally, my std::priority_queue-based scheduler from the last post supports the prioritization of tasks.

In this post, I will improve the last scheduler in two ways. First, let me play with the comparison function of the std::priority_queue.

Comparator

The priority queue always has its greatest element at the top. By default, std::priority_queue uses the comparison function object std::less. The following scheduler takes the comparator as a template parameter.

// priority_queueSchedulerComparator.cpp

#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

using job = std::pair<int, std::coroutine_handle<>>;              // (1)

template <typename Comparator = std::ranges::less>                // (2)
requires std::predicate<decltype(Comparator()), job, job>         // (3)
class Scheduler {

  std::priority_queue<job, std::vector<job>, Comparator> _prioTasks;

  public: 

    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(prio, task));            // (6)
        }
        else {
          task.destroy();
        }
      }
    }

};

Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  std::cout << name << " execute\n";
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;                                              // (4)

  scheduler1.emplace(0, createTask("TaskA").get_handle());            
  scheduler1.emplace(1, createTask("  TaskB").get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<std::ranges::greater> scheduler2;                        // (5)

  scheduler2.emplace(0, createTask("TaskA").get_handle());            
  scheduler2.emplace(1, createTask("  TaskB").get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Please read the three previous posts if this scheduler is too overwhelming for you. First, I call the combination of a priority and a task a job (line 1). The scheduler takes a template parameter Comparator that defaults to std::ranges::less (line 2). Additionally, the concept std::predicate checks in line 3 whether the comparator can be invoked with two jobs and returns a value that is usable as a bool.

scheduler1 (line 4) starts with the highest prioritized job, but scheduler2 (line 5) starts with the lowest prioritized one.

In line 6, I push the job back onto the scheduler. Wouldn’t it be nice if I could update the priority of a pushed-back job?

Updating the Priority

The following scheduler can additionally update the priority of a pushed-back job.

// priority_queueSchedulerPriority.cpp

#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

using job = std::pair<int, std::coroutine_handle<>>;

template <typename Updater = std::identity,                         // (1)
          typename Comparator = std::ranges::less>                   
requires std::invocable<decltype(Updater()), int> &&                // (2)
         std::predicate<decltype(Comparator()), job, job>             
class Scheduler {

  std::priority_queue<job, std::vector<job>, Comparator> _prioTasks;

  public: 
    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      Updater upd = {};                                            // (3)
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(upd(prio), task));          // (4)
        }
        else {
          task.destroy();
        }
      }
    }

};


Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  for (int i = 0; i <= 3; ++i ) { 
    std::cout << name << " execute " << i << "\n";                  // (5)
    co_await std::suspend_always();
  }
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;                                               // (6)

  scheduler1.emplace(0, createTask("TaskA").get_handle());
  scheduler1.emplace(1, createTask("  TaskB").get_handle());
  scheduler1.emplace(2, createTask("    TaskC").get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<decltype([](int a) { return a - 1; })> scheduler2;        // (7)

  scheduler2.emplace(0, createTask("TaskA").get_handle());
  scheduler2.emplace(1, createTask("  TaskB").get_handle());
  scheduler2.emplace(2, createTask("    TaskC").get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Only a few modifications to the scheduler in the program priority_queueSchedulerComparator.cpp are necessary to support updating priorities.


First, the scheduler gets an additional template parameter Updater (line 1) that defaults to std::identity. Updater must be invocable with an int. Of course, std::invocable is a concept (line 2). The Updater is created in line 3 and applied in line 4. Additionally, the coroutine displays in line 5 which part of the task is executed. scheduler1 (line 6) performs its jobs, starting with the highest priority, but scheduler2 (line 7) decreases the priority of a pushed-back job by one. I use a lambda as the invocable.

The output of the program shows the different scheduling strategies.

What’s Next?

Coroutines provide an intuitive way of writing asynchronous code. My next post will be a guest post from Ljubic Damir, presenting a single producer – single consumer workflow based on coroutines.
