-
Notifications
You must be signed in to change notification settings - Fork 68
Channel Experiment
Sean Parent edited this page Feb 12, 2016
·
2 revisions
#include <stlab/channel.hpp>
#include <stlab/future.hpp>
#include <iostream>
#include <string>
#include <vector>
using namespace std;
using namespace stlab;
/**************************************************************************************************/
/*
sum is an example of an accumulating "co-routine". It will await for values, keeping an
internal sum, until the channel is closed and then it will yield the result as a string.
*/
struct sum {
process_state _state = process_state::await;
int _sum = 0;
void await(int n) { _sum += n; }
string yield() { _state = process_state::await; return to_string(_sum); }
void close() { _state = process_state::yield; }
process_state state() const { return _state; }
};
int main() {
channel<int> aggregate; // create a channel that we will use to accumulate values
/*
For all copies of the channel we need a single, common, receiver. To get that we
pipe the channel to a identity lambda. This currently creates an inneficency as this
identiy is going to be scheduled in the tasking system.
*/
auto receiver = aggregate | [](auto x){ return x; };
/*
Create a vector to hold all the futures for each result as it is piped to channel.
The future is of type <void> because the value is passed into the channel.
*/
vector<stlab::future<void>> results;
for (int n = 0; n != 10; ++n) {
// Asyncrounously generate a bunch of values.
results.emplace_back(async(default_scheduler(), [_n = n]{ return _n; })
// Then send those values into a copy of the channel
.then([_aggregate = aggregate](int n){ _aggregate(n); }));
}
// Now it is safe to close (or destruct) this channel, all the copies remain open.
aggregate.close();
auto pipe = receiver
/*
The receiver is our common end point - we attach the vector of futures to it (another)
inefficiency here - this is a lambda whose only purpose is to hold the vector of
futures.
*/
| [ _results = move(results) ](auto x){ return x; }
// Then we can pipe the values to our accumulator
| sum()
// And pipe the final value to a lambda to print it.
| [](string s){ cout << s << endl; };
// Wait for everthing to execute (just for demonstration)
sleep(100);
}