Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Awaited send message #71

Open
wants to merge 3 commits into
base: async
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions oscpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,52 @@ def send_bundle(self, messages, timetag=None, safer=False):
)
self.stats += stats
return stats


# async versions for curio and trio implemenations
# sock.sendto on an awaited server socket needs to be awaited as well

async def async_send_message(
osc_address, values, ip_address, port, sock=SOCK, safer=False,
encoding='', encoding_errors='strict'
):
"""Send an osc message to a socket address async version.

See `send_message` for usage information
"""
if platform != 'win32' and sock.family == socket.AF_UNIX:
address = ip_address
else:
address = (ip_address, port)

message, stats = format_message(
osc_address, values, encoding=encoding,
encoding_errors=encoding_errors
)

await sock.sendto(message, address)
if safer:
sleep(10e-9)

return stats


async def async_send_bundle(
messages, ip_address, port, timetag=None, sock=None, safer=False,
encoding='', encoding_errors='strict'
):
"""Send a bundle built from the `messages` iterable.

See `send_bundle` for usage information.
"""
if not sock:
sock = SOCK
bundle, stats = format_bundle(
messages, timetag=timetag, encoding=encoding,
encoding_errors=encoding_errors
)
await sock.sendto(bundle, (ip_address, port))
if safer:
sleep(10e-9)

return stats
47 changes: 47 additions & 0 deletions oscpy/server/curio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from curio import TaskGroup, socket
from oscpy.server import OSCBaseServer, UDP_MAX_SIZE
from oscpy.client import async_send_bundle, async_send_message

logging.basicConfig()
logger = logging.getLogger(__name__)
Expand All @@ -23,6 +24,52 @@ def get_socket(family, addr):
sock.bind(addr)
return sock

async def send_message(self,
osc_address, values, ip_address, port, sock=None, safer=False,
encoding='', encoding_errors='strict'
):
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
stats = await async_send_message(
osc_address,
values,
ip_address,
port,
sock=sock,
safer=safer,
encoding=self.encoding,
encoding_errors=self.encoding_errors
)
self.stats_sent += stats
return stats

async def send_bundle(
self, messages, ip_address, port, timetag=None, sock=None, safer=False
):
"""Shortcut to the client's `send_bundle` method.

Use the `default_socket` of the server by default.
See `client.send_bundle` for more info about the parameters.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')

stats = await async_send_bundle(
messages,
ip_address,
port,
sock=sock,
safer=safer,
encoding=self.encoding,
encoding_errors=self.encoding_errors
)
self.stats_sent += stats
return stats

async def _listen(self, sock):
async with TaskGroup(wait=all) as g:
self.task_groups[sock] = g
Expand Down
47 changes: 47 additions & 0 deletions oscpy/server/trio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from trio import socket, open_nursery, move_on_after
from oscpy.server import OSCBaseServer, UDP_MAX_SIZE
from oscpy.client import async_send_bundle, async_send_message

logging.basicConfig()
logger = logging.getLogger(__name__)
Expand All @@ -24,6 +25,52 @@ async def get_socket(family, addr):
await sock.bind(addr)
return sock

async def send_message(self,
osc_address, values, ip_address, port, sock=None, safer=False,
encoding='', encoding_errors='strict'
):
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
stats = await async_send_message(
osc_address,
values,
ip_address,
port,
sock=sock,
safer=safer,
encoding=self.encoding,
encoding_errors=self.encoding_errors
)
self.stats_sent += stats
return stats

async def send_bundle(
self, messages, ip_address, port, timetag=None, sock=None, safer=False
):
"""Shortcut to the client's `send_bundle` method.

Use the `default_socket` of the server by default.
See `client.send_bundle` for more info about the parameters.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')

stats = await async_send_bundle(
messages,
ip_address,
port,
sock=sock,
safer=safer,
encoding=self.encoding,
encoding_errors=self.encoding_errors
)
self.stats_sent += stats
return stats

async def listen(
self, address='localhost', port=0, default=False, family='inet'
):
Expand Down
41 changes: 32 additions & 9 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,31 @@ def test_terminate_server(cls):
assert osc.join_server(timeout=0.1)
assert not osc._thread.is_alive()

@pytest.mark.parametrize("cls", server_classes)
def test_send_message_using_default_socket(cls):

event = Event()

def success(*values):
event.set()

osc = cls()
sock = _await(osc.listen, osc)
address = sock.getsockname()[0]
port = sock.getsockname()[1]
osc.bind(b'/success', success, sock)

_await(osc.listen, osc, kwargs=dict(default=True))
_await(osc.send_message, osc, args=[b'/success', [], address, port])

runner(osc, timeout=.2)
assert event.is_set()

@pytest.mark.parametrize("cls", server_classes)
def test_send_message_without_socket(cls):
osc = cls()
with pytest.raises(RuntimeError):
osc.send_message(b'/test', [], 'localhost', 0)
_await(osc.send_message, osc, args=[b'/test', [], 'localhost', 0])


@pytest.mark.parametrize("cls", server_classes)
Expand Down Expand Up @@ -187,15 +206,16 @@ def broken_callback(*values):
def test_send_bundle_without_socket(cls):
osc = cls()
with pytest.raises(RuntimeError):
osc.send_bundle([], 'localhost', 0)
_await(osc.send_bundle, osc, args=[[], 'localhost', 0])

sock = _await(osc.listen, osc, kwargs={'default': True})
osc.send_bundle(
(
(b'/test', []),
),
'localhost', 1
)
_await(osc.send_bundle, osc,
args=[(
(b'/test', []),
),
'localhost', 1
]
)


@pytest.mark.parametrize("cls", server_classes)
Expand Down Expand Up @@ -964,7 +984,10 @@ def callback(index):
client.send_message('/callback', [0])

# sever sends message on different port, might crash the server on windows:
osc.send_message('/callback', ["nobody is going to receive this"], ip_address='localhost', port=port + 1)
_await(osc.send_message, osc,
args=['/callback', ["nobody is going to receive this"]],
kwargs=dict(ip_address='localhost', port=port + 1)
)

# client sends message to server again. if server is dead, message
# will not be received:
Expand Down