Skip to content

Commit

Permalink
Merge pull request #48 from blocknative/fix/reconnect-on-ping-timeout
Browse files Browse the repository at this point in the history
0.2.9: Reconnect on TooSlow error
  • Loading branch information
taylorjdawson authored Jul 5, 2022
2 parents 9fb3d52 + a900bab commit dff993c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
2 changes: 1 addition & 1 deletion blocknative/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.8'
__version__ = '0.2.9'
27 changes: 25 additions & 2 deletions blocknative/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import List, Mapping, Callable, Union
import trio
import logging
from logging import INFO
from trio_websocket import (
open_websocket_url,
ConnectionClosed,
Expand All @@ -24,8 +25,12 @@
SubscriptionType,
to_camel_case,
)

from blocknative import __version__ as API_VERSION

FORMAT = "%(asctime)s [%(levelname)s]: %(message)s"
logging.basicConfig(format=FORMAT, level=INFO)

PING_INTERVAL = 15
PING_TIMEOUT = 10
MESSAGE_SEND_INTERVAL = 0.021 # 21ms
Expand Down Expand Up @@ -174,7 +179,11 @@ def subscribe_txn(self, tx_hash: str, callback: Callback, status: str = "sent"):
self._send_txn_watch_message(tx_hash, status)

def connect(self, base_url: str = BN_BASE_URL):
"""Initializes the connection to the WebSocket server."""
"""Initializes the connection to the WebSocket server.
Args:
base_url: The websocket url to connect to. Useful for when using a proxy.
"""
try:
return trio.run(self._connect, base_url)
except KeyboardInterrupt:
Expand Down Expand Up @@ -267,6 +276,15 @@ async def _message_handler(self, message: dict):
)

def unsubscribe(self, watched_address):
"""Unsubscribe from the current stream.
Note:
This function is passed as a parameter to the to the transaction callback that you provide.
Args:
watched_address: The address to unsubscribe from.
"""

# remove this subscription from the registry so that we don't execute the callback
del self._subscription_registry[watched_address]

Expand Down Expand Up @@ -330,7 +348,12 @@ async def _handle_connection(self, base_url: str):
nursery.start_soon(self._heartbeat)
nursery.start_soon(self._poll_messages)
nursery.start_soon(self._message_dispatcher)
except (ConnectionClosed, trio.MultiError) as error:
except (ConnectionClosed, trio.MultiError, trio.TooSlowError) as error:
if isinstance(error, trio.TooSlowError):
logging.warn(
f"Server failed to respond to ping within the given timeout of {PING_TIMEOUT} seconds."
)
logging.info("Attempting to reconnect...")
# If server times the connection out or drops, reconnect
await trio.sleep(0.5)
await self._connect(base_url)
Expand Down
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Application Programming Interface
global_filters = [{ "status": "pending" }]
Stream(API_KEY, BLOCKCHAIN, network_id, global_filters)
.. autofunction:: blocknative.stream.Stream.connect

.. autofunction:: blocknative.stream.Stream.subscribe_address

Expand Down

0 comments on commit dff993c

Please sign in to comment.