# Continuations for C++ std::future<> types

`thousandeyes::futures` is a self-contained, header-only, cross-platform library for attaching continuations to `std::future` types and for adapting multiple `std::future` objects into a single `std::future` object.
The library is tested on and should work on the following platforms:
- macOS with clang 7.0 or newer
- Linux with g++ 5.5.0 or newer
- Windows with VS 2017 or newer
Currently, the library requires a C++14-capable compiler. It can, however, be ported to C++11 for projects that cannot be upgraded to C++14.
- Getting started
- Motivation
- Features
- Examples
- Tests
- Using thousandeyes::futures in Existing Projects
- Performance
- Specialized Use Cases
- Contributing
- Licensing
## Getting started

The following program attaches a continuation to the future returned by the `getRandomNumber()` function. The attached continuation gets called only when the input future is ready (i.e., when `future<int>::get()` does not block).
```cpp
future<int> getRandomNumber()
{
    return std::async(std::launch::async, []() {
        return 4; // chosen by fair dice roll.
                  // guaranteed to be random.
    });
}

int main(int argc, const char* argv[])
{
    // 1. Create the executor used for waiting and setting the value on futures.
    auto executor = make_shared<DefaultExecutor>(milliseconds(10));

    // 2. Set the executor as the default executor for the current scope.
    Default<Executor>::Setter execSetter(executor);

    // 3. Attach a continuation that gets called only when the given future is ready.
    auto f = then(getRandomNumber(), [](future<int> f) {
        return to_string(f.get()); // f is ready - f.get() does not block
    });

    // 4. The resulting future becomes ready when the continuation produces a result.
    string result = f.get(); // result == "4"

    // 5. Stop the executor and cancel all pending continuations.
    executor->stop();
}
```
There are five concepts/aspects of the `thousandeyes::futures` library that can be seen in the above example:

- Creating an `Executor`
- Setting an `Executor` instance as the scope's default executor
- Attaching continuations using the `thousandeyes::futures::then()` function
- Extracting the final value from the resulting, top-level `std::future` object
- Stopping an `Executor`
The `Executor` is the component responsible for waiting on and providing values to the `std::future` objects that are handled by the `thousandeyes::futures` library. Before attaching continuations to `std::future` types, an instance of a concrete implementation of the `Executor` interface has to be created as a shared pointer (`shared_ptr<Executor>`).

The library provides a simple, default implementation of the `Executor` interface called `DefaultExecutor`. Clients of the library, however, may choose to implement and use their own executor(s) if the provided `DefaultExecutor` is not a good fit for the project (see section Performance).
The `DefaultExecutor` uses two `std::thread` threads: one that polls all the active `std::future` objects that have continuations attached to them, and one that invokes the continuations once the futures become ready. It polls the active `std::future` objects with the timeout given in the component's constructor.

In the above example, the polling timeout is 10 ms, which means that the `DefaultExecutor` will wait up to 10 ms for each `std::future` to become ready. If a specific `std::future` becomes ready before the timeout expires, it gets "dispatched", meaning that the attached continuation gets invoked (on another thread). If the `std::future` is not ready after the given timeout, the `DefaultExecutor` starts waiting on the next `std::future` object and comes back to the first one after it finishes with all the others (i.e., when they are dispatched or their timeouts expire).
When attaching continuations to `std::future` objects, an `Executor` instance has to be specified; that instance is responsible for monitoring and dispatching them. `Executor` instances can be specified either by passing a `shared_ptr<Executor>` instance as the first argument to the `thousandeyes::futures::then()` function or by setting a `shared_ptr<Executor>` instance as the default `Executor` for the current scope. The latter can be seen in the example above, where the `executor` object is passed into the `Default<Executor>::Setter`'s constructor.

The lifetime of the registration of the specified default `Executor` is tied to the lifetime of the `Default<Executor>::Setter` object. When the `Setter` object goes out of scope, the default `Executor` is reset back to its previously registered value. This is shown more clearly in the example below.
```cpp
future<int> h()
{
    return then(getRandomNumber(), [](future<int> f) {
        return 1821;
    });
}

int main(int argc, const char* argv[])
{
    auto e1 = make_shared<DefaultExecutor>(milliseconds(10));
    Default<Executor>::Setter execSetter(e1);

    auto f = then(getRandomNumber(), [](future<int> f) { // uses e1
        return "e1";
    });

    auto g = h(); // then() in "h" uses e1

    auto e2 = make_shared<DefaultExecutor>(milliseconds(1821));
    {
        Default<Executor>::Setter execSetter(e2);

        auto f2 = then(getRandomNumber(), [](future<int> f) { // uses e2
            return 1821;
        });

        auto g2 = h(); // then() in "h" uses e2
    }

    auto k = then(getRandomNumber(), [](future<int> f) { // uses e1 again
        return "e1 again";
    });

    e2.reset(); // e2's lifetime ends here
}
```
Attaching continuations to `std::future` objects is achieved via the `thousandeyes::futures::then()` function. As mentioned above, `then()` can optionally accept a `shared_ptr<Executor>` instance as its first argument. The main arguments the function accepts, however, are the following:

- The input future object (`std::future`)
- The continuation function, which accepts the input future as its argument and can return either a value or an `std::future` of a value

The input future object is passed to the `then()` function by value, which means that from that point onwards `then()` owns the object. Subsequently, the continuation function is invoked by the `Executor` (the `DefaultExecutor` in the examples) only when the input `std::future` object is ready. The ownership of the ready input future object is transferred to the continuation function and, thus, it is passed to it by value.
Inside the continuation function, the `std::future::get()` method never blocks: it either returns the stored result immediately or throws the stored exception, depending on whether the original input future object became ready with a value or with an exception.

If the continuation function's return type is a non-`std::future` type `T`, then the return value of the `then()` function is an `std::future<T>` object that becomes ready with the result of the continuation function, or with an exception if the continuation function happens to throw when invoked. If the continuation function's return type is an `std::future<U>` type, then the return value of the `then()` function is equivalent to the resulting `std::future<U>` object itself. More specifically, `then()` returns an `std::future<U>` object that becomes ready when the continuation's `std::future<U>` object becomes ready or, if the continuation function happens to throw when invoked, contains the thrown exception.

This allows client code to easily chain `then()` invocations inside arbitrarily nested continuation functions and receive the final result only when all the `std::future` objects that participate in the chain have become ready. This is best shown in the excerpt below:
```cpp
auto f = then(getValueAsync(1821), [](future<int> f) {
    auto first = to_string(f.get());

    return then(getValueAsync(string("1822")), [first](future<string> f) {
        auto second = f.get();

        return then(getValueAsync(1823), [first, second](future<int> f) {
            return first + '_' + second + '_' + to_string(f.get());
        });
    });
});

string result = f.get(); // result == "1821_1822_1823"
```
When the intermediate `std::future` objects of a calculation are independent, even when their number cannot be determined at compile time, the `thousandeyes::futures::all()` adapter can be used to create an `std::future` object that becomes ready when all the input futures become ready. The resulting `std::future` object, in turn, can be passed into `then()` in order to process and aggregate the individual results. This can be seen in the example below:
```cpp
vector<future<int>> futures;
for (int i = 0; i < 1821; ++i) {
    futures.push_back(getValueAsync(i));
}

auto f = then(all(move(futures)), [](future<vector<future<int>>> f) {
    auto futures = f.get();
    return accumulate(futures.begin(), futures.end(), 0, [](int sum, future<int>& f) {
        return sum + f.get();
    });
});

int result = f.get(); // result == 0 + 1 + 2 + ... + 1820
```
In the above example, inside the continuation, both the top-level future (`std::future<vector<std::future<int>>>`) and all the individual futures contained within the `std::vector` of its stored value (`std::future<int>`) are guaranteed to be in the ready state. Therefore, none of the `std::future::get()` calls above block.
As also mentioned in the subsections above, the result of the `then()` function is an `std::future<T>` object whose type `T` depends on the return type of the continuation function. The result of the `all()` function is either a "future of a vector of futures" object (`std::future<std::vector<std::future<T>>>`), when its input is an `std::vector<std::future<T>>` object, or a "future of a tuple of futures" object, when its input is a tuple of futures or multiple, variadic `std::future` arguments.

The potential return types of the `then()` function are summarized in the table below:
| Continuation Return Type | `then()` Return Type |
| --- | --- |
| `T` | `std::future<T>` |
| `std::future<T>` | `std::future<T>` |
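For instance, in the sketch below (which assumes the `getValueAsync()` helper used in the earlier examples), the first continuation returns a plain `string` while the second returns a `future<string>`; in both cases `then()` yields an `std::future<string>`:

```cpp
// Continuation returns a plain value: then() yields future<string>.
future<string> a = then(getValueAsync(1821), [](future<int> f) {
    return to_string(f.get());
});

// Continuation returns a future: then() still yields future<string>, not
// future<future<string>> - the inner future is "unwrapped".
future<string> b = then(getValueAsync(1822), [](future<int> f) {
    return getValueAsync(to_string(f.get()));
});
```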
The potential return types of the `all()` function are summarized in the table below:
| `all()` Argument Type(s) | `all()` Return Type |
| --- | --- |
| `std::vector<std::future<T>>` | `std::future<std::vector<std::future<T>>>` |
| `std::tuple<std::future<T>, std::future<U>, ...>` | `std::future<std::tuple<std::future<T>, std::future<U>, ...>>` |
| `std::future<T>, std::future<U>, ...` | `std::future<std::tuple<std::future<T>, std::future<U>, ...>>` |
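For instance, the variadic overload can be used as in the following sketch (again assuming the `getValueAsync()` helper from the earlier examples):

```cpp
auto f = then(all(getValueAsync(1821), getValueAsync(string("hello"))),
              [](future<tuple<future<int>, future<string>>> f) {
    auto futures = f.get(); // every contained future is ready
    return to_string(get<0>(futures).get()) + '_' + get<1>(futures).get();
});

string result = f.get(); // result == "1821_hello"
```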
Calling the `std::future::get()` method of an `std::future` object returned by the `then()` function throws an exception under the following conditions:

- When the continuation function throws an exception `E` when invoked
- When the continuation function returns an `std::future` object that becomes ready with an exception `E`
- When the `Executor` object is stopped
- When the total wait time on the input future exceeds the library's Time Limit

In the first two cases, `std::future::get()` re-throws the same exception `E`, whereas in the last two cases it throws a `WaitableWaitException` or its subclass, `WaitableTimedOutException`, respectively.
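The following sketch (assuming the `getValueAsync()` helper from the earlier examples) shows where each kind of exception surfaces:

```cpp
auto f = then(getValueAsync(1821), [](future<int> f) -> string {
    throw runtime_error("continuation failed");
});

try {
    f.get();
}
catch (const WaitableWaitException& e) {
    // Thrown if the executor was stopped or the time limit was exceeded.
}
catch (const runtime_error& e) {
    // The exception thrown by the continuation is re-thrown here.
}
```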
Calling the `std::future::get()` method of an `std::future` object returned by the `all()` function throws only when the associated `Executor` object is stopped or when the total wait time on an input future exceeds the library's Time Limit (see section Setting and handling the time limit). In these cases, the method throws a `WaitableWaitException` or its subclass, `WaitableTimedOutException`, respectively.
Explicitly stopping an `Executor` instance does two things:

- It cancels all pending wait operations on all the associated input `std::future` objects
- It puts the `Executor` object into an invalid state, in which it cannot be used anymore to wait on and dispatch `std::future` objects

The `DefaultExecutor` implementation, used in all the examples above, (a) sets the `WaitableWaitException` on all the pending `std::future` objects associated with it and (b) joins the two threads that are used to poll and dispatch the associated `std::future` objects, respectively. For more information on providing concrete `Executor` implementations, see section Implementing alternative Executors.
## Motivation

The C++11/14/17 standard library includes the `std::future` type for returning results asynchronously to clients. The API of the standard `std::future` type, however, is very limited and does not provide support for attaching continuations or for combining/adapting existing future objects. This limitation makes it very difficult to use `std::future` extensively in projects, since it is tedious and error-prone to effectively parallelize and reuse components that need to consume, transform, and combine multiple asynchronous results.

Prior to implementing the `thousandeyes::futures` library, a few existing libraries were evaluated as potential alternatives to `std::future`. Despite their maturity, flexibility, and feature-completeness, they were not deemed a good fit for our projects.
First of all, the `boost::future` type was not a good fit since it relied on the specific implementations of the `boost::thread` library's components, which, in turn, meant that dependent projects would have to replace standard components such as `std::thread`, `std::mutex`, and `std::chrono` with their boost equivalents.

The `folly::future` library, created by Facebook, was not deemed a good fit since it relied on many of the other folly sub-libraries and we did not want to add the whole `folly` library as a dependency to our projects.

Finally, other open-source future implementations on github.com were deemed not a good fit since they implemented their own, unique future types and were not very mature and well-tested.
## Features

The proposed solution, named `thousandeyes::futures`, is a small, independent, header-only library with the following features:

- Provides all its functionality on top of the standard, widely-supported `std::future` type
- Does not have any external dependencies
- Does not need building - client code should only `#include` its headers
- Is efficient and well-tested
- Is easy to extend to support many different use cases
- Achieves good trade-offs between implementation simplicity, efficient use of resources, and time-to-detect-ready lag
## Examples

The library includes a few examples that showcase its use. The source code of the examples is available under the `examples` folder. Some of the examples, and excerpts of them, are also explained in the sections above.

The examples can be compiled with the following commands:

```sh
$ mkdir build
$ cd build
$ cmake -DTHOUSANDEYES_FUTURES_BUILD_EXAMPLES=ON ..
$ cmake --build . --config Debug
```

Note that `cmake` version `3.11` is required for building the examples. Then, the executables of all the examples will be created under the `build/examples/Debug` folder.
## Tests

`thousandeyes::futures` uses `googletest` and `googlemock` for its tests. The provided tests ensure that the library and its building blocks work correctly under many different usage scenarios. The source code of the tests is available under the `tests` folder.

The tests can be compiled with the following commands:

```sh
$ mkdir build
$ cd build
$ cmake -DTHOUSANDEYES_FUTURES_BUILD_TESTS=ON ..
$ cmake --build . --config Debug
```

Note that `cmake` version `3.11` is required for building the tests.

Then, the executables for all the available tests will be created under the `build/tests/Debug` folder. They can be invoked manually (individually) or all together, at once, via the `ctest` command:

```sh
$ ctest -C Debug -V
```
## Using thousandeyes::futures in Existing Projects

The simplest and most direct way to use the library is to copy it into an existing project under, e.g., a `thousandeyes-futures` folder, and then add its `include` sub-folder to the project's include path, e.g., `-I./thousandeyes-futures/include` or `/I.\thousandeyes-futures\include`.
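For example, assuming a typical g++ setup on Linux, a program that uses the library could then be compiled directly with a command like the following (`-pthread` is needed because `std::thread` and `std::async` are used under the hood):

```sh
$ g++ -std=c++14 -I./thousandeyes-futures/include -pthread main.cpp -o main
```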
Moreover, the following sub-sections describe alternative ways to integrate the library into existing projects that use `cmake` and/or the conan.io package manager (`vcpkg` support coming soon).
In projects using `cmake`, the library can be found using the `thousandeyes-futures_ROOT` variable when generating the build files via the `cmake` generate command. E.g.:

```sh
$ cmake -G Xcode -Dthousandeyes-futures_ROOT=/path/to/thousandeyes-futures -DBOOST_ROOT=${BOOST_ROOT}
```

Alternatively, the library can be copied into a sub-folder (or a git submodule), e.g., under the directory `thousandeyes-futures`, and then added to `cmake` via the `add_subdirectory()` command in the top-level `CMakeLists.txt` file. E.g.:

```cmake
add_subdirectory(thousandeyes-futures)
```

In either case, the library can then be set as a target link library with the alias `thousandeyes::futures`. For example, the following command adds the library as a private dependency to the `my-target` target:

```cmake
target_link_libraries(my-target PRIVATE thousandeyes::futures)
```
The library can be built and published into a conan.io-compatible repository. Regardless of where it is published, the library can be integrated into an existing project using the project's build system and the macros/variables generated by `conan`.

When the configured `conan` generator is `cmake`, the library can be integrated either via the `conan`-generated cmake macros or, alternatively, via the `find_package()` command, which imports the target library with the `thousandeyes::futures` alias:

```cmake
find_package(ThousandEyesFutures)
```

Subsequently, the library can be added as a link target to the other `cmake` targets. For example, the following command adds the library as a public dependency to the `my-target` target:

```cmake
target_link_libraries(my-target PUBLIC thousandeyes::futures)
```
The library can be published to a conan.io repository. The first step to doing that is making sure that the internal or external repository is set as a `conan` remote. For example, the following command adds the server http://conanrepo.example.com:9300 as a conan remote called `origin`:

```sh
$ conan remote add origin http://conanrepo.example.com:9300
```

Then, the library can be published as the `thousandeyes-futures` package via the `upload` command. For example, the following commands bundle the library as a conan package and publish it to the `origin` remote under the `user/channel` namespace:

```sh
$ cd thousandeyes-futures
$ conan create . user/channel
$ conan upload -r origin thousandeyes-futures/0.1@user/channel
```

Subsequently, the published package can be required from the project via a `conanfile.py`:

```python
from conans import ConanFile

class MyConanFile(ConanFile):
    def requirements(self):
        self.requires("thousandeyes-futures/0.1@user/channel")
```

Or via a `conanfile.txt`:

```
[requires]
thousandeyes-futures/0.1@user/channel
```
## Performance

This section includes the results of, and a discussion about, the performance of the default `thousandeyes::futures::then()` implementation. In this context, "default implementation" refers to the implementation of `thousandeyes::futures::then()` using the provided `DefaultExecutor`, which, in turn, is the default, concrete implementation of the `Executor` component based on the `PollingExecutor` strategy.

After testing and benchmarking the default implementation (a) against other, more direct approaches for detecting when the set of active futures becomes ready and (b) against other implementations of the `Executor` component, it appears that the `DefaultExecutor` with a `q` value of 10 ms achieves a good balance between efficient use of resources and raw performance.
The proposed, default implementation of `then()`, using the `DefaultExecutor` (`default_then()`), was benchmarked against the following alternative implementations:

- A `blocking_then()` implementation that eagerly calls `future::get()` and blocks to get the result before moving to the next future (serves as the baseline)
- An `unbounded_then()` implementation that creates a new thread per invocation, which waits for the result via `future::wait()`
- An `asio_then()` implementation that dispatches a function via `boost::asio::io_context::post()` per invocation, which, in turn, waits for the result via `future::wait()` and uses 50 threads to run `boost::asio::io_context::run()`

Whereas the other approaches (apart from `blocking_then()`) use many threads, the `default_then()` implementation uses at most two threads: one thread for polling all the active futures for completion status and one thread for dispatching the continuations.

Also, all the futures that were used for the benchmark were independent - i.e., they did not depend on other futures to complete before completing themselves. The code that benchmarked the different methods was equivalent to the following:
```cpp
template<class T>
future<T> getValueAsync(T i)
{
    promise<T> p;
    auto result = p.get_future();

    // ioService is backed by the thread-pool described below
    ioService.post([p=move(p), i]() mutable {
        int g = 500000 - i * 5;
        sleep_for(microseconds(g));
        p.set_value(i);
    });

    return result;
}

void useCase()
{
    vector<future<string>> f;
    for (int i = 0; i < 1900; ++i) {
        f.push_back(then(getValueAsync(i), [](future<int> f) {
            return to_string(f.get());
        }));
    }

    for (int i = 0; i < 1900; ++i) {
        auto result = f[i].get();
        assert(result == to_string(i));
    }
}
```
The testing system was a 2016 MacBook Pro with a 2.4 GHz Intel Core i7 CPU and 16 GB of 1867 MHz LPDDR3 RAM. All the programs used a `boost::asio::io_context`-based thread-pool with 100 threads for the implementation of the `getValueAsync()` function, since macOS caps the number of threads that can be instantiated. On the testing system, after about 2,000 threads, the `thread::thread()` constructor threw an exception with the message:

```
thread constructor failed: Resource temporarily unavailable
```
In general, among the aforementioned implementations, `unbounded_then()`, `asio_then()`, and `default_then(q = 10ms)` exhibit very similar performance characteristics. The results are summarized in the table below:
| Implementation | Result (total / user / system / %cpu) |
| --- | --- |
| `blocking_then()` | 15:46.81 / 0.14s / 0.19s / 0% |
| `unbounded_then()` | 00:09.52 / 0.23s / 0.24s / 4% |
| `asio_then()` | 00:09.50 / 0.04s / 0.06s / 1% |
| `default_then(q = 0ms)` | 00:09.45 / 1.72s / 8.12s / 104% |
| `default_then(q = 1ms)` | 00:09.48 / 0.20s / 0.30s / 5% |
| `default_then(q = 10ms)` | 00:09.48 / 0.06s / 0.08s / 1% |
| `default_then(q = 500ms)` | 00:09.53 / 0.04s / 0.05s / 0% |
### Comparing to other Executors

The `DefaultExecutor` was subsequently tested against different `Executor` implementations under two different use cases. The first use case, like before, only generates independent futures. The second use case, on the other hand, generates a long chain of interdependent futures, where a specific future becomes ready only when all the futures generated after it become ready.

Specifically, the first use case, which utilizes `std::async()` to simulate asynchronous work and sleeps for a time period uniformly distributed in the `[5, 5000000]` usec interval, can be seen below:
```cpp
template<class T>
future<T> getValueAsync(const T& value)
{
    static mt19937 gen;
    static uniform_int_distribution<int> dist(5, 5000000);

    return std::async(std::launch::async, [value]() {
        this_thread::sleep_for(microseconds(dist(gen)));
        return value;
    });
}

void usecase0()
{
    vector<future<string>> results;
    for (int i = 0; i < 1900; ++i) {
        results.push_back(then(getValueAsync(i), [](future<int> f) {
            return to_string(f.get());
        }));
    }

    for (int i = 0; i < 1900; ++i) {
        auto result = results[i].get();
        assert(result == to_string(i));
    }
}
```
The second use case generates a long chain of interdependent futures by using two pseudo-mutually-recursive functions, `recFunc1` and `recFunc2`. The first function generates a future that becomes ready when the future returned from the second function becomes ready, which, in turn, becomes ready when the future returned by the first function becomes ready. Specifically, the code for the second use case can be seen below:
```cpp
future<int> recFunc1(int count);
future<int> recFunc2(future<int> f);

void usecase1()
{
    auto f = recFunc1(0);

    int result = f.get();
    assert(result == 1821);
}

future<int> recFunc1(int count)
{
    auto h = std::async(std::launch::async, [count]() {
        this_thread::sleep_for(milliseconds(1));
        return count + 1;
    });

    return then(move(h), [](future<int> g) {
        return recFunc2(move(g));
    });
}

future<int> recFunc2(future<int> f)
{
    auto count = f.get();

    if (count == 100) {
        return fromValue(1821);
    }

    auto h = std::async(std::launch::async, []() {
        this_thread::sleep_for(milliseconds(1));
    });

    return then(move(h), [count](future<void> g) {
        g.get();
        return recFunc1(count);
    });
}
```
The two `Executor` implementations that were benchmarked against the `DefaultExecutor` were the `BlockingExecutor` (baseline) and the `UnboundedExecutor` (see also section Implementing alternative Executors).

Whereas for the first use case (independent futures) the `DefaultExecutor` with any `q != 0` value behaves similarly to the `UnboundedExecutor`, the second use case exposes a pathological case for the `DefaultExecutor`. When all active futures are interdependent, the `DefaultExecutor` hits its theoretical worst-case performance. This can be easily seen in the table below:
| `Executor` Implementation | Result 0 (total / user / system / %cpu) | Result 1 (total / user / system / %cpu) |
| --- | --- | --- |
| `BlockingExecutor` | 77:31.90 / 0.20s / 0.33s / 0% | See note 1 |
| `UnboundedExecutor` | 00:5.696 / 0.50s / 0.58s / 18% | 00:0.301 / 0.03s / 0.06s / 30% |
| `DefaultExecutor(q = 0ms)` | 00:5.168 / 5.28s / 0.17s / 105% | 00:0.283 / 0.27s / 0.03s / 105% |
| `DefaultExecutor(q = 1ms)` | 00:5.141 / 0.29s / 0.26s / 10% | 00:26.71 / 0.40s / 0.43s / 3% |
| `DefaultExecutor(q = 10ms)` | 00:5.156 / 0.23s / 0.23s / 8% | 03:47.96 / 0.45s / 0.46s / 0% |
| `DefaultExecutor(q = 100ms)` | 00:5.252 / 0.24s / 0.22s / 8% | 33:52.91 / 0.50s / 0.46s / 0% |
| `DefaultExecutor(q = 500ms)` | 00:5.415 / 0.24s / 0.23s / 8% | See note 2 |
- note 1: Since the futures are interdependent, the `BlockingExecutor` cannot complete
- note 2: It takes too long to finish - about `5 * DefaultExecutor(q = 100ms)`

The full source code used for this benchmark can be seen in `examples/executors.cpp`.
When the active futures are independent, the theoretical time-to-detect-ready lag - i.e., the time period from the exact moment a future becomes ready until the moment it is dispatched - is `q * O(N)`, where `N` is the number of active futures associated with a specific `Executor` instance. The worst possible delay occurs if a future becomes ready immediately after it is polled and, then, none of the other active futures associated with the same `Executor` are ready when polled. For example, with `q = 10ms` and `N = 200` independent active futures, the worst-case lag is about `10ms * 200 = 2s`.

When the active futures do not complete independently, the theoretical time-to-detect-ready lag of the `DefaultExecutor` increases to `q * O(N^2)`, where `N` is the number of active interdependent futures. The second use case of the previous subsection (see Comparing to other Executors) triggers the worst possible delay by creating a long chain of active futures where each future depends on the future generated after it.

Regardless of the aforementioned extreme cases, the `DefaultExecutor` with a `q` value of 10 ms appears to be a very good compromise between raw, real-world performance and resource utilization. In typical usage scenarios, where a few hundred, mostly independent, `std::future` instances are active at any given time, the worst possible time-to-detect-ready lag will only be a few seconds. Moreover, the proposed implementation allows for easily scaling the monitoring and dispatching of the active futures: in usage scenarios where the number of active futures is orders of magnitude bigger, the active futures can be distributed over many different `Executor` instances.
In the way the `thousandeyes::futures` library is currently used in internal projects, a few seconds of delay is perfectly fine, since the main goal is to increase the parallelization potential of the underlying system, not to take measurements. The proposed library achieves that goal with very modest CPU and memory requirements.

Nonetheless, `thousandeyes::futures` should not be used for measuring events via continuations, especially if those measurements need millisecond (or better) accuracy. For example, measurements like the following would not provide the required accuracy when using the `DefaultExecutor`:

```cpp
Stopwatch<steady_clock> sw;

then(connect(host, port, use_future), [sw](future<void> f) {
    out << "It took " << sw.elapsed() << " to connect to host" << endl;
});
```
## Specialized Use Cases

The `thousandeyes::futures` library provides overloads for its `then()` and `all()` functions (a) for explicitly specifying the `Executor` instance that will be used to monitor the input `std::future` and dispatch the attached continuation, and (b) for setting timeouts after which the library gives up waiting for the input future(s) to become ready.

Moreover, following the performance discussion in the previous section, the library can be extended to support more specialized use cases, either by providing completely different concrete implementations of the `Executor` interface or by providing different invokers for the existing `PollingExecutor`.
### Setting and handling the time limit

Both the `thousandeyes::futures::then()` and `thousandeyes::futures::all()` functions accept a `timeLimit` value as their first or second argument, depending on whether the `Executor` is given explicitly or not. This value sets a timeout after which the input (given) future(s) stop being monitored and the resulting (output) future becomes ready with the `thousandeyes::futures::WaitableTimedOutException` exception.

Specifically, the signatures of the functions that include the `timeLimit` parameter are the following:

```cpp
future<...> then(std::shared_ptr<Executor> executor, std::chrono::microseconds timeLimit, ...);
future<...> then(std::chrono::microseconds timeLimit, ...);
future<...> all(std::shared_ptr<Executor> executor, std::chrono::microseconds timeLimit, ...);
future<...> all(std::chrono::microseconds timeLimit, ...);
```
Therefore, when a timeout is specified while attaching a continuation to an input `std::future` object, if the latter is not ready after a period `dt`, where `dt >= timeLimit`, the resulting (output) `std::future` object will become ready with the library's `WaitableTimedOutException` exception. This behavior is clearly shown in the example below:
```cpp
void timeout0()
{
    future<void> f = sleepAsync(hours(5));

    auto g = then(milliseconds(100), move(f), [](future<void> f) {
        f.get(); // This will never get called
    });

    try {
        g.get(); // This will throw a timeout exception
    }
    catch (const WaitableTimedOutException& e) {
        cout << "Got exception: " << e.what() << endl;
    }
}
```
On the other hand, when a timeout is specified on the `all()` adapter, if any of the input `std::future` objects is not ready after a period `dt`, where `dt >= timeLimit`, the resulting (output) `std::future` object will become ready with the library's `WaitableTimedOutException` exception. Again, this behavior is clearly shown in the example below:
```cpp
void timeout1()
{
    auto f = all(milliseconds(100), sleepAsync(milliseconds(0)), sleepAsync(hours(5)));

    auto g = then(move(f), [](future<tuple<future<void>, future<void>>> f) {
        try {
            f.get(); // This will throw a timeout exception
        }
        catch (const WaitableTimedOutException& e) {
            cout << "Got exception: " << e.what() << endl;
        }
    });

    g.get(); // This will not throw since the exception was handled in the continuation
}
```
If no explicit `timeLimit` is given by the client code, the library's `then()` and `all()` functions implicitly set their `timeLimit` to one hour. A full example that showcases timeouts when using the library can be found in `examples/timeout.cpp`.
### Implementing alternative Executors

As also mentioned in the sections above, the `Executor` is responsible for monitoring all the input futures until they become ready. As soon as the `Executor` detects that a future is ready, it is responsible for dispatching it. All the aforementioned input future objects are adapted by the `then()` and `all()` functions into objects that implement the library's `Waitable` interface.

The `Waitable` interface is defined as follows:
```cpp
class Waitable {
public:
    virtual ~Waitable() = default;
    virtual bool wait(const std::chrono::microseconds& timeout) = 0;
    virtual void dispatch(std::exception_ptr err = nullptr) = 0;
};
```
The interface's `wait()` method is analogous to the `std::future::wait_for()` method and returns `true` if the `Waitable` object is ready for dispatching and `false` otherwise. The `dispatch()` method with a `nullptr` argument is analogous to the `std::promise::set_value()` method, whereas with a non-`nullptr` argument it is analogous to the `std::promise::set_exception()` method.
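To make these semantics concrete, a minimal, hypothetical `Waitable` implementation might look like the sketch below; it adapts an input `std::future<int>`, a continuation, and an output `std::promise<std::string>` (this is only an illustration, not the library's actual internal adapter):

```cpp
// Hypothetical illustration of the Waitable semantics.
class ContinuationWaitable : public Waitable {
public:
    ContinuationWaitable(std::future<int> f,
                         std::function<std::string(std::future<int>)> cont) :
        f_(std::move(f)),
        cont_(std::move(cont))
    {}

    std::future<std::string> getOutputFuture() { return p_.get_future(); }

    // Ready for dispatching when the wrapped input future is ready.
    bool wait(const std::chrono::microseconds& timeout) override
    {
        return f_.wait_for(timeout) == std::future_status::ready;
    }

    // nullptr: deliver the continuation's result; non-nullptr: deliver the error.
    void dispatch(std::exception_ptr err = nullptr) override
    {
        if (err) {
            p_.set_exception(err); // e.g., a WaitableWaitException on stop/time-out
            return;
        }

        try {
            p_.set_value(cont_(std::move(f_)));
        }
        catch (...) {
            p_.set_exception(std::current_exception());
        }
    }

private:
    std::future<int> f_;
    std::function<std::string(std::future<int>)> cont_;
    std::promise<std::string> p_;
};
```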
An `Executor` receives the `Waitable` objects to monitor via its `watch()` method. An `Executor` should also define a `stop()` method for suspending its normal operation and dispatching all non-ready `Waitable` objects with a `WaitableWaitException` exception.

Specifically, the interface of the `Executor` component is defined as follows:
```cpp
class Executor {
public:
    virtual ~Executor() = default;
    virtual void watch(std::unique_ptr<Waitable> w) = 0;
    virtual void stop() = 0;
};
```
An example of a simple and limited, but complete and fully conforming, `Executor` is the `BlockingExecutor`, which can be implemented as follows:
```cpp
class BlockingExecutor : public Executor {
public:
    void watch(std::unique_ptr<Waitable> w) override
    {
        try {
            while (active_) {
                if (!w->wait(minutes(1))) {
                    continue;
                }

                w->dispatch();
                return;
            }

            w->dispatch(make_exception_ptr(WaitableWaitException("Executor stopped")));
        }
        catch (...) {
            w->dispatch(current_exception());
        }
    }

    void stop() override
    {
        active_ = false;
    }

private:
    atomic<bool> active_{ true };
};
```
The `BlockingExecutor` above has the following characteristics:

- A shared `BlockingExecutor` object is thread-safe
- It monitors the given `Waitable` objects until they become ready
- It dispatches the monitored `Waitable` objects without an error if they do not `throw`
- It dispatches the monitored `Waitable` objects with an error if an exception is thrown
- The invocation of its `stop()` method results in (eventually) halting the monitoring of the `Waitable` objects and dispatching them with the `WaitableWaitException` exception

The `UnboundedExecutor`, below, has the same characteristics:
```cpp
class UnboundedExecutor : public Executor {
public:
    void watch(std::unique_ptr<Waitable> w) override
    {
        lock_guard<mutex> lock(threadListMutex_);

        threads_.emplace_back([this, w=move(w)]() {
            try {
                while (active_) {
                    if (!w->wait(minutes(1))) {
                        continue;
                    }

                    w->dispatch();
                    return;
                }

                w->dispatch(make_exception_ptr(WaitableWaitException("Executor stopped")));
            }
            catch (...) {
                w->dispatch(current_exception());
            }
        });
    }

    void stop() override
    {
        active_ = false;

        lock_guard<mutex> lock(threadListMutex_);
        for (auto& t: threads_) {
            if (t.joinable() && t.get_id() != this_thread::get_id()) {
                t.join();
            }
        }
    }

private:
    atomic<bool> active_{ true };
    mutex threadListMutex_;
    vector<thread> threads_;
};
```
Evidently, the `UnboundedExecutor` is a much more powerful executor, able to handle more complex `std::future` dependencies (see Comparing to other Executors), at the expense of utilizing more resources. A full example that implements and uses the executors above can be found in `examples/executors.cpp`.
The library's `DefaultExecutor` is based on the `PollingExecutor` strategy, which is implemented as a template class with the actual functors that invoke the polling and dispatching threads as template parameters. The `PollingExecutor` is defined as follows:
```cpp
template<class TPollFunctor, class TDispatchFunctor>
class PollingExecutor : public Executor {
public:
    PollingExecutor(std::chrono::microseconds q);
    PollingExecutor(std::chrono::microseconds q,
                    TPollFunctor&& pollFunc,
                    TDispatchFunctor&& dispatchFunc);
};
```
The `DefaultExecutor`, used in all the examples and tests within the `thousandeyes::futures` library, is then defined as follows:
```cpp
using DefaultExecutor = PollingExecutor<detail::InvokerWithNewThread,
                                        detail::InvokerWithSingleThread>;
```
As seen in section Performance, the `DefaultExecutor` with a reasonable `q` parameter achieves very good real-world performance with very low resource utilization. The invokers used by the `DefaultExecutor`, however, may not be appropriate for every project, especially when a project uses its own thread-pools or the thread-pools of a third-party library. In those cases, the `PollingExecutor` can be integrated via a custom implementation of the `Invoker` concept.

An `Invoker` implementation must be a functor with `start()` and `stop()` methods, like the one below:
```cpp
struct Invoker {
    void start();
    void stop();
    void operator()(std::function<void()> f);
};
```
A real-world `Invoker` that enables the `PollingExecutor` to use `boost::asio`-based thread-pools can be defined simply as follows:
```cpp
class AsioInvoker {
public:
    explicit AsioInvoker(boost::asio::io_service& ioService) :
        ioService_(ioService)
    {}

    inline void start() {}

    inline void stop() {}

    inline void operator()(std::function<void()> f)
    {
        ioService_.post(std::move(f));
    }

private:
    boost::asio::io_service& ioService_;
};
```
The way the `AsioInvoker` above can be used by the `PollingExecutor` is shown in a complete example in the next subsection (see Using the library with boost::asio). The implementation of the invokers used by the `DefaultExecutor` can be seen in the following source files:

- `detail/InvokerWithNewThread.h`
- `detail/InvokerWithSingleThread.h`
### Using the library with boost::asio

As mentioned before, the library's `PollingExecutor` can easily be extended to use other, third-party threads and thread-pools for polling the input futures and invoking the continuations. The following is a full implementation of a `DefaultPollingExecutor` that uses `boost::asio::io_service`-based thread-pools to run the monitoring thread and dispatch the continuations:
```cpp
#pragma once

#include <chrono>
#include <functional>
#include <utility>

#include <boost/asio/io_service.hpp>

#include <thousandeyes/futures/PollingExecutor.h>

class AsioInvoker {
public:
    explicit AsioInvoker(boost::asio::io_service& ioService) :
        ioService_(ioService)
    {}

    inline void start() {}

    inline void stop() {}

    inline void operator()(std::function<void()> f)
    {
        ioService_.post(std::move(f));
    }

private:
    boost::asio::io_service& ioService_;
};

class DefaultPollingExecutor :
    public thousandeyes::futures::PollingExecutor<AsioInvoker, AsioInvoker> {
public:
    DefaultPollingExecutor(std::chrono::microseconds q,
                           boost::asio::io_service& pollingIoService,
                           boost::asio::io_service& dispatchIoService) :
        PollingExecutor(std::move(q),
                        AsioInvoker(pollingIoService),
                        AsioInvoker(dispatchIoService))
    {}
};
```
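A minimal usage sketch follows; it assumes the application runs each `io_service` on one of its own worker threads and keeps them alive with `work` guards:

```cpp
boost::asio::io_service pollingIo;
boost::asio::io_service dispatchIo;

// Keep the io_service objects running until they are explicitly stopped.
boost::asio::io_service::work pollingWork(pollingIo);
boost::asio::io_service::work dispatchWork(dispatchIo);

std::thread pollingThread([&pollingIo]() { pollingIo.run(); });
std::thread dispatchThread([&dispatchIo]() { dispatchIo.run(); });

{
    auto executor = std::make_shared<DefaultPollingExecutor>(std::chrono::milliseconds(10),
                                                             pollingIo,
                                                             dispatchIo);
    thousandeyes::futures::Default<thousandeyes::futures::Executor>::Setter execSetter(executor);

    // ... attach continuations via then() / all() here ...

    executor->stop();
}

pollingIo.stop();
dispatchIo.stop();
pollingThread.join();
dispatchThread.join();
```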
Moreover, `boost::asio` can return `std::future` objects from all its asynchronous operations when the `boost::asio::use_future` directive is used. In the example below, both `t` and `endpoints` are `std::future<>` objects:
```cpp
boost::asio::io_service io;

boost::asio::steady_timer timer(io);
timer.expires_after(seconds(1));
future<void> t = timer.async_wait(boost::asio::use_future);

boost::asio::ip::udp::resolver resolver(io);
auto endpoints = resolver.async_resolve(boost::asio::ip::udp::v4(),
                                        "host.example.com",
                                        "daytime",
                                        boost::asio::use_future);
```
A more complete example is provided by `boost::asio` at the link below:

https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/example/cpp11/futures/daytime_client.cpp

Of course, all the `std::future<>` objects returned by `boost::asio` methods and functions can be used by `thousandeyes::futures` via its `then()` and `all()` functions.
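For example, a continuation could be attached directly to the timer future from the snippet above (a sketch, assuming a default `Executor` has already been set for the current scope):

```cpp
auto done = then(timer.async_wait(boost::asio::use_future), [](future<void> f) {
    f.get(); // ready here - re-throws if the wait failed
    cout << "Timer fired" << endl;
});
```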
In certain use cases, it may be impossible to move the ownership of a container of futures to the library. Furthermore, client code may want to encapsulate `std::future` objects as members in its internal structures. For those cases, the `thousandeyes::futures::all()` function has an overload that accepts iterators to futures as arguments.

Iterator adapters can then be used to adapt iterators over internal structures into the iterators over futures that the library requires. The complete example below, which uses the `boost::iterator::transform_iterator` type, shows how everything must be set up for these particular use cases:
```cpp
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <tuple>
#include <vector>

#include <boost/iterator/transform_iterator.hpp>

#include <thousandeyes/futures/DefaultExecutor.h>
#include <thousandeyes/futures/all.h>
#include <thousandeyes/futures/then.h>

using boost::transform_iterator;
using boost::make_transform_iterator;
using namespace std;
using namespace std::chrono;
using namespace thousandeyes::futures;

template<class T>
future<T> getValueAsync(const T& value)
{
    return std::async(std::launch::async, [value]() {
        return value;
    });
}

int main(int argc, const char* argv[])
{
    auto executor = make_shared<DefaultExecutor>(milliseconds(10));
    Default<Executor>::Setter execSetter(executor);

    typedef tuple<string, future<int>, string, bool> FancyStruct;

    int targetSum = 0;
    vector<FancyStruct> fancyStructs;
    for (int i = 0; i < 1821; ++i) {
        targetSum += i;
        fancyStructs.push_back(make_tuple("A user-friendly name",
                                          getValueAsync(i),
                                          "a-user-friendly-id-" + to_string(i),
                                          false));
    }

    auto convert = [](const FancyStruct& s) -> const future<int>& {
        return get<1>(s);
    };

    typedef transform_iterator<function<const future<int>&(const FancyStruct&)>,
                               vector<FancyStruct>::iterator> Iter;

    Iter first(fancyStructs.begin(), convert);
    Iter last(fancyStructs.end(), convert);

    // Note: vector<FancyStruct> has to stay alive until the all() future becomes ready
    auto f = then(all(first, last), [](future<tuple<Iter, Iter>> f) {
        int sum = 0;

        auto range = f.get();
        for (auto iter = get<0>(range); iter != get<1>(range); ++iter) {
            const auto& baseIter = iter.base();
            int i = get<1>(*baseIter).get();
            sum += i;
        }

        return sum;
    });

    int result = f.get(); // result == 0 + 1 + 2 + ... + 1820

    executor->stop();
}
```
## Contributing

If you'd like to contribute, please fork the repository and use a feature branch. Pull requests are welcome.

## Licensing

The code in this project is licensed under the MIT license.