Asynchronous Task Dependency Management with Promises and Futures

Simple task dependency management in C++

I was recently working on a project which involved performing a number of tasks asynchronously, with the constraint that the tasks could be performed in any order, but they must “commit” in a specific order based on a dependency chain. For example, the program might kick off a series of tasks with the dependencies A->B->C->D (where X->Y means ‘Y is dependent on X’) on separate threads. These tasks will do some processing and then update a global structure with their results. The processing can be done fully in parallel, with no synchronization required. However, the updates to the global structure must be in done dependency order (so A, then B, and so on). In pseudocode, then these tasks might look like,

routine: Task(args, depends_on[optional])
  result := process(args)
  if (defined(depends_on)) then
    wait_for(depends_on)
  end

  commit(result)
end routine

There are a number of ways to achieve this effect. Probably the simplest would be to have a central manager thread which handles the committing of the results, and enforces the relevant order at that point. But I wanted a decentralized approach that would allow my scheduler to, more or less, “fire and forget” when it launched the task threads. This meant that the threads needed to figure out the commit order amongst themselves.

I decided to take this opportunity to play around with C++11’s std::promise abstraction. As it turned out, this worked exceedingly well and made for a straightforward task ordering implementation, which required much less code than would have been necessary had I used condition variables or semaphores.

Introduction to Promises and Futures

Promises and Futures are two closely related abstractions that are designed for use in asynchronous interfaces. Asynchronous interfaces are ones which initiate a task (usually something expensive like an IO) in the background and return to the caller immediately. They then provide some faculty for allowing the caller to access the result of the background task at some point in the future, when it has finished. This is in contrast to synchronous interfaces, which will not return control to the caller until the result is ready.

In C++, an std::promise object is used to store the returned value from an asynchronous interface and an std::future is used to retrieve that value. The asynchronous interface will return a future to the caller, which is associated with a promise in the asynchronous thread itself. Once the asynchronous thread’s output is ready, it will use the promise to “set” the value, which will then be retrievable in the caller via the future. When the caller accesses the value via the future, it will block until the value is ready. This allows the asynchronous interface to spin up a thread to do the work and return immediately to the caller, who can then do other work and only block on the value when absolutely necessary.

It’s easy to get promises and futures mixed up. An easy way to remember it is that a future represents a value that will be available at some point in the future, and a promise is the mechanism by which that value is provided. Promises exist within the asynchronous operation itself, and futures are returned to the caller.

Here is a simple example of using promises and futures to construct an asynchronous interface,

/* header files containing std::promise, std::future, and std::thread */ 
#include <future>
#include <thread>

/* I'm a C programmer at heart... don't judge me! */
#include <cstdlib>
#include <cstdio>
#include <unistd.h>

std::thread worker_thrd;

void worker(int time, std::promise<int> result_prom) {
  /* "do some work" that takes a random amount of time. */
  sleep(rand() % time);

  /* return the result via the promise */
  result_prom.set_value(100);
}

std::future<int> do_async_job(int arg) {
  std::promise<int> result_prom;
  std::future<int> result_fut = result_prom.get_future();

  /* when passing the promise as an argument, we must use move semantics */
  worker_thrd = std::thread(worker, arg, std::move(result_prom));
  return result_fut;
}

int main(int argc, char **argv) {
  /* call asynchronous interface */
  auto res = do_async_job(10);
  
  /* 
   * the above call will return as soon as the thread
   * is spawned, so we are free to do "other work" 
   * while we wait
   */
  sleep(rand() % 5); 

  /* finally, access the async result; blocking if necessary */
  auto x = res.get();

  fprintf(stdout, "result: %d\n", x);
  worker_thrd.join();
  return EXIT_SUCCESS;
}

I hope you can see just how powerful this abstraction is. In particular, it can be used to design some incredibly easy to use asynchronous interfaces (as seen by how simple the main is above), which don’t require the user to have to deal with callbacks, manual polling, etc.

Using Promises and Futures for Dependency Tracking

