Skip to content

Commit

Permalink
Merge pull request #3 from insight-platform/mpmc
Browse files Browse the repository at this point in the history
Implement mpmc queue
  • Loading branch information
ksenia-vazhdaeva authored Aug 22, 2024
2 parents 9958df3 + 2c3591c commit a36fe68
Show file tree
Hide file tree
Showing 15 changed files with 2,686 additions and 144 deletions.
58 changes: 58 additions & 0 deletions python/mpmc_queue_blocking_mixed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time
import os
from rocksq import remove_mpmc_queue, StartPosition
from rocksq.blocking import MpmcQueue

NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/mpmc-queue'
TTL = 60
LABEL_ONE = 'label1'
LABEL_TWO = 'label2'
LABEL_THREE = 'label3'

# if directory exists, remove it
if os.path.exists(PATH):
remove_mpmc_queue(PATH)

q = MpmcQueue(PATH, TTL)

start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL)
v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == data

end = time.time()

print("Time taken: %f" % (end - start))

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == 0

v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(OPS-1), 'utf-8')]

labels = q.labels
assert len(labels) == 2
assert LABEL_ONE in labels
assert LABEL_TWO in labels

r = q.remove_label(LABEL_THREE)
assert not r

r = q.remove_label(LABEL_ONE)
assert r

labels = q.labels
assert len(labels) == 1
assert LABEL_ONE not in labels
assert LABEL_TWO in labels

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(0), 'utf-8')]
31 changes: 31 additions & 0 deletions python/mpmc_queue_blocking_w_r.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import time
import os
from rocksq import remove_mpmc_queue, StartPosition
from rocksq.blocking import MpmcQueue

NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/mpmc-queue'
TTL = 60
LABEL = 'label'

# if directory exists, remove it
if os.path.exists(PATH):
remove_mpmc_queue(PATH)

q = MpmcQueue(PATH, TTL)

start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL)

for i in range(OPS):
v = q.next(label=LABEL, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(i), 'utf-8')]

end = time.time()

print("Time taken: %f" % (end - start))
58 changes: 58 additions & 0 deletions python/mpmc_queue_nonblocking_mixed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time
import os
from rocksq import remove_mpmc_queue, StartPosition
from rocksq.nonblocking import MpmcQueue

NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/mpmc-queue'
TTL = 60
LABEL_ONE = 'label1'
LABEL_TWO = 'label2'
LABEL_THREE = 'label3'

# if directory exists, remove it
if os.path.exists(PATH):
remove_mpmc_queue(PATH)

q = MpmcQueue(PATH, TTL)

start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.add(data, no_gil=RELEASE_GIL).get()
v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == data

end = time.time()

print("Time taken: %f" % (end - start))

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == 0

v = q.next(label=LABEL_TWO, start_position=StartPosition.Newest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == [bytes(str(OPS-1), 'utf-8')]

labels = q.labels.get().labels
assert len(labels) == 2
assert LABEL_ONE in labels
assert LABEL_TWO in labels

r = q.remove_label(LABEL_THREE).get().removed_label
assert not r

r = q.remove_label(LABEL_ONE).get().removed_label
assert r

labels = q.labels.get().labels
assert len(labels) == 1
assert LABEL_ONE not in labels
assert LABEL_TWO in labels

v = q.next(label=LABEL_ONE, start_position=StartPosition.Oldest, max_elements=NUM, no_gil=RELEASE_GIL).get().data
assert len(v) == NUM
assert v == [bytes(str(0), 'utf-8')]
28 changes: 27 additions & 1 deletion queue_py/python/rocksq/blocking/blocking.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from rocksq import StartPosition

class PersistentQueueWithCapacity:
def __init__(self, path: str, max_elements: int = 1_000_000_000): ...

Expand All @@ -15,4 +17,28 @@ class PersistentQueueWithCapacity:
def payload_size(self) -> int: ...

@property
def len(self) -> int: ...
def len(self) -> int: ...

class MpmcQueue:
def __init__(self, path: str, ttl: int): ...

def add(self, items: list[bytes], no_gil: bool = True): ...

def next(self, label: str, start_position: StartPosition, max_elements: int = 1, no_gil: bool = True) -> list[bytes]: ...

@property
def is_empty(self) -> bool: ...

@property
def disk_size(self) -> int: ...

@property
def payload_size(self) -> int: ...

@property
def len(self) -> int: ...

@property
def labels(self): list[str]: ...

def remove_label(self, label: str) -> bool: ...
47 changes: 46 additions & 1 deletion queue_py/python/rocksq/nonblocking/nonblocking.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Optional

from rocksq import StartPosition

class ResponseVariant:
@property
Expand Down Expand Up @@ -39,3 +39,48 @@ class PersistentQueueWithCapacity:

@property
def len(self) -> Response: ...

class MpmcResponseVariant:
@property
def data(self) -> Optional[list[bytes]]: ...

@property
def labels(self) -> Optional[list[str]]: ...

@property
def removed_label(self) -> Optional[bool]: ...

@property
def len(self) -> Optional[int]: ...

@property
def size(self) -> Optional[int]: ...

class MpmcResponse:
@property
def is_ready(self) -> bool: ...

def try_get(self) -> Optional[MpmcResponseVariant]: ...

def get(self) -> MpmcResponseVariant: ...

class MpmcQueue:
def __init__(self, path: str, ttl: int, max_inflight_ops: int = 1_000): ...

def add(self, items: list[bytes], no_gil: bool = True) -> MpmcResponse: ...

@property
def inflight_ops(self) -> int: ...

def next(self, label: str, start_position: StartPosition, max_elements = 1, no_gil: bool = True) -> MpmcResponse: ...

@property
def disk_size(self) -> MpmcResponse: ...

@property
def len(self) -> MpmcResponse: ...

@property
def labels(self) -> MpmcResponse: ...

def remove_label(self, label: str) -> MpmcResponse: ...
7 changes: 7 additions & 0 deletions queue_py/python/rocksq/rocksq.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from enum import Enum

def version() -> str: ...

def remove_queue(queue_name: str): ...

def remove_mpmc_queue(queue_name: str): ...

class StartPosition(Enum):
Oldest=0
Newest=1
Loading

0 comments on commit a36fe68

Please sign in to comment.