Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROTON-2792: [C++] check that scheduled tasks are active under lock #422

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/include/proton/work_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class work {
/// **Unsettled API**
///
/// Execute the piece of work
void operator()() { item_(); }
void operator()() const { item_(); }

~work() = default;

Expand Down
120 changes: 63 additions & 57 deletions cpp/src/container_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "proton/sender_options.hpp"
#include "proton/work_queue.hpp"

#include <chrono>
#include <cstdlib>
#include <ctime>
#include <string>
Expand All @@ -41,7 +42,6 @@
#include <mutex>
#include <condition_variable>


namespace {

std::string make_url(std::string host, int port) {
Expand Down Expand Up @@ -270,17 +270,13 @@ int test_container_stop() {

struct hang_tester : public proton::messaging_handler {
proton::listener listener;
bool done;

hang_tester() : done(false) {}
bool done = false;

void connect(proton::container* c) {
c->connect(make_url("", listener.port()));
}
hang_tester() = default;

void on_container_start(proton::container& c) override {
listener = c.listen("//:0");
c.schedule(proton::duration(250), proton::make_work(&hang_tester::connect, this, &c));
c.schedule(proton::duration(250), [&](){ c.connect(make_url("", listener.port())); });
}

void on_connection_open(proton::connection& c) override {
Expand Down Expand Up @@ -323,10 +319,8 @@ int test_container_pre_stop() {


struct schedule_tester : public proton::messaging_handler {
void stop(proton::container* c) { c->stop(); }

void on_container_start(proton::container& c) override {
c.schedule(proton::duration(250), proton::make_work(&schedule_tester::stop, this, &c));
c.schedule(proton::duration(250), [&](){ c.stop(); });
}
};

Expand Down Expand Up @@ -577,55 +571,66 @@ void test_container_mt_close_race() {
class schedule_cancel : public proton::messaging_handler {
proton::listener listener;
test_listen_handler listen_handler;
long long w1_handle;
long long w2_handle;
long long w3_handle;
long long w4_handle;
long long w5_handle;

void change_w1_state(proton::container* c) {
w1_state = 1;
}

void change_w2_state(proton::container* c) {
w2_state = 1;
}
proton::work_handle w4_handle;
proton::work_handle w5_handle;

void change_w3_state(proton::container* c) {
w3_state = 1;
}

void change_w4_state(proton::container* c) {
w4_state = 1;
}

void change_w5_state(proton::container* c) {
w5_state = 1;
}
public:
int w1_state = 0;
int w2_state = 0;
int w3_state = 0;
int w4_state = 0;
int w5_state = 0;

void on_container_start(proton::container& c) override {
ASSERT(w1_state==0);
ASSERT(w2_state==0);
ASSERT(w3_state==0);
ASSERT(w4_state==0);
ASSERT(w5_state==0);

listener = c.listen("//:0", listen_handler);

// We will cancel this scheduled task before its execution.
w1_handle = c.schedule(proton::duration(250), proton::make_work(&schedule_cancel::change_w1_state, this, &c));
auto w1_handle = c.schedule(proton::duration(250),
[this](){
w1_state = 1;
});

// We will cancel this scheduled task before its execution and will try to cancel it again.
w2_handle = c.schedule(proton::duration(260), proton::make_work(&schedule_cancel::change_w2_state, this, &c));

// We will not cancel this scheduled task.
w3_handle = c.schedule(proton::duration(35), proton::make_work(&schedule_cancel::change_w3_state, this, &c));
auto w2_handle = c.schedule(proton::duration(260),
[this](){
w2_state = 1;
});

// Attempt to make sure that we can cancel a task from a previous task even if the
// previous task gets delayed and scheduled in the same batch as the task to be cancelled.

// Set up task to cancel
auto w3_handle = c.schedule(proton::duration(37),
[this](){
w3_state = 3;
});

// This task overruns and so forces the next 2 tasks to be scheduled together
c.schedule(proton::duration(35),
[this](){
w3_state = 1;
std::this_thread::sleep_for(std::chrono::milliseconds(20));
});

// This should successfully cancel the first scheduled task and so leave w3_state at 2
c.schedule(proton::duration(36),
[&c, w3_handle, this](){
ASSERT(w3_state==1);
w3_state = 2;
c.cancel(w3_handle);
});

// We will try to cancel this task before its execution from different thread i.e connection thread.
w4_handle = c.schedule(proton::duration(270), proton::make_work(&schedule_cancel::change_w4_state, this, &c));
w4_handle = c.schedule(proton::duration(270),
[this](){
w4_state = 1;
});

// We will try to cancel this task after its execution from different thread i.e. connection thread.
w5_handle = c.schedule(proton::duration(0), proton::make_work(&schedule_cancel::change_w5_state, this, &c));
w5_handle = c.schedule(proton::duration(0),
[this](){
w5_state = 1;
});

// Cancel the first scheduled task.
c.cancel(w1_handle);
Expand Down Expand Up @@ -663,22 +668,23 @@ class schedule_cancel : public proton::messaging_handler {
// the container stops.
}

public:
schedule_cancel(): w1_state(0), w2_state(0), w3_state(0), w4_state(0), w5_state(0) {}

int w1_state;
int w2_state;
int w3_state;
int w4_state;
int w5_state;
schedule_cancel() = default;
};

int test_container_schedule_cancel() {
schedule_cancel t;

ASSERT(t.w1_state==0);
ASSERT(t.w2_state==0);
ASSERT(t.w3_state==0);
ASSERT(t.w4_state==0);
ASSERT(t.w5_state==0);

proton::container(t).run(2);

ASSERT(t.w1_state==0); // The value of w1_state remained 0 because we cancelled the associated task before its execution.
ASSERT(t.w2_state==0); // The value of w2_state remained 0 because we cancelled the associated task before its execution.
ASSERT(t.w3_state==1); // The value of w3_state changed to 1 because we hadn't cancelled this task.
ASSERT(t.w3_state==2); // The value of w3_state changed to 2 because we set this in the second callback, but the third was cancelled
ASSERT(t.w4_state==0); // The value of w4_state remained 0 because we cancelled the associated task before its execution.
ASSERT(t.w5_state==1); // The value of w5_state changed to 1 because the task was already executed before we cancelled it.
return 0;
Expand Down
13 changes: 10 additions & 3 deletions cpp/src/proactor_container_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,16 @@ void container::impl::run_timer_jobs() {
// NB. We copied the due tasks in reverse order so execute from end

for (int i = tasks.size()-1; i>=0; --i) {
if(is_active_.count(tasks[i].w_handle)) {
tasks[i].task();
is_active_.erase(tasks[i].w_handle);
const auto& task = tasks[i];
bool active;

{
GUARD(deferred_lock_);
// NB. erase returns the number of items erased
active = is_active_.erase(task.w_handle);
}
if (active) {
task.task();
}
}
}
Expand Down
Loading