From f6c2da083683942c2adbe40f2b85f7099c4b035d Mon Sep 17 00:00:00 2001 From: Matt Page Date: Mon, 16 Sep 2024 15:35:29 -0700 Subject: [PATCH] gh-123940: Ensure force-terminated daemon threads can be joined During finalization, daemon threads are forced to exit immediately (without returning through the call-stack normally) upon acquiring the GIL. Finalizers that run after this must be able to join the forcefully terminated threads. The current implementation notified of thread completion before returning from `thread_run`. This code will never execute if the thread is forced to exit during finalization. Any code that attempts to join such a thread will block indefinitely. To fix this, use the old approach of notifying of thread completion when the thread state is cleared. This happens both when `thread_run` exits normally and when thread states are destroyed as part of finalization (which happens immediately after forcing daemon threads to exit, before any Python code can run). --- Include/cpython/pystate.h | 3 ++ Include/internal/pycore_lock.h | 10 +++++++ Include/internal/pycore_pystate.h | 3 ++ Lib/test/test_threading.py | 33 +++++++++++++++++++++ Modules/_threadmodule.c | 34 +++++++++++++++------- Python/lock.c | 24 ++++++++++++++++ Python/pystate.c | 48 ++++++++++++++++++++++++++----- 7 files changed, 137 insertions(+), 18 deletions(-) diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index f005729fff11b68..344395051145430 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -200,6 +200,9 @@ struct _ts { The PyThreadObject must hold the only reference to this value. 
*/ PyObject *threading_local_sentinel; + + /* Set when the thread is about to exit */ + struct _PyEventRc *thread_is_exiting; }; #ifdef Py_DEBUG diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h index e6da083b807ce5b..971f7a54a631e3d 100644 --- a/Include/internal/pycore_lock.h +++ b/Include/internal/pycore_lock.h @@ -93,6 +93,16 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt); PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns, int detach); +// A one-time event notification with reference counting +typedef struct _PyEventRc { + PyEvent event; + Py_ssize_t refcount; +} _PyEventRc; + +extern _PyEventRc *_PyEventRc_New(void); +extern void _PyEventRc_Incref(_PyEventRc *erc); +extern void _PyEventRc_Decref(_PyEventRc *erc); + // _PyRawMutex implements a word-sized mutex that that does not depend on the // parking lot API, and therefore can be used in the parking lot // implementation. diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index fade55945b7dbf3..d221cc1ddf0c8c1 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -223,6 +223,9 @@ extern void _PyThreadState_Bind(PyThreadState *tstate); PyAPI_FUNC(PyThreadState *) _PyThreadState_NewBound( PyInterpreterState *interp, int whence); +extern PyThreadState * +_PyThreadState_NewWithEvent(PyInterpreterState *interp, int whence, + _PyEventRc *exiting_event); extern PyThreadState * _PyThreadState_RemoveExcept(PyThreadState *tstate); extern void _PyThreadState_DeleteList(PyThreadState *list); extern void _PyThreadState_ClearMimallocHeaps(PyThreadState *tstate); diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 329767aa82e336b..e36fa896f3efcef 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1171,6 +1171,39 @@ def __del__(self): self.assertEqual(out.strip(), b"OK") self.assertIn(b"can't create new thread at interpreter shutdown", err) + 
@unittest.skipIf(support.Py_GIL_DISABLED, "daemon threads don't force exit") + def test_join_force_terminated_daemon_thread_in_finalization(self): + # gh-123940: Py_Finalize() forces all daemon threads to exit + # immediately (without unwinding the stack) upon acquiring the + # GIL. Finalizers that run after this must be able to join the daemon + # threads that were forced to exit. + code = textwrap.dedent(""" + import threading + + + def loop(): + while True: + pass + + + class Cycle: + def __init__(self): + self.self_ref = self + self.thr = threading.Thread(target=loop, daemon=True) + self.thr.start() + + def __del__(self): + self.thr.join() + + # Cycle holds a reference to itself, which ensures it is cleaned + # up during the GC that runs after daemon threads have been + # forced to exit during finalization. + Cycle() + """) + rc, out, err = assert_python_ok("-c", code) + self.assertEqual(err, b"") + + class ThreadJoinOnShutdown(BaseTestCase): def _run_and_join(self, script): diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index b3ed8e7bc56b9ea..b8a3e72c82f4ae5 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -96,7 +96,7 @@ typedef struct { // thread is about to exit. This is used to avoid false positives when // detecting self-join attempts. See the comment in `ThreadHandle_join()` // for a more detailed explanation. - PyEvent thread_is_exiting; + _PyEventRc *thread_is_exiting; // Serializes calls to `join` and `set_done`. 
_PyOnceFlag once; @@ -174,6 +174,12 @@ remove_from_shutdown_handles(ThreadHandle *handle) static ThreadHandle * ThreadHandle_new(void) { + _PyEventRc *exiting = _PyEventRc_New(); + if (exiting == NULL) { + PyErr_NoMemory(); + return NULL; + } + ThreadHandle *self = (ThreadHandle *)PyMem_RawCalloc(1, sizeof(ThreadHandle)); if (self == NULL) { @@ -183,7 +189,7 @@ ThreadHandle_new(void) self->ident = 0; self->os_handle = 0; self->has_os_handle = 0; - self->thread_is_exiting = (PyEvent){0}; + self->thread_is_exiting = exiting; self->mutex = (PyMutex){_Py_UNLOCKED}; self->once = (_PyOnceFlag){0}; self->state = THREAD_HANDLE_NOT_STARTED; @@ -226,6 +232,8 @@ ThreadHandle_decref(ThreadHandle *self) return; } + _PyEventRc_Decref(self->thread_is_exiting); + // Remove ourself from the global list of handles HEAD_LOCK(&_PyRuntime); if (self->node.next != NULL) { @@ -268,7 +276,7 @@ _PyThread_AfterFork(struct _pythread_runtime_state *state) handle->state = THREAD_HANDLE_DONE; handle->once = (_PyOnceFlag){_Py_ONCE_INITIALIZED}; handle->mutex = (PyMutex){_Py_UNLOCKED}; - _PyEvent_Notify(&handle->thread_is_exiting); + _PyEvent_Notify(&handle->thread_is_exiting->event); llist_remove(node); remove_from_shutdown_handles(handle); } @@ -357,8 +365,6 @@ thread_run(void *boot_raw) exit: // Don't need to wait for this thread anymore remove_from_shutdown_handles(handle); - - _PyEvent_Notify(&handle->thread_is_exiting); ThreadHandle_decref(handle); // bpo-44434: Don't call explicitly PyThread_exit_thread(). 
On Linux with @@ -371,7 +377,7 @@ static int force_done(ThreadHandle *handle) { assert(get_thread_handle_state(handle) == THREAD_HANDLE_STARTING); - _PyEvent_Notify(&handle->thread_is_exiting); + _PyEvent_Notify(&handle->thread_is_exiting->event); set_thread_handle_state(handle, THREAD_HANDLE_DONE); return 0; } @@ -402,7 +408,8 @@ ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args, goto start_failed; } PyInterpreterState *interp = _PyInterpreterState_GET(); - boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); + boot->tstate = _PyThreadState_NewWithEvent( + interp, _PyThreadState_WHENCE_THREADING, self->thread_is_exiting); if (boot->tstate == NULL) { PyMem_RawFree(boot); if (!PyErr_Occurred()) { @@ -492,7 +499,7 @@ ThreadHandle_join(ThreadHandle *self, PyTime_t timeout_ns) // To work around this, we set `thread_is_exiting` immediately before // `thread_run` returns. We can be sure that we are not attempting to join // ourselves if the handle's thread is about to exit. - if (!_PyEvent_IsSet(&self->thread_is_exiting) && + if (!_PyEvent_IsSet(&self->thread_is_exiting->event) && ThreadHandle_ident(self) == PyThread_get_thread_ident_ex()) { // PyThread_join_thread() would deadlock or error out. PyErr_SetString(ThreadError, "Cannot join current thread"); @@ -502,7 +509,8 @@ ThreadHandle_join(ThreadHandle *self, PyTime_t timeout_ns) // Wait until the deadline for the thread to exit. PyTime_t deadline = timeout_ns != -1 ? _PyDeadline_Init(timeout_ns) : 0; int detach = 1; - while (!PyEvent_WaitTimed(&self->thread_is_exiting, timeout_ns, detach)) { + while (!PyEvent_WaitTimed(&self->thread_is_exiting->event, timeout_ns, + detach)) { if (deadline) { // _PyDeadline_Get will return a negative value if the deadline has // been exceeded. 
@@ -537,7 +545,7 @@ set_done(ThreadHandle *handle) PyErr_SetString(ThreadError, "failed detaching handle"); return -1; } - _PyEvent_Notify(&handle->thread_is_exiting); + _PyEvent_Notify(&handle->thread_is_exiting->event); set_thread_handle_state(handle, THREAD_HANDLE_DONE); return 0; } @@ -649,7 +657,7 @@ static PyObject * PyThreadHandleObject_is_done(PyThreadHandleObject *self, PyObject *Py_UNUSED(ignored)) { - if (_PyEvent_IsSet(&self->handle->thread_is_exiting)) { + if (_PyEvent_IsSet(&self->handle->thread_is_exiting->event)) { Py_RETURN_TRUE; } else { @@ -2419,6 +2427,10 @@ thread__make_thread_handle(PyObject *module, PyObject *identobj) if (PyErr_Occurred()) { return NULL; } + _PyEventRc *exiting_event = _PyEventRc_New(); + if (exiting_event == NULL) { + return NULL; + } PyThreadHandleObject *hobj = PyThreadHandleObject_new(state->thread_handle_type); if (hobj == NULL) { diff --git a/Python/lock.c b/Python/lock.c index 57675fe1873fa28..67e970f20824f81 100644 --- a/Python/lock.c +++ b/Python/lock.c @@ -294,6 +294,30 @@ PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns, int detach) } } +_PyEventRc * +_PyEventRc_New(void) +{ + _PyEventRc *erc = (_PyEventRc *)PyMem_RawCalloc(1, sizeof(_PyEventRc)); + if (erc != NULL) { + erc->refcount = 1; + } + return erc; +} + +void +_PyEventRc_Incref(_PyEventRc *erc) +{ + _Py_atomic_add_ssize(&erc->refcount, 1); +} + +void +_PyEventRc_Decref(_PyEventRc *erc) +{ + if (_Py_atomic_add_ssize(&erc->refcount, -1) == 1) { + PyMem_RawFree(erc); + } +} + static int unlock_once(_PyOnceFlag *o, int res) { diff --git a/Python/pystate.c b/Python/pystate.c index 54caf373e91d6cf..a4bb696d3f508d9 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1458,8 +1458,8 @@ free_threadstate(_PyThreadStateImpl *tstate) */ static void -init_threadstate(_PyThreadStateImpl *_tstate, - PyInterpreterState *interp, uint64_t id, int whence) +init_threadstate(_PyThreadStateImpl *_tstate, PyInterpreterState *interp, + uint64_t id, int whence, _PyEventRc 
*exiting_event) { PyThreadState *tstate = (PyThreadState *)_tstate; if (tstate->_status.initialized) { @@ -1467,6 +1467,7 @@ init_threadstate(_PyThreadStateImpl *_tstate, } assert(interp != NULL); + tstate->thread_is_exiting = exiting_event; tstate->interp = interp; tstate->eval_breaker = _Py_atomic_load_uintptr_relaxed(&interp->ceval.instrumentation_version); @@ -1530,8 +1531,19 @@ add_threadstate(PyInterpreterState *interp, PyThreadState *tstate, interp->threads.head = tstate; } +static _PyEventRc * +ensure_event(_PyEventRc *exiting_event) +{ + if (exiting_event != NULL) { + _PyEventRc_Incref(exiting_event); + return exiting_event; + } + return _PyEventRc_New(); +} + static PyThreadState * -new_threadstate(PyInterpreterState *interp, int whence) +new_threadstate(PyInterpreterState *interp, int whence, + _PyEventRc *exiting_event) { _PyThreadStateImpl *tstate; _PyRuntimeState *runtime = interp->runtime; @@ -1544,10 +1556,18 @@ new_threadstate(PyInterpreterState *interp, int whence) if (new_tstate == NULL) { return NULL; } + + exiting_event = ensure_event(exiting_event); + if (exiting_event == NULL) { + PyMem_RawFree(new_tstate); + return NULL; + } + #ifdef Py_GIL_DISABLED Py_ssize_t qsbr_idx = _Py_qsbr_reserve(interp); if (qsbr_idx < 0) { PyMem_RawFree(new_tstate); + _PyEventRc_Decref(exiting_event); return NULL; } #endif @@ -1578,7 +1598,7 @@ new_threadstate(PyInterpreterState *interp, int whence) sizeof(*tstate)); } - init_threadstate(tstate, interp, id, whence); + init_threadstate(tstate, interp, id, whence, exiting_event); add_threadstate(interp, (PyThreadState *)tstate, old_head); HEAD_UNLOCK(runtime); @@ -1606,7 +1626,7 @@ PyThreadState_New(PyInterpreterState *interp) PyThreadState * _PyThreadState_NewBound(PyInterpreterState *interp, int whence) { - PyThreadState *tstate = new_threadstate(interp, whence); + PyThreadState *tstate = new_threadstate(interp, whence, NULL); if (tstate) { bind_tstate(tstate); // This makes sure there's a gilstate tstate bound @@ 
-1622,7 +1642,14 @@ _PyThreadState_NewBound(PyInterpreterState *interp, int whence) PyThreadState * _PyThreadState_New(PyInterpreterState *interp, int whence) { - return new_threadstate(interp, whence); + return new_threadstate(interp, whence, NULL); +} + +PyThreadState * +_PyThreadState_NewWithEvent(PyInterpreterState *interp, int whence, + _PyEventRc *exiting_event) +{ + return new_threadstate(interp, whence, exiting_event); } // We keep this for stable ABI compabibility. @@ -1732,6 +1759,13 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(tstate->context); + if (tstate->thread_is_exiting != NULL) { + _PyEventRc *erc = tstate->thread_is_exiting; + tstate->thread_is_exiting = NULL; + _PyEvent_Notify(&erc->event); + _PyEventRc_Decref(erc); + } + #ifdef Py_GIL_DISABLED // Each thread should clear own freelists in free-threading builds. struct _Py_freelists *freelists = _Py_freelists_GET(); @@ -2760,7 +2794,7 @@ PyGILState_Ensure(void) /* Create a new Python thread state for this thread */ // XXX Use PyInterpreterState_EnsureThreadState()? tcur = new_threadstate(runtime->gilstate.autoInterpreterState, - _PyThreadState_WHENCE_GILSTATE); + _PyThreadState_WHENCE_GILSTATE, NULL); if (tcur == NULL) { Py_FatalError("Couldn't create thread-state for new thread"); }