Skip to content

Commit

Permalink
Make sure new reducer target copies are marked mutable
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph Schuchart <[email protected]>
  • Loading branch information
devreal committed Dec 22, 2023
1 parent ece8cc3 commit db4f736
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/t9/t9_streaming.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ auto make_reconstruct(const nodeEdge& in, nodeEdge& out, const std::string& name
}

// cannot easily replace this with wrapper due to persistent state
class Norm2 : public TT<Key, std::tuple<>, Norm2, ttg::typelist<Node>> {
class Norm2 : public TT<Key, std::tuple<>, Norm2, ttg::typelist<const Node>> {
using baseT = typename Norm2::ttT;
double sumsq;
std::mutex charon;
Expand Down
135 changes: 135 additions & 0 deletions tests/unit/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


TEST_CASE("streams", "[streams][core]") {

// in distributed memory we must count how many messages the reducer will receive
SECTION("concurrent-stream-size") {
ttg::Edge<int, int> I2O;
Expand Down Expand Up @@ -72,4 +73,138 @@ TEST_CASE("streams", "[streams][core]") {
ttg::ttg_fence(ttg::default_execution_context());
CHECK(reduce_ops == N/nranks);
}

// Exercises a stream reducer whose inputs are ALSO consumed read-only by a
// second TT: every value injected on I2O is forwarded both to the reducing
// sink (edge O2S) and to a dummy consumer (edge O2D), so the runtime's data
// copy reference counting for shared/const values is exercised while the
// reducer target copy must remain mutable.
// NOTE(review): relies on TTG reducer semantics (set_input_reducer stream
// size, ttg::set_size to shorten the last slice) -- confirm against TTG docs.
SECTION("streams-readonly-input") {
ttg::Edge<int, int> I2O;
ttg::Edge<int, int> O2S;
ttg::Edge<int, int> O2D;
const auto nranks = ttg::default_execution_context().size();

// N values reduced in slices of SLICE per reducer key (key = n/SLICE)
constexpr std::size_t N = 12000;
constexpr std::size_t SLICE = 600;
// brief per-task sleep to raise the chance of concurrent reductions
constexpr const timespec ts = { .tv_sec = 0, .tv_nsec = 10000 };
constexpr int VALUE = 1;
std::atomic<std::size_t> reduce_ops = 0;

// read-only consumer: only checks the value; its purpose is to hold an
// extra (const) reference to the same data copy that feeds the reducer
auto dummy = ttg::make_tt([&](const int &n, const int &i){
CHECK(i == VALUE);
}, ttg::edges(O2D));

// producer: fans each value out to the dummy (out 1) and the reducer (out 0)
auto op = ttg::make_tt(
[&](const int &n, const int& i,
std::tuple<ttg::Out<int, int>, ttg::Out<int, int>> &outs) {
int key = n/SLICE;
nanosleep(&ts, nullptr);
ttg::send<1>(n, i, outs); // send to a dummy to check ref-counting
if (n < N-1) {
ttg::send<0>(key, i, outs);
//ttg::print("sent to sink ", key);
} else {
// set the size of the last reducer
// (the last slice may be shorter than SLICE when N%SLICE != 0)
if (N%SLICE > 0) {
ttg::set_size<0>(key, N%SLICE, outs);
std::cout << "set_size key " << key << " size " << N%SLICE << std::endl;
}
// forward the value
ttg::send<0>(key, i, outs);
//ttg::print("finalized last sink ", key);
}
},
ttg::edges(I2O), ttg::edges(O2S, O2D));

// sink: receives the fully reduced value for each key; the reducer counts
// invocations, so a full slice reduces to SLICE (the last key may be short)
auto sink_op = ttg::make_tt(
[&](const int key, const int &value) {
std::cout << "sink " << key << std::endl;
if (!(value == SLICE || key == (N/SLICE))) {
std::cout << "SINK ERROR: key " << key << " value " << value << " SLICE " << SLICE << " N " << N << std::endl;
}
CHECK((value == SLICE || key == (N/SLICE)));
reduce_ops++;
},
ttg::edges(O2S), ttg::edges());

// pin all reductions to the last rank so one rank sees the whole stream
op->set_keymap([=](const auto &key) { return nranks - 1; });
op->set_trace_instance(true);
// reducer: mutates the target copy (a), reads the incoming copy (b)
sink_op->set_input_reducer<0>([&](int &a, const int &b) {
a += 1; // we count invocations
CHECK(b == VALUE);
reduce_ops++;
}, SLICE);

make_graph_executable(op);
ttg::execute(ttg::default_execution_context());
// rank 0 injects all N values; the keymap routes reductions elsewhere
if (ttg::default_execution_context().rank() == 0) {
for (std::size_t i = 0; i < N; ++i) {
op->invoke(i, VALUE);
}
}

ttg::ttg_fence(ttg::default_execution_context());
// NOTE(review): reduce_ops is bumped in both the reducer and the sink;
// presumably the totals balance out to N/nranks per rank -- verify
CHECK(reduce_ops == N/nranks);
}

// Exercises a stream reducer fed with TEMPORARY (prvalue) inputs: values are
// sent as `int{i}` so the runtime must create fresh data copies for the
// reducer rather than sharing a tracked copy.  Per the commit message, such
// new reducer target copies must be marked mutable.
// NOTE(review): the explicit `std::get<0>(outs).send(...)` form (vs the
// commented-out ttg::send) appears deliberate -- presumably it covers a
// different code path in the runtime; confirm before changing.
SECTION("streams-temporary-input") {
ttg::Edge<int, int> I2O;
ttg::Edge<int, int> O2S;
const auto nranks = ttg::default_execution_context().size();

// N values reduced in slices of SLICE per reducer key (key = n/SLICE)
constexpr std::size_t N = 12000;
constexpr std::size_t SLICE = 600;
// brief per-task sleep to raise the chance of concurrent reductions
constexpr const timespec ts = { .tv_sec = 0, .tv_nsec = 10000 };
constexpr int VALUE = 1;
std::atomic<std::size_t> reduce_ops = 0;

// producer: forwards each value to the reducer as a fresh temporary
auto op = ttg::make_tt(
[&](const int &n, const int& i,
std::tuple<ttg::Out<int, int>> &outs) {
int key = n/SLICE;
nanosleep(&ts, nullptr);
int tmp = i; // temporary data, not tracked
// NOTE(review): `tmp` is otherwise unused -- presumably kept to model
// untracked stack data alongside the sent temporaries; confirm
if (n < N-1) {
std::get<0>(outs).send(key, int{i});
//ttg::send<0>(key, int{i}, outs);
//ttg::print("sent to sink ", key);
} else {
// set the size of the last reducer
// (the last slice may be shorter than SLICE when N%SLICE != 0)
if (N%SLICE > 0) {
ttg::set_size<0>(key, N%SLICE, outs);
std::cout << "set_size key " << key << " size " << N%SLICE << std::endl;
}
// forward the value
ttg::send<0>(key, int{i}, outs);
//ttg::print("finalized last sink ", key);
}
},
ttg::edges(I2O), ttg::edges(O2S));

// sink: receives the fully reduced value for each key; the reducer counts
// invocations, so a full slice reduces to SLICE (the last key may be short)
auto sink_op = ttg::make_tt(
[&](const int key, const int &value) {
std::cout << "sink " << key << std::endl;
if (!(value == SLICE || key == (N/SLICE))) {
std::cout << "SINK ERROR: key " << key << " value " << value << " SLICE " << SLICE << " N " << N << std::endl;
}
CHECK((value == SLICE || key == (N/SLICE)));
reduce_ops++;
},
ttg::edges(O2S), ttg::edges());

// pin all reductions to the last rank so one rank sees the whole stream
op->set_keymap([=](const auto &key) { return nranks - 1; });
op->set_trace_instance(true);
// reducer: mutates the target copy (a), reads the incoming copy (b)
sink_op->set_input_reducer<0>([&](int &a, const int &b) {
a += 1; // we count invocations
CHECK(b == VALUE);
reduce_ops++;
}, SLICE);

make_graph_executable(op);
ttg::execute(ttg::default_execution_context());
// rank 0 injects all N values; the keymap routes reductions elsewhere
if (ttg::default_execution_context().rank() == 0) {
for (std::size_t i = 0; i < N; ++i) {
op->invoke(i, VALUE);
}
}

ttg::ttg_fence(ttg::default_execution_context());
// NOTE(review): reduce_ops is bumped in both the reducer and the sink;
// presumably the totals balance out to N/nranks per rank -- verify
CHECK(reduce_ops == N/nranks);
}
} // TEST_CASE("streams")
13 changes: 9 additions & 4 deletions ttg/ttg/parsec/ttg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,8 @@ ttg::abort(); // should not happen
break;
}
source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
assert(target_copy->num_readers() == target_copy->mutable_tag);
assert(source_copy->num_readers() > 0);
reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
*reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
detail::release_data_copy(source_copy);
Expand Down Expand Up @@ -2359,6 +2361,9 @@ ttg::abort(); // should not happen
} else {
/* create a new copy */
copy = detail::create_new_datacopy(std::forward<Value>(value));
if (!is_const) {
copy->mark_mutable();
}
}
return copy;
};
Expand Down Expand Up @@ -2387,16 +2392,16 @@ ttg::abort(); // should not happen
detail::reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(task, true);

/* protected by the bucket lock */
task->streams[i].size = 1;
task->streams[i].reduce_count.store(1, std::memory_order_relaxed);

/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);

/* put the copy into the task */
task->copies[i] = copy;

/* protected by the bucket lock */
task->streams[i].size = 1;
task->streams[i].reduce_count.store(1, std::memory_order_relaxed);

/* release the task if we're not deferred
* TODO: can we delay that until we get the second value?
*/
Expand Down

0 comments on commit db4f736

Please sign in to comment.