Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mpmc queue #3

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions python/mpmc_queue_blocking_mixed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time
import os
from rocksq import remove_mpmc_queue, StartPosition
from rocksq.blocking import MpmcQueue

NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/mpmc-queue'
TTL = 60
LABEL_ONE = 'label1'
LABEL_TWO = 'label2'
LABEL_THREE = 'label3'

# if directory exists, remove it
if os.path.exists(PATH):
remove_mpmc_queue(PATH)

q = MpmcQueue(PATH, TTL)

start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL)
v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == data

end = time.time()

print("Time taken: %f" % (end - start))

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == 0

v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(OPS-1), 'utf-8')]

labels = q.labels
assert len(labels) == 2
assert LABEL_ONE in labels
assert LABEL_TWO in labels

r = q.remove_label(LABEL_THREE)
assert not r

r = q.remove_label(LABEL_ONE)
assert r

labels = q.labels
assert len(labels) == 1
assert LABEL_ONE not in labels
assert LABEL_TWO in labels

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(0), 'utf-8')]
31 changes: 31 additions & 0 deletions python/mpmc_queue_blocking_w_r.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import time
import os
from rocksq import remove_mpmc_queue, StartPosition
from rocksq.blocking import MpmcQueue

NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/mpmc-queue'
TTL = 60
LABEL = 'label'

# if directory exists, remove it
if os.path.exists(PATH):
remove_mpmc_queue(PATH)

q = MpmcQueue(PATH, TTL)

start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL)

for i in range(OPS):
v = q.next(label=LABEL, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(i), 'utf-8')]

end = time.time()

print("Time taken: %f" % (end - start))
58 changes: 58 additions & 0 deletions python/mpmc_queue_nonblocking_mixed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time
import os
from rocksq import remove_mpmc_queue, StartPosition
from rocksq.nonblocking import MpmcQueue

NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/mpmc-queue'
TTL = 60
LABEL_ONE = 'label1'
LABEL_TWO = 'label2'
LABEL_THREE = 'label3'

# if directory exists, remove it
if os.path.exists(PATH):
remove_mpmc_queue(PATH)

q = MpmcQueue(PATH, TTL)

start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL).get()
v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == data

end = time.time()

print("Time taken: %f" % (end - start))

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == 0

v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == [bytes(str(OPS-1), 'utf-8')]

labels = q.labels.get().labels
assert len(labels) == 2
assert LABEL_ONE in labels
assert LABEL_TWO in labels

r = q.remove_label(LABEL_THREE).get().removed_label
assert not r

r = q.remove_label(LABEL_ONE).get().removed_label
assert r

labels = q.labels.get().labels
assert len(labels) == 1
assert LABEL_ONE not in labels
assert LABEL_TWO in labels

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == [bytes(str(0), 'utf-8')]
28 changes: 27 additions & 1 deletion queue_py/python/rocksq/blocking/blocking.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from rocksq import StartPosition

class PersistentQueueWithCapacity:
def __init__(self, path: str, max_elements: int = 1_000_000_000): ...

Expand All @@ -15,4 +17,28 @@ class PersistentQueueWithCapacity:
def payload_size(self) -> int: ...

@property
def len(self) -> int: ...
def len(self) -> int: ...

class MpmcQueue:
def __init__(self, path: str, ttl: int): ...

def add(self, items: list[bytes], no_gil: bool = True): ...

def next(self, label: str, start_position: StartPosition, max_elements: int = 1, no_gil: bool = True) -> list[bytes]: ...

@property
def is_empty(self) -> bool: ...

@property
def disk_size(self) -> int: ...

@property
def payload_size(self) -> int: ...

@property
def len(self) -> int: ...

@property
def labels(self): list[str]: ...

def remove_label(self, label: str) -> bool: ...
47 changes: 46 additions & 1 deletion queue_py/python/rocksq/nonblocking/nonblocking.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Optional

from rocksq import StartPosition

class ResponseVariant:
@property
Expand Down Expand Up @@ -39,3 +39,48 @@ class PersistentQueueWithCapacity:

@property
def len(self) -> Response: ...

class MpmcResponseVariant:
@property
def data(self) -> Optional[list[bytes]]: ...

@property
def labels(self) -> Optional[list[str]]: ...

@property
def removed_label(self) -> Optional[bool]: ...

@property
def len(self) -> Optional[int]: ...

@property
def size(self) -> Optional[int]: ...

class MpmcResponse:
@property
def is_ready(self) -> bool: ...

def try_get(self) -> Optional[MpmcResponseVariant]: ...

def get(self) -> MpmcResponseVariant: ...

class MpmcQueue:
def __init__(self, path: str, ttl: int, max_inflight_ops: int = 1_000): ...

def add(self, items: list[bytes], no_gil: bool = True) -> MpmcResponse: ...

@property
def inflight_ops(self) -> int: ...

def next(self, label: str, start_position: StartPosition, max_elements = 1, no_gil: bool = True) -> MpmcResponse: ...

@property
def disk_size(self) -> MpmcResponse: ...

@property
def len(self) -> MpmcResponse: ...

@property
def labels(self) -> MpmcResponse: ...

def remove_label(self, label: str) -> MpmcResponse: ...
7 changes: 7 additions & 0 deletions queue_py/python/rocksq/rocksq.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from enum import Enum

def version() -> str: ...

def remove_queue(queue_name: str): ...

def remove_mpmc_queue(queue_name: str): ...

class StartPosition(Enum):
Oldest=0
Newest=1
Loading
Loading