diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 5ac03e0..ba94e99 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -54,7 +54,7 @@ jobs: --directory build/html -cvf artifact.tar . - name: Upload artifact - uses: actions/upload-artifact@main + uses: actions/upload-artifact@v3 with: name: github-pages path: docs/artifact.tar diff --git a/docker/Dockerfile.manylinux_2_28_ARM64 b/docker/Dockerfile.manylinux_2_28_ARM64 index d8d4468..af966f5 100644 --- a/docker/Dockerfile.manylinux_2_28_ARM64 +++ b/docker/Dockerfile.manylinux_2_28_ARM64 @@ -1,4 +1,4 @@ -FROM ghcr.io/insight-platform/manylinux_2_28_arm64:v0.0.1 as chef +FROM ghcr.io/insight-platform/manylinux_2_28_arm64:v0.0.3 as chef FROM chef as planner diff --git a/docker/Dockerfile.manylinux_2_28_X64 b/docker/Dockerfile.manylinux_2_28_X64 index c2a3fb5..67f3170 100644 --- a/docker/Dockerfile.manylinux_2_28_X64 +++ b/docker/Dockerfile.manylinux_2_28_X64 @@ -1,4 +1,4 @@ -FROM ghcr.io/insight-platform/manylinux_2_28_x64:v0.0.1 as chef +FROM ghcr.io/insight-platform/manylinux_2_28_x64:v0.0.3 as chef FROM chef as planner diff --git a/queue_py/Cargo.toml b/queue_py/Cargo.toml index 3d3d308..b511ff2 100644 --- a/queue_py/Cargo.toml +++ b/queue_py/Cargo.toml @@ -19,10 +19,10 @@ crate-type = ["cdylib", "lib"] queue_rs = { path = "../queue_rs" } [dependencies.pyo3] -version = "0.19" +version = "0.20" [dev-dependencies] -serial_test = "2.0" +serial_test = "3" [build-dependencies] pyo3-build-config = "0.19" diff --git a/queue_py/python/rocksq/__init__.py b/queue_py/python/rocksq/__init__.py new file mode 100644 index 0000000..1614d6a --- /dev/null +++ b/queue_py/python/rocksq/__init__.py @@ -0,0 +1,5 @@ +from .rocksq import * + +__doc__ = rocksq.__doc__ +if hasattr(rocksq, "__all__"): + __all__ = rocksq.__all__ diff --git a/queue_py/python/rocksq/blocking/__init__.py b/queue_py/python/rocksq/blocking/__init__.py new file mode 100644 index 0000000..4e84c1f --- /dev/null +++ b/queue_py/python/rocksq/blocking/__init__.py @@ -0,0 +1,5 @@ +from .blocking import * + +__doc__ = blocking.__doc__ +if hasattr(blocking, "__all__"): + __all__ = blocking.__all__ diff --git a/queue_py/python/rocksq/blocking/blocking.pyi b/queue_py/python/rocksq/blocking/blocking.pyi new file mode 100644 index 0000000..1a7272d --- /dev/null +++ b/queue_py/python/rocksq/blocking/blocking.pyi @@ -0,0 +1,18 @@ +class PersistentQueueWithCapacity: + def __init__(self, path: str, max_elements: int = 1_000_000_000): ... + + def push(self, items: list[bytes], no_gil: bool = True): ... + + def pop(self, max_elements: int = 1, no_gil: bool = True) -> list[bytes]: ... + + @property + def is_empty(self) -> bool: ... + + @property + def disk_size(self) -> int: ... + + @property + def payload_size(self) -> int: ... + + @property + def len(self) -> int: ... \ No newline at end of file diff --git a/queue_py/python/rocksq/nonblocking/__init__.py b/queue_py/python/rocksq/nonblocking/__init__.py new file mode 100644 index 0000000..643e727 --- /dev/null +++ b/queue_py/python/rocksq/nonblocking/__init__.py @@ -0,0 +1,5 @@ +from .nonblocking import * + +__doc__ = nonblocking.__doc__ +if hasattr(nonblocking, "__all__"): + __all__ = nonblocking.__all__ diff --git a/queue_py/python/rocksq/nonblocking/nonblocking.pyi b/queue_py/python/rocksq/nonblocking/nonblocking.pyi new file mode 100644 index 0000000..31e72d0 --- /dev/null +++ b/queue_py/python/rocksq/nonblocking/nonblocking.pyi @@ -0,0 +1,41 @@ +from typing import Optional + + +class ResponseVariant: + @property + def data(self) -> Optional[list[bytes]]: ... + + @property + def len(self) -> Optional[int]: ... + + @property + def size(self) -> Optional[int]: ... + + +class Response: + @property + def is_ready(self) -> bool: ... + + def try_get(self) -> Optional[ResponseVariant]: ... + + def get(self) -> ResponseVariant: ... + + +class PersistentQueueWithCapacity: + def __init__(self, path: str, max_elements: int = 1_000_000_000, max_inflight_ops: int = 1_000): ... + + def push(self, items: list[bytes], no_gil: bool = True) -> Response: ... + + @property + def inflight_ops(self) -> int: ... + + def pop(self, max_elements = 1, no_gil: bool = True) -> Response: ... + + @property + def disk_size(self) -> Response: ... + + @property + def payload_size(self) -> Response: ... + + @property + def len(self) -> Response: ... diff --git a/queue_py/python/rocksq/py.typed b/queue_py/python/rocksq/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/queue_py/python/rocksq/rocksq.pyi b/queue_py/python/rocksq/rocksq.pyi new file mode 100644 index 0000000..f20501b --- /dev/null +++ b/queue_py/python/rocksq/rocksq.pyi @@ -0,0 +1,4 @@ +def version() -> str: ... + +def remove_queue(queue_name: str): ... + diff --git a/queue_py/src/blocking.rs b/queue_py/src/blocking.rs index 5523c48..b51a75a 100644 --- a/queue_py/src/blocking.rs +++ b/queue_py/src/blocking.rs @@ -106,11 +106,17 @@ impl PersistentQueueWithCapacity { .map(|results| { results .into_iter() - .map(|r| PyObject::from(PyBytes::new(py, &r))) - .collect::>() + .map(|r| { + PyBytes::new_with(py, r.len(), |b: &mut [u8]| { + b.copy_from_slice(&r); + Ok(()) + }) + .map(PyObject::from) + }) + .collect::>>() }) .map_err(|_| PyRuntimeError::new_err("Failed to pop item")) - }) + })? } /// Checks if the queue is empty. @@ -120,11 +126,12 @@ impl PersistentQueueWithCapacity { /// bool /// ``True`` if the queue is empty, ``False`` otherwise. /// + #[getter] fn is_empty(&self) -> bool { self.0.is_empty() } - /// Returns the size of the queue in bytes. + /// Returns the disk size of the queue in bytes. /// /// Returns /// ------- @@ -135,16 +142,28 @@ impl PersistentQueueWithCapacity { /// PyRuntimeError /// If the method fails. /// - fn size(&self) -> PyResult { + #[getter] + fn disk_size(&self) -> PyResult { Python::with_gil(|py| { py.allow_threads(|| { - self.0.size().map_err(|e| { + self.0.disk_size().map_err(|e| { PyRuntimeError::new_err(format!("Failed to get queue size: {}", e)) }) }) }) } + /// Returns the size of the queue in bytes (only payload). + /// + /// Returns + /// ------- + /// size : int + /// + #[getter] + fn payload_size(&self) -> u64 { + self.0.payload_size() + } + /// Returns the number of elements in the queue. /// /// Returns @@ -152,6 +171,7 @@ impl PersistentQueueWithCapacity { /// int /// The number of elements in the queue. /// + #[getter] fn len(&self) -> usize { self.0.len() } diff --git a/queue_py/src/nonblocking.rs b/queue_py/src/nonblocking.rs index 8b8646e..c81f753 100644 --- a/queue_py/src/nonblocking.rs +++ b/queue_py/src/nonblocking.rs @@ -27,22 +27,27 @@ impl ResponseVariant { /// #[getter] fn data(&self) -> PyResult>> { - match &self.0 { - queue_rs::nonblocking::ResponseVariant::Pop(data) => Ok(data - .as_ref() - .map(|results| { - Python::with_gil(|py| { - Some( - results - .iter() - .map(|r| PyObject::from(PyBytes::new(py, r))) - .collect::>(), - ) + Python::with_gil(|py| match &self.0 { + queue_rs::nonblocking::ResponseVariant::Pop(data) => Ok(Some( + data.as_ref() + .map(|results| { + results + .iter() + .map(|r| { + PyBytes::new_with(py, r.len(), |b: &mut [u8]| { + b.copy_from_slice(r); + Ok(()) + }) + .map(PyObject::from) + }) + .collect::>>() }) - }) - .map_err(|e| PyRuntimeError::new_err(format!("Failed to get response: {}", e)))?), + .map_err(|e| { + PyRuntimeError::new_err(format!("Failed to get response: {}", e)) + })??, + )), _ => Ok(None), - } + }) } /// Returns the length of the queue. @@ -236,6 +241,13 @@ impl PersistentQueueWithCapacity { .map(Response) } + #[getter] + pub fn inflight_ops(&self) -> PyResult { + self.0 + .inflight_ops() + .map_err(|e| PyRuntimeError::new_err(format!("Failed to get inflight ops: {}", e))) + } + /// Retrieves items from the queue. /// /// **GIL**: the method can optionally be called without the GIL. @@ -271,7 +283,27 @@ impl PersistentQueueWithCapacity { }) } - /// Returns the number of elements in the queue. + /// Returns the disk size of the queue. + /// + /// Raises + /// ------ + /// PyRuntimeError + /// + /// Returns + /// ------- + /// :py:class:`Response` + /// The future-like object which must be used to get the actual response. For the size operation, + /// the response object is useful to call for ``is_ready()``, ``try_get()`` and ``get()``. + /// + #[getter] + pub fn disk_size(&self) -> PyResult { + self.0 + .disk_size() + .map(Response) + .map_err(|e| PyRuntimeError::new_err(format!("Failed to get size: {}", e))) + } + + /// Returns the payload size of the queue. /// /// Raises /// ------ @@ -283,9 +315,10 @@ impl PersistentQueueWithCapacity { /// The future-like object which must be used to get the actual response. For the size operation, /// the response object is useful to call for ``is_ready()``, ``try_get()`` and ``get()``. /// - pub fn size(&self) -> PyResult { + #[getter] + pub fn payload_size(&self) -> PyResult { self.0 - .size() + .payload_size() .map(Response) .map_err(|e| PyRuntimeError::new_err(format!("Failed to get size: {}", e))) } @@ -303,6 +336,7 @@ impl PersistentQueueWithCapacity { /// The future-like object which must be used to get the actual response. For the length operation, /// the response object is useful to call for ``is_ready()``, ``try_get()`` and ``get()``. /// + #[getter] pub fn len(&self) -> PyResult { self.0 .len() diff --git a/queue_rs/src/blocking.rs b/queue_rs/src/blocking.rs index fe76071..01ded4b 100644 --- a/queue_rs/src/blocking.rs +++ b/queue_rs/src/blocking.rs @@ -17,8 +17,12 @@ impl PersistentQueueWithCapacity { self.0.lock().is_empty() } - pub fn size(&self) -> Result { - self.0.lock().size() + pub fn disk_size(&self) -> Result { + self.0.lock().disk_size() + } + + pub fn payload_size(&self) -> u64 { + self.0.lock().payload_size() } pub fn len(&self) -> usize { diff --git a/queue_rs/src/lib.rs b/queue_rs/src/lib.rs index 4867106..64c9c17 100644 --- a/queue_rs/src/lib.rs +++ b/queue_rs/src/lib.rs @@ -13,6 +13,7 @@ pub fn version() -> &'static str { pub struct PersistentQueueWithCapacity { db: DB, path: String, + space_stat: u64, write_index: u64, read_index: u64, max_elements: u64, @@ -21,11 +22,12 @@ pub struct PersistentQueueWithCapacity { const U64_BYTE_LEN: usize = 8; const WRITE_INDEX_CELL: u64 = u64::MAX; const READ_INDEX_CELL: u64 = u64::MAX - 1; +const SPACE_STAT_CELL: u64 = u64::MAX - 2; #[cfg(test)] const MAX_ALLOWED_INDEX: u64 = 4; #[cfg(not(test))] -const MAX_ALLOWED_INDEX: u64 = u64::MAX - 2; +const MAX_ALLOWED_INDEX: u64 = u64::MAX - 100; // db_opts.set_write_buffer_size(64 * 1024 * 1024); // db_opts.set_max_write_buffer_number(5); @@ -58,11 +60,22 @@ impl PersistentQueueWithCapacity { None => 0u64, }; + let space_stat_opt = db.get(Self::index_to_key(SPACE_STAT_CELL))?; + let space_stat = match space_stat_opt { + Some(v) => { + let mut buf = [0u8; U64_BYTE_LEN]; + buf.copy_from_slice(&v); + u64::from_le_bytes(buf) + } + None => 0u64, + }; + Ok(Self { db, path: path.to_string(), write_index, read_index, + space_stat, max_elements: max_elements as u64, }) } @@ -75,7 +88,7 @@ impl PersistentQueueWithCapacity { Ok(DB::destroy(&Options::default(), path)?) } - pub fn size(&self) -> Result { + pub fn disk_size(&self) -> Result { Ok(fs::dir_size(&self.path)?) } @@ -87,6 +100,10 @@ impl PersistentQueueWithCapacity { }) as usize } + pub fn payload_size(&self) -> u64 { + self.space_stat + } + pub fn is_empty(&self) -> bool { self.write_index == self.read_index } @@ -111,6 +128,13 @@ impl PersistentQueueWithCapacity { self.write_index.to_le_bytes(), ); + self.space_stat += values.iter().map(|v| v.len() as u64).sum::(); + + batch.put( + Self::index_to_key(SPACE_STAT_CELL), + self.space_stat.to_le_bytes(), + ); + self.db.write(batch)?; Ok(()) @@ -141,6 +165,11 @@ impl PersistentQueueWithCapacity { } } if !res.is_empty() { + self.space_stat -= res.iter().map(|v| v.len() as u64).sum::(); + batch.put( + Self::index_to_key(SPACE_STAT_CELL), + self.space_stat.to_le_bytes(), + ); batch.put( Self::index_to_key(READ_INDEX_CELL), self.read_index.to_le_bytes(), @@ -164,6 +193,7 @@ mod tests { db.push(&[&[1, 2, 3]]).unwrap(); db.push(&[&[4, 5, 6]]).unwrap(); assert_eq!(db.len(), 2); + assert_eq!(db.payload_size(), 6); assert!(matches!(db.pop(1), Ok(v ) if v == vec![vec![1, 2, 3]])); assert!(matches!(db.pop(1), Ok(v) if v == vec![vec![4, 5, 6]])); assert!(db.is_empty()); @@ -197,6 +227,7 @@ mod tests { db.push(&[&[1, 2, 3]]).unwrap(); db.push(&[&[4, 5, 6]]).unwrap(); assert!(matches!(db.push(&[&[1, 2, 3]]), Err(_))); + assert_eq!(db.payload_size(), 6); } PersistentQueueWithCapacity::remove_db(&path).unwrap(); } @@ -214,8 +245,10 @@ mod tests { { let mut db = PersistentQueueWithCapacity::new(&path, 10, Options::default()).unwrap(); + assert_eq!(db.payload_size(), 9); let res = db.pop(1).unwrap(); assert_eq!(res, vec![vec![1, 2, 3]]); + assert_eq!(db.payload_size(), 6); } { diff --git a/queue_rs/src/nonblocking.rs b/queue_rs/src/nonblocking.rs index dcb8f1d..dff8a48 100644 --- a/queue_rs/src/nonblocking.rs +++ b/queue_rs/src/nonblocking.rs @@ -6,7 +6,8 @@ pub enum Operation { Push(Vec>), Pop(usize), Length, - Size, + PayloadSize, + DiskSize, Stop, } @@ -70,10 +71,14 @@ fn start_op_loop( let resp = queue.len(); resp_tx.send(ResponseVariant::Length(resp))?; } - Ok((Operation::Size, resp_tx)) => { - let resp = queue.size(); + Ok((Operation::DiskSize, resp_tx)) => { + let resp = queue.disk_size(); resp_tx.send(ResponseVariant::Size(resp))?; } + Ok((Operation::PayloadSize, resp_tx)) => { + let resp = queue.payload_size(); + resp_tx.send(ResponseVariant::Size(Ok(resp as usize)))?; + } Ok((Operation::Stop, resp_tx)) => { resp_tx.send(ResponseVariant::Stop)?; break; @@ -127,7 +132,17 @@ impl PersistentQueueWithCapacity { Ok(Response(rx)) } - pub fn size(&self) -> Result { + pub fn inflight_ops(&self) -> Result { + if !self.is_healthy() { + return Err(anyhow::anyhow!( + "Queue is unhealthy: cannot use it anymore." + )); + } + + Ok(self.0 .1.len()) + } + + pub fn disk_size(&self) -> Result { if !self.is_healthy() { return Err(anyhow::anyhow!( "Queue is unhealthy: cannot use it anymore." @@ -135,7 +150,19 @@ impl PersistentQueueWithCapacity { } let (tx, rx) = crossbeam_channel::bounded(1); - self.0 .1.send((Operation::Size, tx))?; + self.0 .1.send((Operation::DiskSize, tx))?; + Ok(Response(rx)) + } + + pub fn payload_size(&self) -> Result { + if !self.is_healthy() { + return Err(anyhow::anyhow!( + "Queue is unhealthy: cannot use it anymore." + )); + } + + let (tx, rx) = crossbeam_channel::bounded(1); + self.0 .1.send((Operation::PayloadSize, tx))?; Ok(Response(rx)) } @@ -196,10 +223,16 @@ mod tests { super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default()) .unwrap(); assert!(queue.is_healthy()); + let resp = queue.len().unwrap().get().unwrap(); assert!(matches!(resp, super::ResponseVariant::Length(0))); + let resp = queue.push(&[&[1u8, 2u8, 3u8]]).unwrap().get().unwrap(); assert!(matches!(resp, super::ResponseVariant::Push(Ok(())))); + + let resp = queue.payload_size().unwrap().get().unwrap(); + assert!(matches!(resp, super::ResponseVariant::Size(Ok(3)))); + let resp = queue.len().unwrap().get().unwrap(); assert!(matches!(resp, super::ResponseVariant::Length(1))); let resp = queue.pop(1).unwrap().get().unwrap(); @@ -218,7 +251,7 @@ mod tests { let queue = super::PersistentQueueWithCapacity::new(&path, 1000, 1000, rocksdb::Options::default()) .unwrap(); - let size_query = queue.size().unwrap(); + let size_query = queue.disk_size().unwrap(); let size = size_query.get().unwrap(); assert!(matches!(size, super::ResponseVariant::Size(Ok(r)) if r > 0)); }