diff --git a/python/mpmc_queue_blocking_mixed.py b/python/mpmc_queue_blocking_mixed.py index e84a3e3..12939a3 100644 --- a/python/mpmc_queue_blocking_mixed.py +++ b/python/mpmc_queue_blocking_mixed.py @@ -23,19 +23,22 @@ data = [bytes(str(i), 'utf-8')] q.add(data, no_gil=RELEASE_GIL) v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL) - assert len(v) == NUM - assert v == data + assert len(v[0]) == NUM + assert v[0] == data + assert not v[1] end = time.time() print("Time taken: %f" % (end - start)) v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL) -assert len(v) == 0 +assert len(v[0]) == 0 +assert not v[1] v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL) -assert len(v) == NUM -assert v == [bytes(str(OPS-1), 'utf-8')] +assert len(v[0]) == NUM +assert v[0] == [bytes(str(OPS-1), 'utf-8')] +assert not v[1] labels = q.labels assert len(labels) == 2 @@ -54,5 +57,6 @@ assert LABEL_TWO in labels v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL) -assert len(v) == NUM -assert v == [bytes(str(0), 'utf-8')] +assert len(v[0]) == NUM +assert v[0] == [bytes(str(0), 'utf-8')] +assert not v[1] diff --git a/python/mpmc_queue_blocking_w_r.py b/python/mpmc_queue_blocking_w_r.py index 8b2f6a4..fa06a87 100644 --- a/python/mpmc_queue_blocking_w_r.py +++ b/python/mpmc_queue_blocking_w_r.py @@ -23,8 +23,9 @@ for i in range(OPS): v = q.next(label=LABEL, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL) - assert len(v) == NUM - assert v == [bytes(str(i), 'utf-8')] + assert len(v[0]) == NUM + assert v[0] == [bytes(str(i), 'utf-8')] + assert not v[1] end = time.time() diff --git a/python/mpmc_queue_nonblocking_mixed.py b/python/mpmc_queue_nonblocking_mixed.py index 0f85497..46818dc 100644 --- a/python/mpmc_queue_nonblocking_mixed.py +++ b/python/mpmc_queue_nonblocking_mixed.py @@ -23,19 +23,22 @@ data = [bytes(str(i), 'utf-8')] q.add(data, no_gil=RELEASE_GIL).get() v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data - assert len(v) == NUM - assert v == data + assert len(v[0]) == NUM + assert v[0] == data + assert not v[1] end = time.time() print("Time taken: %f" % (end - start)) v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data -assert len(v) == 0 +assert len(v[0]) == 0 +assert not v[1] v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL).get().data -assert len(v) == NUM -assert v == [bytes(str(OPS-1), 'utf-8')] +assert len(v[0]) == NUM +assert v[0] == [bytes(str(OPS-1), 'utf-8')] +assert not v[1] labels = q.labels.get().labels assert len(labels) == 2 @@ -54,5 +57,6 @@ assert LABEL_TWO in labels v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data -assert len(v) == NUM -assert v == [bytes(str(0), 'utf-8')] +assert len(v[0]) == NUM +assert v[0] == [bytes(str(0), 'utf-8')] +assert not v[1] diff --git a/queue_py/python/rocksq/blocking/blocking.pyi b/queue_py/python/rocksq/blocking/blocking.pyi index bb8feae..f49f58c 100644 --- a/queue_py/python/rocksq/blocking/blocking.pyi +++ b/queue_py/python/rocksq/blocking/blocking.pyi @@ -24,7 +24,7 @@ class MpmcQueue: def add(self, items: list[bytes], no_gil: bool = True): ... - def next(self, label: str, start_position: StartPosition, max_elements: int = 1, no_gil: bool = True) -> list[bytes]: ... + def next(self, label: str, start_position: StartPosition, max_elements: int = 1, no_gil: bool = True) -> (list[bytes], bool): ... @property def is_empty(self) -> bool: ... diff --git a/queue_py/python/rocksq/nonblocking/nonblocking.pyi b/queue_py/python/rocksq/nonblocking/nonblocking.pyi index e4ba247..7f7fb5d 100644 --- a/queue_py/python/rocksq/nonblocking/nonblocking.pyi +++ b/queue_py/python/rocksq/nonblocking/nonblocking.pyi @@ -42,7 +42,7 @@ class PersistentQueueWithCapacity: class MpmcResponseVariant: @property - def data(self) -> Optional[list[bytes]]: ... + def data(self) -> Optional[(list[bytes], bool)]: ... @property def labels(self) -> Optional[list[str]]: ... diff --git a/queue_py/src/blocking.rs b/queue_py/src/blocking.rs index a839365..ebef746 100644 --- a/queue_py/src/blocking.rs +++ b/queue_py/src/blocking.rs @@ -286,7 +286,7 @@ impl MpmcQueue { start_position: StartPosition, max_elements: usize, no_gil: bool, - ) -> PyResult> { + ) -> PyResult<(Vec, bool)> { Python::with_gil(|py| { let start_position = match start_position { StartPosition::Oldest => mpmc::StartPosition::Oldest, @@ -297,8 +297,9 @@ impl MpmcQueue { } else { self.0.next(max_elements, label, start_position) } - .map(|results| { - results + .map(|result| { + result + .0 .into_iter() .map(|r| { PyBytes::new_with(py, r.len(), |b: &mut [u8]| { @@ -308,6 +309,7 @@ impl MpmcQueue { .map(PyObject::from) }) .collect::>>() + .map(|e| (e, result.1)) }) .map_err(|_| PyRuntimeError::new_err("Failed to retrieve items")) })? diff --git a/queue_py/src/nonblocking.rs b/queue_py/src/nonblocking.rs index 712fe1d..9399a40 100644 --- a/queue_py/src/nonblocking.rs +++ b/queue_py/src/nonblocking.rs @@ -372,12 +372,13 @@ impl MpmcResponseVariant { /// if the future doesn't represent the ``next()`` operation. /// #[getter] - fn data(&self) -> PyResult>> { + fn data(&self) -> PyResult, bool)>> { Python::with_gil(|py| match &self.0 { queue_rs::nonblocking::MpmcResponseVariant::Next(data) => Ok(Some( data.as_ref() - .map(|results| { - results + .map(|result| { + result + .0 .iter() .map(|r| { PyBytes::new_with(py, r.len(), |b: &mut [u8]| { @@ -387,6 +388,7 @@ impl MpmcResponseVariant { .map(PyObject::from) }) .collect::>>() + .map(|e| (e, result.1)) }) .map_err(|e| { PyRuntimeError::new_err(format!("Failed to get response: {}", e)) diff --git a/queue_rs/src/blocking.rs b/queue_rs/src/blocking.rs index 5c5d1c8..07a7593 100644 --- a/queue_rs/src/blocking.rs +++ b/queue_rs/src/blocking.rs @@ -79,7 +79,7 @@ impl MpmcQueue { max_elts: usize, label: &str, start_position: StartPosition, - ) -> Result>> { + ) -> Result<(Vec>, bool)> { self.0.lock().next(max_elts, label, start_position) } diff --git a/queue_rs/src/mpmc.rs b/queue_rs/src/mpmc.rs index abdcb67..f34709e 100644 --- a/queue_rs/src/mpmc.rs +++ b/queue_rs/src/mpmc.rs @@ -199,7 +199,7 @@ impl MpmcQueue { max_elts: usize, label: &str, start_position: StartPosition, - ) -> Result>> { + ) -> Result<(Vec>, bool)> { let mut res = Vec::with_capacity(max_elts); self.actualize_indices()?; @@ -239,14 +239,15 @@ impl MpmcQueue { reader.index = next_index(reader.index); end = reader.index == self.write_index; } + reader.end_timestamp = if end { Some(self.write_timestamp) } else { None }; - // for future to let the user know - let _expired = reader.expired; + let expired = reader.expired; reader.expired = false; + if !self.read_indices.get(&label).is_some_and(|e| *e == reader) { self.db.put_cf( reader_cf, @@ -257,7 +258,7 @@ impl MpmcQueue { self.read_indices.insert(label, reader); } - Ok(res) + Ok((res, expired)) } pub fn get_labels(&self) -> Vec { @@ -516,7 +517,8 @@ mod tests { test(ttl, |mut queue| { let result = queue.next(100, label, StartPosition::Oldest).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -535,7 +537,8 @@ mod tests { test(ttl, |mut queue| { let result = queue.next(100, label, StartPosition::Newest).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -560,7 +563,8 @@ mod tests { let result = queue.next(2, label, start_position).unwrap(); - assert_eq!(result, vec![value_one.to_vec(), value_two.to_vec()]); + assert_eq!(result.0, vec![value_one.to_vec(), value_two.to_vec()]); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([(label.to_string(), Reader::new(2, None, false))]) @@ -568,7 +572,8 @@ mod tests { let result = queue.next(2, label, start_position).unwrap(); - assert_eq!(result, vec![value_three.to_vec()]); + assert_eq!(result.0, vec![value_three.to_vec()]); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -593,7 +598,8 @@ mod tests { let result = queue.next(2, label, start_position).unwrap(); - assert_eq!(result, vec![value_three.to_vec()]); + assert_eq!(result.0, vec![value_three.to_vec()]); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -604,7 +610,8 @@ mod tests { let result = queue.next(2, label, start_position).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -629,7 +636,8 @@ mod tests { let result = queue.next(1, label, StartPosition::Newest).unwrap(); - assert_eq!(result, vec![last_value.to_vec()]); + assert_eq!(result.0, vec![last_value.to_vec()]); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -640,7 +648,8 @@ mod tests { let result = queue.next(1, label, StartPosition::Newest).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!( queue.read_indices, HashMap::from([( @@ -664,7 +673,8 @@ mod tests { let result = queue.next(2, label_one, StartPosition::Oldest).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); assert_eq!( @@ -699,8 +709,10 @@ mod tests { // expire all wait_and_expire(&mut queue, ttl.mul(2)); - queue.next(1, label_two, StartPosition::Oldest).unwrap(); + let result = queue.next(1, label_two, StartPosition::Oldest).unwrap(); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); assert_eq!( @@ -718,6 +730,42 @@ mod tests { ); assert_eq!(queue.empty, true); assert_eq!(queue.len(), 0); + + let result = queue.next(1, label_one, StartPosition::Oldest).unwrap(); + + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, true); + assert_eq!( + queue.read_indices, + HashMap::from([ + ( + label_one.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ), + ( + label_two.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ) + ]) + ); + + let result = queue.next(1, label_one, StartPosition::Oldest).unwrap(); + + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); + assert_eq!( + queue.read_indices, + HashMap::from([ + ( + label_one.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ), + ( + label_two.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ) + ]) + ); }); } @@ -734,7 +782,8 @@ mod tests { let result = queue.next(2, label_one, StartPosition::Newest).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); assert_eq!( @@ -769,8 +818,10 @@ mod tests { // expire all wait_and_expire(&mut queue, ttl.mul(2)); - queue.next(1, label_two, StartPosition::Newest).unwrap(); + let result = queue.next(1, label_two, StartPosition::Newest).unwrap(); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); assert_eq!( @@ -788,6 +839,42 @@ mod tests { ); assert_eq!(queue.empty, true); assert_eq!(queue.len(), 0); + + let result = queue.next(1, label_one, StartPosition::Oldest).unwrap(); + + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, true); + assert_eq!( + queue.read_indices, + HashMap::from([ + ( + label_one.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ), + ( + label_two.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ) + ]) + ); + + let result = queue.next(1, label_one, StartPosition::Oldest).unwrap(); + + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); + assert_eq!( + queue.read_indices, + HashMap::from([ + ( + label_one.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ), + ( + label_two.to_string(), + Reader::new(2, Some(queue.write_timestamp), false) + ) + ]) + ); }); } @@ -811,7 +898,8 @@ mod tests { let result = queue.next(1, label, StartPosition::Oldest).unwrap(); - assert_eq!(result, vec![value]); + assert_eq!(result.0, vec![value]); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 1); assert_eq!(queue.write_index, 3); assert_eq!( @@ -843,7 +931,8 @@ mod tests { let result = queue.next(1, label, StartPosition::Newest).unwrap(); - assert_eq!(result, vec![value]); + assert_eq!(result.0, vec![value]); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 1); assert_eq!(queue.write_index, 3); assert_eq!( @@ -881,13 +970,24 @@ mod tests { queue.add(&[value_three, value_four]).unwrap(); // read < write, start > read - queue.next(1, label_one, start_position).unwrap(); + let result = queue.next(1, label_one, start_position).unwrap(); + assert_eq!(result.0, vec![value_one]); + assert_eq!(result.1, false); // read < write, start = read - queue.next(2, label_two, start_position).unwrap(); + let result = queue.next(2, label_two, start_position).unwrap(); + assert_eq!(result.0, vec![value_one, value_two]); + assert_eq!(result.1, false); // read < write, start < read - queue.next(3, label_three, start_position).unwrap(); + let result = queue.next(3, label_three, start_position).unwrap(); + assert_eq!(result.0, vec![value_one, value_two, value_three]); + assert_eq!(result.1, false); // read = write - queue.next(4, label_four, start_position).unwrap(); + let result = queue.next(4, label_four, start_position).unwrap(); + assert_eq!( + result.0, + vec![value_one, value_two, value_three, value_four] + ); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 0); assert_eq!(queue.write_index, 4); @@ -908,7 +1008,9 @@ mod tests { wait_and_expire(&mut queue, quarter_ttl.mul(2)); - queue.next(1, label_five, start_position).unwrap(); + let result = queue.next(1, label_five, start_position).unwrap(); + assert_eq!(result.0, vec![value_three]); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 4); @@ -930,7 +1032,9 @@ mod tests { wait_and_expire(&mut queue, quarter_ttl.mul(3)); - queue.next(1, label_five, start_position).unwrap(); + let result = queue.next(1, label_five, start_position).unwrap(); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, true); assert_eq!(queue.start_index, 4); assert_eq!(queue.write_index, 4); @@ -988,7 +1092,9 @@ mod tests { // expire all to emulate write index < start index wait_and_expire(&mut queue, quarter_ttl.mul(5)); - queue.next(1, label_one, start_position).unwrap(); + let result = queue.next(1, label_one, start_position).unwrap(); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); @@ -1011,16 +1117,44 @@ mod tests { .add(&[value_three, value_four, value_five, value_six]) .unwrap(); // read > write, start > read - queue.next(1, label_one, start_position).unwrap(); + let result = queue.next(1, label_one, start_position).unwrap(); + assert_eq!(result.0, vec![value_one]); + assert_eq!(result.1, false); // read > write, start = read - queue.next(2, label_two, start_position).unwrap(); + let result = queue.next(2, label_two, start_position).unwrap(); + assert_eq!(result.0, vec![value_one, value_two]); + assert_eq!(result.1, false); // read > write, start < read - queue.next(3, label_three, start_position).unwrap(); + let result = queue.next(3, label_three, start_position).unwrap(); + assert_eq!(result.0, vec![value_one, value_two, value_three]); + assert_eq!(result.1, false); // read < write, start > read - queue.next(4, label_four, start_position).unwrap(); - queue.next(5, label_five, start_position).unwrap(); + let result = queue.next(4, label_four, start_position).unwrap(); + assert_eq!( + result.0, + vec![value_one, value_two, value_three, value_four] + ); + assert_eq!(result.1, false); + let result = queue.next(5, label_five, start_position).unwrap(); + assert_eq!( + result.0, + vec![value_one, value_two, value_three, value_four, value_five] + ); + assert_eq!(result.1, false); // read == write - queue.next(6, label_six, start_position).unwrap(); + let result = queue.next(6, label_six, start_position).unwrap(); + assert_eq!( + result.0, + vec![ + value_one, + value_two, + value_three, + value_four, + value_five, + value_six + ] + ); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); @@ -1044,7 +1178,9 @@ mod tests { // expire value one and value two wait_and_expire(&mut queue, quarter_ttl.mul(2)); - queue.next(1, label_five, start_position).unwrap(); + let result = queue.next(1, label_five, start_position).unwrap(); + assert_eq!(result.0, vec![value_six]); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 4); assert_eq!(queue.write_index, 2); @@ -1071,7 +1207,9 @@ mod tests { // expire all elements wait_and_expire(&mut queue, quarter_ttl.mul(3)); - queue.next(1, label_five, start_position).unwrap(); + let result = queue.next(1, label_five, start_position).unwrap(); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, false); assert_eq!(queue.start_index, 2); assert_eq!(queue.write_index, 2); @@ -1189,7 +1327,8 @@ mod tests { let result = queue.next(2, label, StartPosition::Oldest).unwrap(); - assert_eq!(result, vec![value_three, value_four]); + assert_eq!(result.0, vec![value_three, value_four]); + assert_eq!(result.1, true); assert_eq!(queue.start_index, 0); assert_eq!(queue.write_index, 4); assert_eq!( @@ -1224,7 +1363,8 @@ mod tests { let result = queue.next(4, label, StartPosition::Oldest).unwrap(); - assert_eq!(result.is_empty(), true); + assert_eq!(result.0.is_empty(), true); + assert_eq!(result.1, true); assert_eq!(queue.start_index, 0); assert_eq!(queue.write_index, 4); assert_eq!( diff --git a/queue_rs/src/nonblocking.rs b/queue_rs/src/nonblocking.rs index 23ed4e1..9de956d 100644 --- a/queue_rs/src/nonblocking.rs +++ b/queue_rs/src/nonblocking.rs @@ -36,7 +36,7 @@ pub enum MpmcOperation { pub enum MpmcResponseVariant { Add(Result<()>), - Next(Result>>), + Next(Result<(Vec>, bool)>), Length(usize), Size(Result), GetLabels(Vec), @@ -463,7 +463,7 @@ mod tests { .get() .unwrap(); assert!( - matches!(resp, super::MpmcResponseVariant::Next(Ok(v)) if v == vec![vec![1u8, 2u8, 3u8]]) + matches!(resp, super::MpmcResponseVariant::Next(Ok(v)) if v == (vec![vec![1u8, 2u8, 3u8]], false)) ); let resp = queue.len().unwrap().get().unwrap(); assert!(matches!(resp, super::MpmcResponseVariant::Length(1)));