C++20 Coroutines Driving a Job System

I had this fiber-based job system which was partly written in assembly, and I badly wanted it to work; and it was working badly. So I decided to rewrite it in C++. In C++20, to be more precise. It was surprisingly pleasant.

The original job system was a nice addition to allow cooperative multitasking in Island, my R&D renderer. A key idea was to package up jobs which could potentially run in parallel into a task list, and then hand this task list over to a scheduler to execute on potentially multiple threads. Whoever puts a task list on the scheduler must wait for the full list to complete before resuming.

Figure: The Fork-Join pattern for parallelism. Above: single-threaded execution. Below: multi-threaded execution. (Illustration adapted from Wikipedia.)

This follows the fork-join pattern, a relatively straightforward model for parallelism, but without paying the price of spinning up new threads or of heavy context switches. Since context switches were very cheap, long-running jobs could suspend midway through execution and let other jobs run in the meantime, making more efficient use of all available CPU cores by hiding memory latency.
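For contrast, here is the fork-join pattern written the expensive way, with one std::thread per job. This is a minimal sketch, not Island code: fork_join_sum and the even chunking are illustrative assumptions. Every call pays for creating and joining threads, which is exactly the cost a job system avoids by keeping a fixed group of worker threads around.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Fork-join with one thread per job: split the input into chunks, "fork" a
// thread for each chunk, then "join" them all before combining the results.
long long fork_join_sum(const std::vector<int>& data, std::size_t num_jobs) {
    if (num_jobs == 0) num_jobs = 1;
    const std::size_t chunk = (data.size() + num_jobs - 1) / num_jobs;
    std::vector<long long> partial(num_jobs, 0);
    std::vector<std::thread> workers;

    for (std::size_t j = 0; j != num_jobs; ++j) {
        const std::size_t begin = std::min(data.size(), j * chunk);
        const std::size_t end = std::min(data.size(), begin + chunk);
        // Fork: each job writes only to its own slot, so no locking is needed.
        workers.emplace_back([&data, &partial, j, begin, end] {
            partial[j] = std::accumulate(data.begin() + static_cast<std::ptrdiff_t>(begin),
                                         data.begin() + static_cast<std::ptrdiff_t>(end), 0LL);
        });
    }
    // Join: the caller cannot continue until every job has finished.
    for (std::thread& t : workers) t.join();
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

Summing the numbers 1 to 1000 this way gives 500500 regardless of how many jobs the work is split into; only the cost of getting there changes.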

Trouble on the Horizon 

Trouble was, as hinted earlier, that for Island’s job system, I was using a sprinkle of assembly to flip CPU contexts around. A bad idea.

Like many bad ideas, it of course started out looking like fun, just as the idea of juggling frozen snakes may, if seen from afar, project a certain allure. But close up, self-written, assembly-based stackful fibers turned out about as brittle as the previous herpetological simile. Debugging stack overflows in particular, with stacks that you happen to have to maintain yourself, may only be pleasant to a subset of the brave, the masochistic, and the meticulous: a very loose Venn diagram. And to top it off, my code was not even portable outside amd64/Linux:

When I looked into porting the job system to Windows, I found that Visual Studio does not support inline assembly for 64-bit targets. I would have had to use a separate assembler, or compiler intrinsics (yuck!), or something else… Fortunately, as it turned out, there was something else.

Coroutines to the Rescue 

C++ coroutines have been lurking backstage for a few years, and with C++20 they finally appear mature enough to be available in all the compilers that I currently care about for Island: GCC, Clang, and MSVC. They seem to have converged on an architectural sweet spot: in contrast to Go with its Goroutines, and to my hand-coded assembly nightmare, C++ coroutines are stackless. This means that suspension is only possible within the body of the coroutine itself; you cannot suspend from within an ordinary function that the coroutine calls. On the plus side, there is no need to manage your own stack. Lewis Baker has written about this in great detail.

That all sounds very promising. But it would suck all the joy out of the new implementation if it ended up clunky and hard to use correctly. We really want a nice API for our job system, and (to keep compile times low) we also don’t want to infect our header files with tons of includes… Will C++ coroutines really be up for the job?

Now, before digging deep into the implementation details, let’s make sure that the surface looks nice. In doing this, I’m taking inspiration from Gor Nishanov, who says in his talk on coroutines (I paraphrase): the best way to make things look nice is to write your application code first, and then to see how you can do this with coroutines.

"When people say: How can I do this with Coroutines? I will say, well, think of the best way of solving this problem - just imagine the syntax… And then, you can make it work."

So let’s do just that: We want an API that allows us to add coroutines to our scheduler, and that, in a simple way, ensures that there is no forward progress until all the jobs in the batch have finished executing. Let’s see what I came up with.

The most ergonomic job system that I could think of:

// Create a scheduler with 3 hardware threads if possible
Scheduler* scheduler = Scheduler::create(3); 

// Create an empty task list
TaskList tasks; 

tasks.add_task( 
    [](int idx, Scheduler* s) -> Task { 
        cout << "executing task: " << idx << endl;
        // nearly anything can happen in here - 
        // you can call functions etc. just like inside a normal function.
        co_await suspend_task(); // Suspend this task
        cout << "resuming task: " << idx << endl;
        co_return; // Note: using co_await, co_yield or co_return is what makes this a coroutine
    } (42, scheduler) // Important: these parameters will be copied into the Task's activation frame
    );

scheduler->wait_for_task_list(tasks); // schedule and execute tasks

cout << "Finished executing tasks" << endl;

delete scheduler; // explicitly delete scheduler

Now, a few notes about this API: if you look closely, you will notice that what we add to tasks is not a function pointer or a lambda, but the return value that we get from executing the lambda with given parameters. So does this lambda get executed at this point in time? Well, yes and no…

This is because what looks like an ordinary lambda is really a factory function for a coroutine: calling it returns a coroutine handle. The coroutine handle is of type Task, and such a coroutine does not execute immediately; it starts out suspended. How do I know? Because I made sure of this myself. But more on this later, as implementation details should not need to concern us for now.

Suspend 

Instead, let’s suspend our disbelief for a moment, and see what nice properties this API brings… For one, we could use the coroutine factory function to elegantly add lots of tasks if we wanted:

// ... 
auto create_task = [](int idx, Scheduler* s)-> Task { 
    cout << "executing task: " << idx << endl;
    co_await suspend_task();
    cout << "resuming task: " << idx << endl;
    co_return;
};

TaskList tasks;
// let's add 100 different tasks!
for (int i=0; i != 100; i++){
    tasks.add_task( create_task(i, scheduler) );
}
scheduler->wait_for_task_list(tasks); // schedule and execute tasks
// ...

Now, to make things more interesting, we could issue new tasks from inside of tasks. All we need is another task factory and a new TaskList object. We submit that TaskList to the Scheduler, and wait until all of its tasks have completed before continuing:

// ...
auto create_outer_task = [](int idx, Scheduler* s)->Task { 
    cout << "executing task: " << idx << endl;

    auto create_inner_task = [](int outer, int inner)->Task { 
        cout << "  executing inner task: " << outer << ":" << inner << endl;
        co_await suspend_task();
        cout << "  resuming  inner task: " << outer << ":" << inner << endl;
        co_return;
    };

    TaskList inner_tasks;
    for (int j = 0; j != 20; j++){
        inner_tasks.add_task(create_inner_task(idx, j));
    }
    // Wait for inner tasks to complete
    s->wait_for_task_list(inner_tasks);

    co_await suspend_task();
    cout << "resuming task: " << idx << endl;
    co_return;
};

TaskList outer_tasks;
// Let's add 100 different tasks, each of which will add 
// inner tasks once it begins executing.
for (int i=0; i != 100; i++){
    outer_tasks.add_task( create_outer_task(i, scheduler) );
}
// Wait for outer tasks to complete (they will only complete if, in turn,
// their inner tasks have completed.)
scheduler->wait_for_task_list(outer_tasks);
// ...

… and Resume 

Now, remember that disbelief which we suspended earlier? Maybe now would be a good point to resume it, and to take a look at some of the things that need to happen under the surface to make this API a reality.

But how does it fork? 

C++20 coroutines are very powerful, and can be used in a lot of different ways, from implementing lazy generators, to state machines, to task graphs. To that end, the new coroutine elements of the language are relatively fine-grained, as they must give a lot of control: I understand them as building blocks to build tools with, and probably not to use directly; and this versatility is what makes coroutines a bit hard to understand at first.

Some of the complexity comes from C++ coroutines being able to produce values (in C++ parlance: “yield” them), which we are not interested in for our job scheduler: we can blissfully ignore this…

Another big chunk of complexity comes from coroutines expressing their behaviour largely through specialised types. This means that building tools on top of coroutines means working with the type system, and, if we’re lucky, the compiler will magically generate some glue code. All this makes control flow not immediately obvious.

To keep things simple, I’m only going to focus on the subset of the C++ coroutine API that I used.

For our job system we need the following elements:

  1. A way to define a Task as Function + Parameters
  2. A way to put a Task initially in suspended state
  3. A way to allow a Task to suspend itself if it so wishes
  4. A way to resume a Task

Once we have our Task definition in place, we can look at the remaining two set pieces, namely the TaskList, and the Scheduler. These will complete the system.

1. Define a Task as Function + Parameters 

Our definition for Task uses a slightly strange syntax, but it is surprisingly powerful. When we say:

auto create_task = [](int idx, Scheduler* s)->Task { 
    cout << "executing task: " << idx << endl;
    co_await suspend_task();
    cout << "resuming task: " << idx << endl;
    co_return;
};

We create a factory function for coroutines: whenever we call create_task, the values of its parameters are copied into the activation frame of a new coroutine, the coroutine is initially suspended, and we immediately receive a handle to it. The handle is of type Task, and it allows us to resume the coroutine wherever and whenever we want.

2. Define Task in Initially Suspended State 

Why can I say that the coroutine will be initially suspended? Because I defined Task this way: Task is-a std::coroutine_handle of TaskPromise. And TaskPromise’s required member function TaskPromise::initial_suspend() returns an object conforming-to awaitable, which is-a std::suspend_always.

#include <coroutine>

struct TaskPromise; // forward declaration, so that Task can name it

struct Task : std::coroutine_handle<TaskPromise> {
    using promise_type = ::TaskPromise;
};

struct TaskPromise {
    std::suspend_always initial_suspend() noexcept { return {}; }
    /* ... */
};

Now, what is this type, std::suspend_always, really? It is an adorable bit of scaffolding that conforms to the traits of an awaitable, and must therefore implement the member functions await_ready, await_suspend, and await_resume. Its await_ready is implemented to return false, come what may.

// 17.12.5 Trivial awaitables
/// [coroutine.trivial.awaitables]
struct suspend_always
{
    constexpr bool await_ready() const noexcept { return false; }
    constexpr void await_suspend(coroutine_handle<>) const noexcept {}
    constexpr void await_resume() const noexcept {}
};

To sum up: a Task’s promise has-a required function initial_suspend(), which returns an object conforming-to awaitable, which has-a member function await_ready, which always-returns false. This is what signals to the compiler that we want our Tasks initially suspended, thank you very much, with ice cream on top.

3. Optionally Suspend Tasks Mid-Way 

Now, apart from being able to begin their journey in suspension, a nice property of coroutines is that they may suspend themselves mid-way. To signal to the compiler where they wish to do this, the C++ Standard introduced a new operator, co_await. Inside our coroutine/Task payload function, it can be used like this:

// .. coroutine payload code (executed before suspension)
co_await my_customisable_type();
// .. coroutine payload code (executed after this coroutine gets resumed)

"Not again! Not another customisable type!"

Well, it’s not that tragic: we’ve already encountered the kind of type that the co_await operator expects. It needs to conform to the traits of an awaitable. This time, one of the totally trivial standard awaitable types won’t do; we’ll have to customise it slightly, and we’ll name it suspend_task:

struct suspend_task { // conforms to "awaitable" trait
    constexpr bool await_ready() noexcept { return false; }
    void await_suspend( std::coroutine_handle<::TaskPromise> coroutine_handle ) noexcept;
    void await_resume() noexcept {}
};

Not much to see here - in fact, on the surface it looks exactly like std::suspend_always, which might be the point of all this traits business… But under the surface, in the implementation file, is where we can do all sorts of interesting things:

void suspend_task::await_suspend( std::coroutine_handle<TaskPromise> handle ) noexcept {

     // ----------| Invariant: At this point the coroutine represented by `handle`
     // has been fully suspended. This is guaranteed by the C++ standard.

     auto& promise = handle.promise();

     auto& task_list = promise.p_task_list;

     // Put the current coroutine at the back of the TaskList it came from,
     // so that it can be resumed later, possibly by another thread.
     task_list->push_task( promise.get_return_object() );
}

With this, we have everything in place to suspend our Tasks mid-flight, and resume them later.

And here is what happens if, inside of one of our Tasks, control reaches a statement saying:

// .. coroutine payload code (executed before suspension)
co_await suspend_task();
// .. coroutine payload code (executed after this coroutine gets resumed)

First, the compiler calls await_ready() on a temporary object of type suspend_task that was implicitly created in the statement. Since await_ready() returns false, the Task containing the statement gets fully suspended.

Immediately after this, the compiler calls await_suspend() on that same temporary object. The await_suspend() function receives a handle to the suspended Task as its only parameter.

await_suspend() then takes that coroutine handle and pushes it back onto the TaskList it came from, so that the Scheduler can resume the Task later.

4. Resume Task Execution 

Now since all coroutines start out initially suspended, how do they even begin execution? This one is simple - if we have a coroutine handle, we can either call its operator(), or explicitly call .resume() to pass control to it.

For the outcome to be predictable, however, the initiator of any resume() operations on Tasks must be a single object, the Scheduler. The Scheduler keeps track of groups of Tasks that may execute in parallel by keeping each group in a TaskList. These elements, TaskList, and Scheduler, are the two missing pieces that complete our system.

Scheduling in Detail: TaskList 

The TaskList is a ring buffer for Task handles, with an associated workload counter. The TaskList owns all Task handles that are placed into it. Whenever we add such a handle to a TaskList, the TaskList increments its internal workload counter, so that it can keep track of how many Tasks must be completed.

We also give each Task a back-reference to its TaskList, so that whenever a Task suspends, it can push itself back onto the end of its original TaskList to be resumed and completed later. Once the Task completes, it atomically decrements the workload counter of its TaskList.

Scheduling in Detail: Scheduler 

All work is done through the Scheduler, and carried out by its worker threads. To this end, the Scheduler maintains and owns a group of system threads that it keeps idle by default. Once the Scheduler is fed a TaskList to process via wait_for_task_list(), it will try to move any Tasks from the TaskList (starting at the front of the list) onto worker threads that sit idle.

If the main thread can’t find any idle workers, it will try to execute the Task itself. This means the thread that waits for the TaskList doesn’t just do the scheduling, but potentially contributes useful work in executing Tasks itself.

As the name of the function wait_for_task_list() suggests, it behaves just like a blocking call: forward progress only happens once all Tasks in the TaskList have completed. When the function returns, we must consider our TaskList consumed, and invalidated.

The main thread will repeat this loop of trying to assign Tasks to worker threads until it notices that the TaskList’s workload counter has dropped to zero. This means that all tasks have completed execution, and that the main thread can return from wait_for_task_list().

Figure: The Scheduler processing a TaskList using two worker threads, one being the main thread (W0), the other a dedicated worker thread (W1).

Note how Task A suspends midway and is placed back at the end of the TaskList, before resuming on a different worker thread.

Task Completion 

But who atomically decrements the workload counter of a TaskList each time one of its Tasks completes? This is where the true arcane magic of C++ coroutines reveals itself:

In what amounts to a magnificent summons of the combined spirits of Rube Goldberg and Fischli & Weiss, this happens because the promise type for Tasks declares its required method final_suspend() to return a custom awaitable that I named finalize_task, which, in turn, declares the required method await_suspend(), which, finally, in its custom implementation, decrements the workload counter, immediately after the Task coroutine has been suspended for the very last time after returning.

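void finalize_task::await_suspend( std::coroutine_handle<TaskPromise> h ) noexcept {
     // The Task has returned and is now suspended at its final suspend point;
     // it will never be resumed again, so destroying its frame here is allowed.
     // The frame also holds the promise (and this awaiter), so read what we
     // still need from it before calling destroy().
     auto* task_list = h.promise().p_task_list;
     h.destroy();
     // Signal completion last: once the counter reaches zero, the waiting
     // thread may return from wait_for_task_list() and release the TaskList.
     task_list->decrement_task_count();
}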

Bonus: Tasks that spawn TaskLists 

And what about Tasks that spawn Tasks of their own? That’s of course possible: as long as a Task has access to the Scheduler (and since you can pass a pointer to the Scheduler as a parameter to a Task, it can), it can create a new TaskList, pass it to the Scheduler for execution, and wait for completion before continuing.

While the caller is waiting, the Scheduler will take the current Task execution context to distribute the Tasks from the given TaskList onto any idle worker threads. If there are no idle worker threads available, it will execute the Task itself; that way, we can ensure something akin to load balancing.

Eager Worker-Led Scheduling 

Once a Task completes, instead of returning control immediately to the Scheduler, it will try to pull another Task from its own TaskList, if there are any Tasks left in it. This helps to spread the load of scheduling a bit more fairly and equally over any available worker threads. More power to workers!

// this goes towards the end of suspend_task::await_suspend
{
    // --- Eager Workers ---
    //
    // Eagerly try to fetch & execute the next task from the front
    // of the TaskList where it came from -
    // We do this so that multiple threads can share the
    // scheduling workload.
    //
    coroutine_handle_t c = task_list->pop_task();
    
    if ( c ) {
        c.resume();
    }
}

GitHub Repo 

Phew, what a ride: what started out as a show-and-tell for a nice job-system API slid into the morass of implementation details… which, of course, only scratched the surface. If you’re interested in (even) more details about how the elements discussed above work together, and how, for example, I’m avoiding spinlocking, I’d point you to the GitHub repository that contains the current version of the task system:

https://github.com/tgfrerer/pal_tasks

If you would like to post questions or comments, feel free to open an Issue on this repository :)

In retrospect 

Implementing this job-system with C++ coroutines sometimes felt like programming with callbacks, but using the type system to do so.

One reason for writing this post was to force myself to understand coroutines, not just well enough to write them, but well enough to write about them. At first, I found it pretty hard to figure out the types and customisation points that make up C++ coroutines, and how awaitables are key to the way they interact, mostly because control flow is not immediately clear, and there is a lot of invisible, compiler-generated code involved.

That said, the traits-based design allows us to hide a lot of complexity from the header files, and therefore makes it possible to create a very nice and clean user-facing API for a job system, which counts for a lot. I still need to test this system quite a bit, although early performance tests (with perf etc.) look very promising.

I wonder, however, whether the stacklessness of C++ coroutines may make this job-system less versatile compared to a system with stackful coroutines, where we may suspend at any point, but at our peril… Time will tell. I have a feeling that “boring, but safe” could be a good thing in the end. 🐍


