diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index 81a06cdb79a4f2..45b20c939ebf5c 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -25,12 +25,153 @@ static struct PyModuleDef queuemodule; #define simplequeue_get_state_by_type(type) \ (simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule))) +static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8; + +typedef struct { + // Where to place the next item + Py_ssize_t put_idx; + + // Where to get the next item + Py_ssize_t get_idx; + + PyObject **items; + + // Total number of items that may be stored + Py_ssize_t items_cap; + + // Number of items stored + Py_ssize_t num_items; +} RingBuf; + +static int +RingBuf_Init(RingBuf *buf) +{ + buf->put_idx = 0; + buf->get_idx = 0; + buf->items_cap = INITIAL_RING_BUF_CAPACITY; + buf->num_items = 0; + buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *)); + if (buf->items == NULL) { + PyErr_NoMemory(); + return -1; + } + return 0; +} + +static PyObject * +RingBuf_At(RingBuf *buf, Py_ssize_t idx) +{ + assert(idx >= 0 && idx < buf->num_items); + return buf->items[(buf->get_idx + idx) % buf->items_cap]; +} + +static void +RingBuf_Fini(RingBuf *buf) +{ + PyObject **items = buf->items; + Py_ssize_t num_items = buf->num_items; + Py_ssize_t cap = buf->items_cap; + Py_ssize_t idx = buf->get_idx; + buf->items = NULL; + buf->put_idx = 0; + buf->get_idx = 0; + buf->num_items = 0; + buf->items_cap = 0; + for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) { + Py_CLEAR(items[idx]); + } + PyMem_Free(items); +} + +// Resize the underlying items array of buf to the new capacity and arrange +// the items contiguously in the new items array. +// +// Returns -1 on allocation failure or 0 on success. +static int +resize_ringbuf(RingBuf *buf, Py_ssize_t capacity) +{ + Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity); + if (new_capacity == buf->items_cap) { + return 0; + } + assert(buf->num_items <= new_capacity); + + PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *)); + if (new_items == NULL) { + return -1; + } + + // Copy the "tail" of the old items array. This corresponds to "head" of + // the abstract ring buffer. + Py_ssize_t tail_size = + Py_MIN(buf->num_items, buf->items_cap - buf->get_idx); + if (tail_size > 0) { + memcpy(new_items, buf->items + buf->get_idx, + tail_size * sizeof(PyObject *)); + } + + // Copy the "head" of the old items array, if any. This corresponds to the + // "tail" of the abstract ring buffer. + Py_ssize_t head_size = buf->num_items - tail_size; + if (head_size > 0) { + memcpy(new_items + tail_size, buf->items, + head_size * sizeof(PyObject *)); + } + + PyMem_Free(buf->items); + buf->items = new_items; + buf->items_cap = new_capacity; + buf->get_idx = 0; + buf->put_idx = buf->num_items; + + return 0; +} + +// Returns an owned reference +static PyObject * +RingBuf_Get(RingBuf *buf) +{ + assert(buf->num_items > 0); + + if (buf->num_items < (buf->items_cap / 4)) { + // Items is less than 25% occupied, shrink it by 50%. This allows for + // growth without immediately needing to resize the underlying items + // array. + // + // It's safe it ignore allocation failures here; shrinking is an + // optimization that isn't required for correctness. + resize_ringbuf(buf, buf->items_cap / 2); + } + + PyObject *item = buf->items[buf->get_idx]; + buf->items[buf->get_idx] = NULL; + buf->get_idx = (buf->get_idx + 1) % buf->items_cap; + buf->num_items--; + return item; +} + +// Returns 0 on success or -1 if the buffer failed to grow +static int +RingBuf_Put(RingBuf *buf, PyObject *item) +{ + if (buf->num_items == buf->items_cap) { + // Buffer is full, grow it. + if (resize_ringbuf(buf, buf->items_cap * 2) < 0) { + PyErr_NoMemory(); + return -1; + } + } + buf->items[buf->put_idx] = Py_NewRef(item); + buf->put_idx = (buf->put_idx + 1) % buf->items_cap; + buf->num_items++; + return 0; +} + typedef struct { PyObject_HEAD PyThread_type_lock lock; int locked; - PyObject *lst; - Py_ssize_t lst_pos; + RingBuf buf; PyObject *weakreflist; } simplequeueobject; @@ -43,7 +184,7 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty static int simplequeue_clear(simplequeueobject *self) { - Py_CLEAR(self->lst); + RingBuf_Fini(&self->buf); return 0; } @@ -69,7 +210,10 @@ simplequeue_dealloc(simplequeueobject *self) static int simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg) { - Py_VISIT(self->lst); + RingBuf *buf = &self->buf; + for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) { + Py_VISIT(RingBuf_At(buf, i)); + } Py_VISIT(Py_TYPE(self)); return 0; } @@ -90,15 +234,13 @@ simplequeue_new_impl(PyTypeObject *type) self = (simplequeueobject *) type->tp_alloc(type, 0); if (self != NULL) { self->weakreflist = NULL; - self->lst = PyList_New(0); self->lock = PyThread_allocate_lock(); - self->lst_pos = 0; if (self->lock == NULL) { Py_DECREF(self); PyErr_SetString(PyExc_MemoryError, "can't allocate lock"); return NULL; } - if (self->lst == NULL) { + if (RingBuf_Init(&self->buf) < 0) { Py_DECREF(self); return NULL; } @@ -126,7 +268,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, /*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/ { /* BEGIN GIL-protected critical section */ - if (PyList_Append(self->lst, item) < 0) + if (RingBuf_Put(&self->buf, item) < 0) return NULL; if (self->locked) { /* A get() may be waiting, wake it up */ @@ -155,33 +297,6 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item) return _queue_SimpleQueue_put_impl(self, item, 0, Py_None); } -static PyObject * -simplequeue_pop_item(simplequeueobject *self) -{ - Py_ssize_t count, n; - PyObject *item; - - n = PyList_GET_SIZE(self->lst); - assert(self->lst_pos < n); - - item = PyList_GET_ITEM(self->lst, self->lst_pos); - Py_INCREF(Py_None); - PyList_SET_ITEM(self->lst, self->lst_pos, Py_None); - self->lst_pos += 1; - count = n - self->lst_pos; - if (self->lst_pos > count) { - /* The list is more than 50% empty, reclaim space at the beginning */ - if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) { - /* Undo pop */ - self->lst_pos -= 1; - PyList_SET_ITEM(self->lst, self->lst_pos, item); - return NULL; - } - self->lst_pos = 0; - } - return item; -} - /*[clinic input] _queue.SimpleQueue.get @@ -249,7 +364,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, * So we simply try to acquire the lock in a loop, until the condition * (queue non-empty) becomes true. */ - while (self->lst_pos == PyList_GET_SIZE(self->lst)) { + while (self->buf.num_items == 0) { /* First a simple non-blocking try without releasing the GIL */ r = PyThread_acquire_lock_timed(self->lock, 0, 0); if (r == PY_LOCK_FAILURE && microseconds != 0) { @@ -279,8 +394,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, } /* BEGIN GIL-protected critical section */ - assert(self->lst_pos < PyList_GET_SIZE(self->lst)); - item = simplequeue_pop_item(self); + item = RingBuf_Get(&self->buf); if (self->locked) { PyThread_release_lock(self->lock); self->locked = 0; @@ -320,7 +434,7 @@ static int _queue_SimpleQueue_empty_impl(simplequeueobject *self) /*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/ { - return self->lst_pos == PyList_GET_SIZE(self->lst); + return self->buf.num_items == 0; } /*[clinic input] @@ -333,7 +447,7 @@ static Py_ssize_t _queue_SimpleQueue_qsize_impl(simplequeueobject *self) /*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/ { - return PyList_GET_SIZE(self->lst) - self->lst_pos; + return self->buf.num_items; } static int