Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
bwsw committed Mar 3, 2024
2 parents 27f2590 + 418d135 commit 4702b9b
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
--directory build/html -cvf artifact.tar .
- name: Upload artifact
uses: actions/upload-artifact@main
uses: actions/upload-artifact@v3
with:
name: github-pages
path: docs/artifact.tar
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.manylinux_2_28_ARM64
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ghcr.io/insight-platform/manylinux_2_28_arm64:v0.0.1 as chef
FROM ghcr.io/insight-platform/manylinux_2_28_arm64:v0.0.3 as chef

FROM chef as planner

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.manylinux_2_28_X64
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ghcr.io/insight-platform/manylinux_2_28_x64:v0.0.1 as chef
FROM ghcr.io/insight-platform/manylinux_2_28_x64:v0.0.3 as chef

FROM chef as planner

Expand Down
4 changes: 2 additions & 2 deletions queue_py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ crate-type = ["cdylib", "lib"]
queue_rs = { path = "../queue_rs" }

[dependencies.pyo3]
version = "0.19"
version = "0.20"

[dev-dependencies]
serial_test = "2.0"
serial_test = "3"

[build-dependencies]
pyo3-build-config = "0.19"
Expand Down
5 changes: 5 additions & 0 deletions queue_py/python/rocksq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .rocksq import *

__doc__ = rocksq.__doc__
if hasattr(rocksq, "__all__"):
__all__ = rocksq.__all__
5 changes: 5 additions & 0 deletions queue_py/python/rocksq/blocking/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .blocking import *

__doc__ = blocking.__doc__
if hasattr(blocking, "__all__"):
__all__ = blocking.__all__
18 changes: 18 additions & 0 deletions queue_py/python/rocksq/blocking/blocking.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class PersistentQueueWithCapacity:
def __init__(self, path: str, max_elements: int = 1_000_000_000): ...

def push(self, items: list[bytes], no_gil: bool = True): ...

def pop(self, max_elements: int = 1, no_gil: bool = True) -> list[bytes]: ...

@property
def is_empty(self) -> bool: ...

@property
def disk_size(self) -> int: ...

@property
def payload_size(self) -> int: ...

@property
def len(self) -> int: ...
5 changes: 5 additions & 0 deletions queue_py/python/rocksq/nonblocking/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .nonblocking import *

__doc__ = nonblocking.__doc__
if hasattr(nonblocking, "__all__"):
__all__ = nonblocking.__all__
41 changes: 41 additions & 0 deletions queue_py/python/rocksq/nonblocking/nonblocking.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Optional


class ResponseVariant:
@property
def data(self) -> Optional[list[bytes]]: ...

@property
def len(self) -> Optional[int]: ...

@property
def size(self) -> Optional[int]: ...


class Response:
@property
def is_ready(self) -> bool: ...

def try_get(self) -> Optional[ResponseVariant]: ...

def get(self) -> ResponseVariant: ...


class PersistentQueueWithCapacity:
def __init__(self, path: str, max_elements: int = 1_000_000_000, max_inflight_ops: int = 1_000): ...

def push(self, items: list[bytes], no_gil: bool = True) -> Response: ...

@property
def inflight_ops(self) -> int: ...

def pop(self, max_elements = 1, no_gil: bool = True) -> Response: ...

@property
def disk_size(self) -> Response: ...

@property
def payload_size(self) -> Response: ...

@property
def len(self) -> Response: ...
Empty file added queue_py/python/rocksq/py.typed
Empty file.
4 changes: 4 additions & 0 deletions queue_py/python/rocksq/rocksq.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
def version() -> str: ...

def remove_queue(queue_name: str): ...

32 changes: 26 additions & 6 deletions queue_py/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,17 @@ impl PersistentQueueWithCapacity {
.map(|results| {
results
.into_iter()
.map(|r| PyObject::from(PyBytes::new(py, &r)))
.collect::<Vec<_>>()
.map(|r| {
PyBytes::new_with(py, r.len(), |b: &mut [u8]| {
b.copy_from_slice(&r);
Ok(())
})
.map(PyObject::from)
})
.collect::<PyResult<Vec<_>>>()
})
.map_err(|_| PyRuntimeError::new_err("Failed to pop item"))
})
})?
}

