Asynchronous Task Dependency Management with Promises and Futures
Simple task dependency management in C++
I was recently working on a project which involved performing a number of tasks asynchronously, with the constraint that the tasks could be performed in any order, but they must “commit” in a specific order based on a dependency chain. For example, the program might kick off a series of tasks with the dependencies A->B->C->D (where X->Y means ‘Y is dependent on X’) on separate threads. These tasks will do some processing and then update a global structure with their results. The processing can be done fully in parallel, with no synchronization required. However, the updates to the global structure must be done in dependency order (so A, then B, and so on). In pseudocode, these tasks might look like,
routine: Task(args, depends_on[optional])
    result := process(args)
    if (defined(depends_on)) then
        wait_for(depends_on)
    end
    commit(result)
end routine
There are a number of ways to achieve this effect. Probably the simplest would be to have a central manager thread which handles the committing of the results, and enforces the relevant order at that point. But I wanted a decentralized approach that would allow my scheduler to, more or less, “fire and forget” when it launched the task threads. This meant that the threads needed to figure out the commit order amongst themselves.
I decided to take this opportunity to play around with C++11’s std::promise abstraction. As it turned out, this worked exceedingly well and made for a straightforward task ordering implementation, which required much less code than would have been necessary had I used condition variables or semaphores.
Introduction to Promises and Futures
Promises and Futures are two closely related abstractions that are designed for use in asynchronous interfaces. Asynchronous interfaces are ones which initiate a task (usually something expensive like an IO) in the background and return to the caller immediately. They then provide some facility for allowing the caller to access the result of the background task at some point in the future, when it has finished. This is in contrast to synchronous interfaces, which will not return control to the caller until the result is ready.
In C++, an std::promise object is used to store the returned value from an asynchronous interface, and an std::future is used to retrieve that value. The asynchronous interface will return a future to the caller, which is associated with a promise in the asynchronous thread itself. Once the asynchronous thread’s output is ready, it will use the promise to “set” the value, which will then be retrievable in the caller via the future. When the caller accesses the value via the future, it will block until the value is ready. This allows the asynchronous interface to spin up a thread to do the work and return immediately to the caller, who can then do other work and only block on the value when absolutely necessary.
Here is a simple example of using promises and futures to construct an asynchronous interface,
/* header files containing std::promise, std::future, and std::thread */
#include <future>
#include <thread>

/* I'm a C programmer at heart... don't judge me! */
#include <cstdlib>
#include <cstdio>
#include <unistd.h>

std::thread worker_thrd;

void worker(int time, std::promise<int> result_prom) {
    /* "do some work" that takes a random amount of time. */
    sleep(rand() % time);
    /* return the result via the promise */
    result_prom.set_value(100);
}

std::future<int> do_async_job(int arg) {
    std::promise<int> result_prom;
    std::future<int> result_fut = result_prom.get_future();
    /* when passing the promise as an argument, we must use move semantics */
    worker_thrd = std::thread(worker, arg, std::move(result_prom));
    return result_fut;
}

int main(int argc, char **argv) {
    /* call asynchronous interface */
    auto res = do_async_job(10);
    /*
     * the above call will return as soon as the thread
     * is spawned, so we are free to do "other work"
     * while we wait
     */
    sleep(rand() % 5);
    /* finally, access the async result; blocking if necessary */
    auto x = res.get();
    fprintf(stdout, "result: %d\n", x);
    worker_thrd.join();
    return EXIT_SUCCESS;
}
I hope you can see just how powerful this abstraction is. In particular, it can be used to design some incredibly easy-to-use asynchronous interfaces (as seen by how simple the main is above), which don’t require the user to deal with callbacks, manual polling, etc.
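As a side note (not something the example above relies on), the standard library can also set up this promise/future plumbing for you: std::async launches the work on its own and hands back the future directly. Here is a minimal sketch, with do_work standing in for some hypothetical expensive computation,

/* illustrative sketch only; do_work is a hypothetical stand-in */
#include <future>
#include <cstdio>

int do_work(int arg) {
    /* pretend this is expensive */
    return arg * 10;
}

int main(void) {
    /* std::async creates the promise, the future, and the thread for us */
    std::future<int> fut = std::async(std::launch::async, do_work, 10);
    /* ... other work could happen here ... */
    fprintf(stdout, "result: %d\n", fut.get()); /* blocks until do_work finishes */
    return 0;
}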
Using Promises and Futures for Dependency Tracking
This same abstraction can be used to conveniently allow threads to block on the completion of other threads by communicating the completion status of one thread to another via a promise and future. std::promise supports a void specialization that can be used to send signals without passing around any specific values.
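As a quick, self-contained illustration of that void specialization (a sketch of mine, separate from the task code below), one thread can use a std::promise<void>/std::future<void> pair purely as a completion signal,

/* illustrative sketch only; not part of the task code below */
#include <future>
#include <thread>
#include <cstdio>

int main(void) {
    std::promise<void> done_prom;
    std::future<void> done_fut = done_prom.get_future();

    /* the "upstream" thread signals completion; no value is transferred */
    std::thread upstream([&done_prom]() {
        /* ... do some work ... */
        done_prom.set_value(); /* signal: "my work is finished" */
    });

    /* the "downstream" side simply blocks until the signal arrives */
    done_fut.get();
    fprintf(stdout, "upstream finished\n");

    upstream.join();
    return 0;
}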
I’ll represent each task’s state using a struct, which will package the task arguments together with a promise and a future.
struct Task {
    int arg;
    /* used to signal dependent threads on Task completion */
    std::promise<void> prom;
    /* if false, do not block on a future */
    bool has_dependence;
    /* used to wait for dependency completion */
    std::future<void> fut;
};

void task_func(Task *t) {
    /* do some work */
    sleep(rand() % t->arg);
    /* block on dependency if not already complete */
    if (t->has_dependence) {
        t->fut.get();
    }
    /* commit */
    sleep(rand() % t->arg);
    /* signal dependent task of completion */
    t->prom.set_value();
}
As the tasks are scheduled, each task will have its future linked to the promise in the task it is dependent on. For example, given four Tasks with dependencies 0->1->2->3, they can be set up like so,
tasks[0].has_dependence = false;
tasks[1].has_dependence = true;
tasks[1].fut = tasks[0].prom.get_future();
tasks[2].has_dependence = true;
tasks[2].fut = tasks[1].prom.get_future();
tasks[3].has_dependence = true;
tasks[3].fut = tasks[2].prom.get_future();
and then each run on a thread. No matter what order they happen to complete their work in, the interlinked promises and futures will ensure that they commit in the correct order. More complex dependencies can be modeled this way too, using arrays of futures within the task, so long as the resulting dependency graph is a DAG; this approach cannot detect cyclic dependencies, and so these will cause a deadlock. Also, note that this approach requires the dependencies to be configured prior to launching the threads. This isn’t usually a problem, but it is something to be aware of.
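To give a rough idea of the multiple-dependency case mentioned above, here is a hypothetical sketch of mine (MultiTask and multi_task_func are not code from the project): the single future is replaced by a vector of shared futures, one per prerequisite,

/* illustrative sketch only; a hypothetical Task variant with several prerequisites */
#include <future>
#include <vector>
#include <cstdlib>
#include <unistd.h>

struct MultiTask {
    int arg;
    /* signals this task's completion to its dependents */
    std::promise<void> prom;
    /* one entry per prerequisite; shared_future so that several tasks
     * can wait on the same upstream task */
    std::vector<std::shared_future<void>> deps;
};

void multi_task_func(MultiTask *t) {
    /* do some work */
    sleep(rand() % t->arg);
    /* block until every prerequisite has committed */
    for (auto &dep : t->deps) {
        dep.wait();
    }
    /* commit, then signal the tasks that depend on this one */
    sleep(rand() % t->arg);
    t->prom.set_value();
}

Each task’s completion signal would be created once, via something like tasks[i].prom.get_future().share(), and the resulting std::shared_future copied into the deps vector of every dependent; std::promise::get_future can only be called once, so the shared form is what allows a task to have more than one dependent. As noted above, all of this wiring still needs to happen before the threads are launched.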
Example
Here is a toy example which uses the technique above to ensure correct behavior. Each thread sleeps for a period of time (selected in an attempt to force worst-case behavior, with the tasks completing in reverse dependency order), waits on its dependency to be satisfied, and then prints a number. The numbers and dependencies are selected so that the result should be a sequence of numbers in ascending order, assuming the commits are properly ordered. You can download the code directly here.
In this example, I use a dummy promise which I immediately satisfy, rather than a boolean, to handle the case of the first task in the sequence (which should lack any dependence).
#include <thread>
#include <future>
#include <cstdlib>
#include <cstdio>
#include <unistd.h>

struct Task {
    size_t i;
    std::promise<void> prom;
    std::future<void> fut;
};

void thread_task(Task *t) {
    /* "work": sleep so that the tasks finish in reverse dependency order */
    sleep(16 - t->i);
    /* wait for the task we depend on to commit */
    t->fut.get();
    /* "commit": print our number */
    printf("%zu\n", t->i);
    /* signal the task that depends on us */
    t->prom.set_value();
}

int main(int argc, char **argv) {
    const size_t n = 16;
    std::thread workers[n];
    Task tasks[n];

    /* dummy promise, satisfied immediately, so the first task has no real dependence */
    std::promise<void> trigger_prom;
    std::promise<void> *prev_prom = &trigger_prom;

    for (size_t i = 0; i < n; i++) {
        tasks[i].i = i;
        /* link this task's future to the previous task's promise */
        tasks[i].fut = prev_prom->get_future();
        prev_prom = &(tasks[i].prom);
        if (i == 0) trigger_prom.set_value();
        workers[i] = std::thread(thread_task, &(tasks[i]));
    }

    for (size_t i = 0; i < n; i++) {
        workers[i].join();
    }
    return EXIT_SUCCESS;
}
Conclusion
In this post, I discussed C++’s promise/future abstraction for asynchronous interfaces, and how it can be used to provide decentralized dependency enforcement among concurrent tasks. In this example, I specifically considered tasks which had a work phase requiring no synchronization, followed by a commit phase that must follow a specific dependency ordering, with only a single dependency per task. However, this approach can be easily generalized to multiple dependencies, or to tasks which require dependency enforcement at different places within their execution.
This approach is quite convenient, requiring much less synchronization code than condition variables or semaphores, and avoiding the need for a centralized thread to coordinate task commits. It does, however, require setting up the dependencies for all the tasks prior to launching them.