Skip to content

Commit

Permalink
Support async sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Wundero committed Dec 19, 2024
1 parent 61db2f4 commit d03e475
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion packages/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sinkr"
version = "0.1.4"
version = "0.2.0"
description = "Sinkr Python SDK"
authors = ["Wundero <[email protected]>"]
readme = "README.md"
Expand Down
28 changes: 21 additions & 7 deletions packages/python/sinkr/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from websockets.asyncio.client import connect
from contextlib import AbstractAsyncContextManager
import nanoid
import asyncio


class SinkrSink(AbstractAsyncContextManager):
Expand Down Expand Up @@ -64,9 +65,16 @@ def once(self, event: Optional[str], callback: callable):
:return: A function to unsubscribe from the event. Does nothing if the event has already been received.
"""

def once_callback(data):
callback(data)
off(event, once_callback)
if asyncio.iscoroutinefunction(callback):

async def once_callback(data):
await callback(data)
off()
else:

def once_callback(data):
callback(data)
off(event, once_callback)

off = self.on(event, once_callback)
return off
Expand All @@ -83,22 +91,28 @@ def clear_listeners(self, event: Optional[str] = None):
else:
self.callbacks[event] = {}

def __trigger_callbacks(self, message: str):
async def __trigger_callbacks(self, message: str):
data = json.loads(message)
event = data["data"]["event"]
relevant_callbacks = self.callbacks.get(event, {})
for callback in relevant_callbacks.values():
callback(data["data"])
if asyncio.iscoroutinefunction(callback):
await callback(data["data"])
else:
callback(data["data"])
for callback in self.global_callbacks.values():
callback(data["data"])
if asyncio.iscoroutinefunction(callback):
await callback(data["data"])
else:
callback(data["data"])

async def __iter__messages(self):
while True:
if not self.ws:
return
message = await self.ws.recv(True)
self.messages.append(message)
self.__trigger_callbacks(message)
await self.__trigger_callbacks(message)

async def __aenter__(self):
self.ws = await connect(self.url)
Expand Down
4 changes: 2 additions & 2 deletions packages/python/sinkr/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def __fetch(self, body):
"data": part,
}
await self.ws_session.send(json.dumps(obj))
resp = await self.ws_session.recv()
resp = await self.ws_session.recv(True)
resp_obj = json.loads(resp)
resp_id = resp_obj.get("id")
if resp_id == part_id:
Expand All @@ -130,7 +130,7 @@ async def __fetch(self, body):
"data": part,
}
await self.ws_session.send(json.dumps(obj))
resp = await self.ws_session.recv()
resp = await self.ws_session.recv(True)
resp_obj = json.loads(resp)
resp_id = resp_obj.get("id")
if resp_id == part_id:
Expand Down

0 comments on commit d03e475

Please sign in to comment.