Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to interrupt a single thread #761

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions capi/geos_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,24 @@ extern "C" {
return geos::util::Interrupt::registerCallback(cb);
}

GEOSInterruptThreadCallback*
GEOS_interruptRegisterThreadCallback(GEOSInterruptThreadCallback* cb, void* data)
{
return geos::util::Interrupt::registerThreadCallback(cb, data);
}

void
GEOS_interruptRequest()
{
geos::util::Interrupt::request();
}

void
GEOS_interruptThread()
{
geos::util::Interrupt::requestForCurrentThread();
}

void
GEOS_interruptCancel()
{
Expand Down
42 changes: 37 additions & 5 deletions capi/geos_c.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ typedef int (*GEOSTransformXYCallback)(
/* ========== Interruption ========== */

/**
* Callback function for use in interruption. The callback will be invoked _before_ checking for
* interruption, so can be used to request it.
* Callback function for use in interruption. The callback will be invoked at each
* possible interruption point and can be used to request interruption.
*
* \see GEOS_interruptRegisterCallback
* \see GEOS_interruptRequest
Expand All @@ -306,19 +306,51 @@ typedef int (*GEOSTransformXYCallback)(
typedef void (GEOSInterruptCallback)(void);

/**
* Register a function to be called when processing is interrupted.
* Callback function for use in interruption. The callback will be invoked at each
* possible interruption point and can be used to request interruption.
*
* \see GEOS_interruptRegisterThreadCallback
* \see GEOS_interruptThread
*/
typedef void (GEOSInterruptThreadCallback)(void*);

/**
* Register a function to be called when a possible interruption point is reached
* on any thread. The function may be used to request interruption.
*
* \param cb Callback function to invoke
* \return the previously configured callback
* \return the previously registered callback, or NULL
* \see GEOSInterruptCallback
*/
extern GEOSInterruptCallback GEOS_DLL *GEOS_interruptRegisterCallback(
GEOSInterruptCallback* cb);

/**
* Request safe interruption of operations
* Register a function to be called when a possible interruption point is reached
* on the current thread. The function may be used to request interruption.
*
* \param cb Callback function to invoke
* \param context pointer to a context object that will be passed to `cb`
* \return the previously registered callback, or NULL
* \see GEOSInterruptCallback
*/
extern GEOSInterruptThreadCallback GEOS_DLL *GEOS_interruptRegisterThreadCallback(
GEOSInterruptThreadCallback* cb, void* context);

/**
* Request safe interruption of operations. The next thread to check for an
* interrupt will be interrupted. To request interruption of a specific thread,
* instead call GEOS_interruptThread() from a callback executed by that thread.
*/
extern void GEOS_DLL GEOS_interruptRequest(void);

/**
* Request safe interruption of operations in the current thread. This function
* should be called from a callback registered by GEOS_interruptRegisterThreadCallback()
* or GEOS_interruptRegisterCallback().
*/
extern void GEOS_DLL GEOS_interruptThread(void);

/**
* Cancel a pending interruption request
*/
Expand Down
30 changes: 26 additions & 4 deletions include/geos/util/Interrupt.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,55 @@ class GEOS_DLL Interrupt {
public:

typedef void (Callback)(void);
typedef void (ThreadCallback)(void*);

/**
* Request interruption of operations
*
* Operations will be terminated by a GEOSInterrupt
* exception at first occasion.
* exception at first occasion, by the first thread
* to check for an interrupt request.
*/
static void request();

/**
* Request interruption of operations in the current thread
*
* Operations in the current thread will be terminated by
* a GEOSInterrupt at first occasion.
*/
static void requestForCurrentThread();

/** Cancel a pending interruption request */
static void cancel();

/** Check if an interruption request is pending */
static bool check();

/** \brief
* Register a callback that will be invoked
* Register a callback that will be invoked by all threads
* before checking for interruption requests.
*
* NOTE that interruption request checking may happen
* frequently so any callback would better be quick.
* frequently so the callback should execute quickly.
*
* The callback can be used to call Interrupt::request()
*
* or Interrupt::requestForCurrentThread().
*/
static Callback* registerCallback(Callback* cb);

/** \brief
* Register a callback that will be invoked the current thread
* before checking for interruption requests.
*
* NOTE that interruption request checking may happen
* frequently so the callback should execute quickly.
*
* The callback can be used to call Interrupt::request()
* or Interrupt::requestForCurrentThread().
*/
static ThreadCallback* registerThreadCallback(ThreadCallback* cb, void* data);

/**
* Invoke the callback, if any. Process pending interruption, if any.
*
Expand Down
33 changes: 29 additions & 4 deletions src/util/Interrupt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
#include <geos/util/GEOSException.h> // for inheritance

namespace {
/* Could these be portably stored in thread-specific space ? */

// Callback and request status for interruption of any single thread
bool requested = false;
thread_local bool requested_for_thread = false;

// Callback and request status for interruption of a the current thread
geos::util::Interrupt::Callback* callback = nullptr;
thread_local geos::util::Interrupt::ThreadCallback* callback_thread = nullptr;
thread_local void* callback_thread_data = nullptr;

}

namespace geos {
Expand All @@ -37,16 +43,23 @@ Interrupt::request()
requested = true;
}

void
Interrupt::requestForCurrentThread()
{
requested_for_thread = true;
}

void
Interrupt::cancel()
{
requested = false;
requested_for_thread = false;
}

bool
Interrupt::check()
{
return requested;
return requested || requested_for_thread;
}

Interrupt::Callback*
Expand All @@ -57,14 +70,25 @@ Interrupt::registerCallback(Interrupt::Callback* cb)
return prev;
}

Interrupt::ThreadCallback*
Interrupt::registerThreadCallback(ThreadCallback* cb, void* data)
{
ThreadCallback* prev = callback_thread;
callback_thread = cb;
callback_thread_data = data;
return prev;
}

void
Interrupt::process()
{
if(callback) {
(*callback)();
}
if(requested) {
requested = false;
if(callback_thread) {
(*callback_thread)(callback_thread_data);
}
if(check()) {
interrupt();
}
}
Expand All @@ -74,6 +98,7 @@ void
Interrupt::interrupt()
{
requested = false;
requested_for_thread = false;
throw InterruptedException();
}

Expand Down
1 change: 1 addition & 0 deletions tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ foreach(_testfile ${_testfiles})
string(CONCAT _testname "geos::" ${_testname})
endif()
add_test(NAME unit-${_cmake_testname} COMMAND test_geos_unit ${_testname})
set_tests_properties(unit-${_cmake_testname} PROPERTIES TIMEOUT 30)
endforeach()

# Run all the unit tests in one go, for faster memory checking
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/capi/GEOSInterruptTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <thread>

namespace tut {
//
Expand All @@ -18,6 +19,7 @@ namespace tut {
// Common data used in test cases.
struct test_capiinterrupt_data {
static int numcalls;
static int maxcalls;
static GEOSInterruptCallback* nextcb;

static void
Expand Down Expand Up @@ -56,9 +58,18 @@ struct test_capiinterrupt_data {
}
}

static void
interruptAfterMaxCalls(void* data)
{
if (++*static_cast<int*>(data) >= maxcalls) {
GEOS_interruptThread();
}
}

};

int test_capiinterrupt_data::numcalls = 0;
int test_capiinterrupt_data::maxcalls = 0;
GEOSInterruptCallback* test_capiinterrupt_data::nextcb = nullptr;

typedef test_group<test_capiinterrupt_data> group;
Expand Down Expand Up @@ -221,5 +232,40 @@ void object::test<5>
}


// Test callback is thread-local
template<>
template<>
void object::test<6>
()
{
maxcalls = 3;
int calls_1 = 0;
int calls_2 = 0;

initGEOS(notice, notice);

auto buffer = [](GEOSInterruptThreadCallback* cb, void* data) {
GEOSGeometry* geom1 = GEOSGeomFromWKT("LINESTRING (0 0, 1 0)");

GEOS_interruptRegisterThreadCallback(cb, data);

GEOSGeometry* geom2 = GEOSBuffer(geom1, 1, 8);
GEOSGeom_destroy(geom2);
GEOSGeom_destroy(geom1);
};

std::thread t1(buffer, interruptAfterMaxCalls, &calls_1);
std::thread t2(buffer, interruptAfterMaxCalls, &calls_2);

t1.join();
t2.join();

ensure_equals(calls_1, maxcalls);
ensure_equals(calls_2, maxcalls);

finishGEOS();
}


} // namespace tut

Loading