This same abstraction can be used to conveniently allow threads to block on the completion of other threads by communicating the completion status of one thread to another via a promise and future. std::promise supports a void specialization that can be used to send signals without passing around any specific values.

I’ll represent each task’s state using a struct, which will package the task arguments together with a promise and a future.

struct Task {
  int arg;

  /* used to signal dependent threads on Task completion */
  std::promise<void> prom; 

  /* if false, do not block on a future */
  bool has_dependence;

  /* used to wait for dependency completion */
  std::future<void> fut;
}

void task_func(Task *t) {
  /* do some work */
  sleep(rand() % t->arg);

  /* block on dependency if not already complete */
  if (t->has_dependence) {
    t->fut.get();
  }

  /* commit */
  sleep(rand() % t->arg);

  /* signal dependent task of completion */
  t->prom.set_value();
}

As the tasks are scheduled, each task will have its future linked to the promise in the thread it is dependent on. For example, given a four Tasks with dependencies 0->1->2->3, they can be set up like so,

tasks[0].has_dependence = false;

tasks[1].has_dependence = true;
tasks[1].fut = tasks[0].get_future();

tasks[2].has_dependence = true;
tasks[2].fut = tasks[1].get_future();

tasks[3].has_dependence = true;
tasks[3].fut = tasks[2].get_future();

and then each run on a thread. No matter what order they happen to complete their work in, the interlinked promises and futures will ensure that they commit in the correct order. More complex dependencies can be modeled this way too, using arrays of futures within the task, so long as the resulting dependency graph is a DAG; this approach cannot detect cyclic dependencies, and so these will cause a deadlock. Also, note that this approach requires the dependencies to be configured prior to launching the threads. This isn’t usually a problem, but it is something to be aware of.

This approach assumes that you have enough threads to launch all of the tasks concurrently. If you have fewer threads than tasks, you need to take care to launch the tasks in the appropriate order. Otherwise you may deadlock, with all available threads waiting on the completion of a task which hasn’t been launched yet.

Example

Here is a toy example which uses the technique above to ensure correct behavior. Each thread sleeps for a period of time (selected in an attempt to force worst-case behavior, with the tasks completing in reverse dependency order), waits on its dependency to be satisfied, and then prints a number. The numbers and dependencies are selected so that the result should be a sequence of numbers in ascending order, assuming the commits are properly ordered. You can download the code directly, here.

In this example, I use a dummy promise which I immediately satisfy, rather than a boolean, to handle the case of the first task in the sequence (which should lack any dependence).

#include <thread>
#include <future>
#include <cstdlib>
#include <cstdio>
#include <unistd.h>

struct Task {
    size_t i;
    std::promise<void> prom;
    std::future<void> fut;
};

void thread_task(Task *t) {
    size_t tmp = rand() % 5;
    sleep(16 - t->i);
    t->fut.get();

    printf("%ld\n", t->i);
    t->prom.set_value();
}


int main(int argc, char **argv) {
    size_t n=16;
    std::thread workers[n];
    Task tasks[n];

    std::promise<void> trigger_prom;
    std::promise<void> *prev_prom = &trigger_prom;
    for (size_t i=0; i<n; i++) {
        tasks[i].i = i;
        tasks[i].fut = prev_prom->get_future();
        prev_prom = &(tasks[i].prom);
        
        if (i == 0) trigger_prom.set_value();

        workers[i] = std::thread(thread_task, &(tasks[i]));
    }

    for (size_t i=0; i<n; i++) {
        workers[i].join();
    }
}

Conclusion

In this post, I discussed C++’s promise/future abstraction for asynchronous interfaces, and how it can be used to provide decentralized dependency enforcement among concurrent tasks. In this example, I specifically considered tasks which had a work phase requiring no synchronization, followed by a commit phase that must follow a specific dependency ordering, with only a single dependency per task. However, this approach can be easily generalized to multiple dependencies, or to tasks which require dependency enforcement at different places within their execution.

This approach is quite convenient, requiring much less synchronization code than condition variables or semaphores, and avoiding the need for a centralized thread to coordinate task commits. It does, however, require setting up the dependencies for all the tasks prior to launching them.