/// Checks if the queue is empty.
Expand All @@ -120,11 +126,12 @@ impl PersistentQueueWithCapacity {
/// bool
/// ``True`` if the queue is empty, ``False`` otherwise.
///
#[getter]
fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Returns the size of the queue in bytes.
/// Returns the disk size of the queue in bytes.
///
/// Returns
/// -------
Expand All @@ -135,23 +142,36 @@ impl PersistentQueueWithCapacity {
/// PyRuntimeError
/// If the method fails.
///
fn size(&self) -> PyResult<usize> {
#[getter]
fn disk_size(&self) -> PyResult<usize> {
Python::with_gil(|py| {
py.allow_threads(|| {
self.0.size().map_err(|e| {
self.0.disk_size().map_err(|e| {
PyRuntimeError::new_err(format!("Failed to get queue size: {}", e))
})
})
})
}

/// Returns the size of the queue in bytes (only payload).
///
/// Returns
/// -------
/// size : int
///
#[getter]
fn payload_size(&self) -> u64 {
self.0.payload_size()
}

/// Returns the number of elements in the queue.
///
/// Returns
/// -------
/// int
/// The number of elements in the queue.
///
#[getter]
fn len(&self) -> usize {
self.0.len()
}
Expand Down
68 changes: 51 additions & 17 deletions queue_py/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,27 @@ impl ResponseVariant {
///
#[getter]
fn data(&self) -> PyResult<Option<Vec<PyObject>>> {
match &self.0 {
queue_rs::nonblocking::ResponseVariant::Pop(data) => Ok(data
.as_ref()
.map(|results| {
Python::with_gil(|py| {
Some(
results
.iter()
.map(|r| PyObject::from(PyBytes::new(py, r)))
.collect::<Vec<_>>(),
)
Python::with_gil(|py| match &self.0 {
queue_rs::nonblocking::ResponseVariant::Pop(data) => Ok(Some(
data.as_ref()
.map(|results| {
results
.iter()
.map(|r| {
PyBytes::new_with(py, r.len(), |b: &mut [u8]| {
b.copy_from_slice(r);
Ok(())
})
.map(PyObject::from)
})
.collect::<PyResult<Vec<_>>>()
})
})
.map_err(|e| PyRuntimeError::new_err(format!("Failed to get response: {}", e)))?),
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to get response: {}", e))
})??,
)),
_ => Ok(None),
}
})
}

/// Returns the length of the queue.
Expand Down Expand Up @@ -236,6 +241,13 @@ impl PersistentQueueWithCapacity {
.map(Response)
}

#[getter]
pub fn inflight_ops(&self) -> PyResult<usize> {
self.0
.inflight_ops()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to get inflight ops: {}", e)))
}

/// Retrieves items from the queue.
///
/// **GIL**: the method can optionally be called without the GIL.
Expand Down Expand Up @@ -271,7 +283,27 @@ impl PersistentQueueWithCapacity {
})
}

/// Returns the number of elements in the queue.
/// Returns the disk size of the queue.
///
/// Raises
/// ------
/// PyRuntimeError
///
/// Returns
/// -------
/// :py:class:`Response`
/// The future-like object which must be used to get the actual response. For the size operation,
/// the response object is useful to call for ``is_ready()``, ``try_get()`` and ``get()``.
///
#[getter]
pub fn disk_size(&self) -> PyResult<Response> {
self.0
.disk_size()
.map(Response)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to get size: {}", e)))
}

/// Returns the payload size of the queue.
///
/// Raises
/// ------
Expand All @@ -283,9 +315,10 @@ impl PersistentQueueWithCapacity {
/// The future-like object which must be used to get the actual response. For the size operation,
/// the response object is useful to call for ``is_ready()``, ``try_get()`` and ``get()``.
///
pub fn size(&self) -> PyResult<Response> {
#[getter]
pub fn payload_size(&self) -> PyResult<Response> {
self.0
.size()
.payload_size()
.map(Response)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to get size: {}", e)))
}
Expand All @@ -303,6 +336,7 @@ impl PersistentQueueWithCapacity {
/// The future-like object which must be used to get the actual response. For the length operation,
/// the response object is useful to call for ``is_ready()``, ``try_get()`` and ``get()``.
///
#[getter]
pub fn len(&self) -> PyResult<Response> {
self.0
.len()
Expand Down
8 changes: 6 additions & 2 deletions queue_rs/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ impl PersistentQueueWithCapacity {
self.0.lock().is_empty()
}

pub fn size(&self) -> Result<usize> {
self.0.lock().size()
pub fn disk_size(&self) -> Result<usize> {
self.0.lock().disk_size()
}

pub fn payload_size(&self) -> u64 {
self.0.lock().payload_size()
}

pub fn len(&self) -> usize {
Expand Down
Loading

0 comments on commit 4702b9b

Please sign in to comment.