Skip to content

Commit

Permalink
Issue #5. Return the info whether elements in mpmc queue expired afte…
Browse files Browse the repository at this point in the history
…r last reading
  • Loading branch information
ksenia-vazhdaeva committed Aug 22, 2024
1 parent 2c3591c commit 4b1a8fe
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 62 deletions.
18 changes: 11 additions & 7 deletions python/mpmc_queue_blocking_mixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL)
v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == data
assert len(v[0]) == NUM
assert v[0] == data
assert not v[1]

end = time.time()

print("Time taken: %f" % (end - start))

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == 0
assert len(v[0]) == 0
assert not v[1]

v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(OPS-1), 'utf-8')]
assert len(v[0]) == NUM
assert v[0] == [bytes(str(OPS-1), 'utf-8')]
assert not v[1]

labels = q.labels
assert len(labels) == 2
Expand All @@ -54,5 +57,6 @@
assert LABEL_TWO in labels

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(0), 'utf-8')]
assert len(v[0]) == NUM
assert v[0] == [bytes(str(0), 'utf-8')]
assert not v[1]
5 changes: 3 additions & 2 deletions python/mpmc_queue_blocking_w_r.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

for i in range(OPS):
v = q.next(label=LABEL, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(i), 'utf-8')]
assert len(v[0]) == NUM
assert v[0] == [bytes(str(i), 'utf-8')]
assert not v[1]

end = time.time()

Expand Down
18 changes: 11 additions & 7 deletions python/mpmc_queue_nonblocking_mixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL).get()
v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == data
assert len(v[0]) == NUM
assert v[0] == data
assert not v[1]

end = time.time()

print("Time taken: %f" % (end - start))

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == 0
assert len(v[0]) == 0
assert not v[1]

v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == [bytes(str(OPS-1), 'utf-8')]
assert len(v[0]) == NUM
assert v[0] == [bytes(str(OPS-1), 'utf-8')]
assert not v[1]

labels = q.labels.get().labels
assert len(labels) == 2
Expand All @@ -54,5 +57,6 @@
assert LABEL_TWO in labels

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == [bytes(str(0), 'utf-8')]
assert len(v[0]) == NUM
assert v[0] == [bytes(str(0), 'utf-8')]
assert not v[1]
2 changes: 1 addition & 1 deletion queue_py/python/rocksq/blocking/blocking.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class MpmcQueue:

def add(self, items: list[bytes], no_gil: bool = True): ...

def next(self, label: str, start_position: StartPosition, max_elements: int = 1, no_gil: bool = True) -> list[bytes]: ...
def next(self, label: str, start_position: StartPosition, max_elements: int = 1, no_gil: bool = True) -> (list[bytes], bool): ...

@property
def is_empty(self) -> bool: ...
Expand Down
2 changes: 1 addition & 1 deletion queue_py/python/rocksq/nonblocking/nonblocking.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PersistentQueueWithCapacity:

class MpmcResponseVariant:
@property
def data(self) -> Optional[list[bytes]]: ...
def data(self) -> Optional[(list[bytes], bool)]: ...

@property
def labels(self) -> Optional[list[str]]: ...
Expand Down
8 changes: 5 additions & 3 deletions queue_py/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl MpmcQueue {
start_position: StartPosition,
max_elements: usize,
no_gil: bool,
) -> PyResult<Vec<PyObject>> {
) -> PyResult<(Vec<PyObject>, bool)> {
Python::with_gil(|py| {
let start_position = match start_position {
StartPosition::Oldest => mpmc::StartPosition::Oldest,
Expand All @@ -297,8 +297,9 @@ impl MpmcQueue {
} else {
self.0.next(max_elements, label, start_position)
}
.map(|results| {
results
.map(|result| {
result
.0
.into_iter()
.map(|r| {
PyBytes::new_with(py, r.len(), |b: &mut [u8]| {
Expand All @@ -308,6 +309,7 @@ impl MpmcQueue {
.map(PyObject::from)
})
.collect::<PyResult<Vec<_>>>()
.map(|e| (e, result.1))
})
.map_err(|_| PyRuntimeError::new_err("Failed to retrieve items"))
})?
Expand Down
8 changes: 5 additions & 3 deletions queue_py/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,13 @@ impl MpmcResponseVariant {
/// if the future doesn't represent the ``next()`` operation.
///
#[getter]
fn data(&self) -> PyResult<Option<Vec<PyObject>>> {
fn data(&self) -> PyResult<Option<(Vec<PyObject>, bool)>> {
Python::with_gil(|py| match &self.0 {
queue_rs::nonblocking::MpmcResponseVariant::Next(data) => Ok(Some(
data.as_ref()
.map(|results| {
results
.map(|result| {
result
.0
.iter()
.map(|r| {
PyBytes::new_with(py, r.len(), |b: &mut [u8]| {
Expand All @@ -387,6 +388,7 @@ impl MpmcResponseVariant {
.map(PyObject::from)
})
.collect::<PyResult<Vec<_>>>()
.map(|e| (e, result.1))
})
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to get response: {}", e))
Expand Down
2 changes: 1 addition & 1 deletion queue_rs/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl MpmcQueue {
max_elts: usize,
label: &str,
start_position: StartPosition,
) -> Result<Vec<Vec<u8>>> {
) -> Result<(Vec<Vec<u8>>, bool)> {
self.0.lock().next(max_elts, label, start_position)
}

Expand Down
Loading

0 comments on commit 4b1a8fe

Please sign in to comment.