Skip to content

Commit

Permalink
VER: Release 0.29.0
Browse files Browse the repository at this point in the history
See release notes.
  • Loading branch information
nmacholl authored Feb 13, 2024
2 parents 8287c53 + 481242b commit b8a1227
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 53 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ jobs:
if: ${{ github.event.workflow_run.conclusion == 'success' }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.10"
- uses: snok/install-poetry@v1
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ jobs:
runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- uses: snok/install-poetry@v1
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 0.29.0 - 2024-02-13

#### Enhancements
- Added `tz` parameter to `DBNStore.to_df`; when `pretty_ts` is `True`, all timestamp fields are converted from UTC to the specified timezone

#### Bug fixes
- `Live.block_for_close` and `Live.wait_for_close` will now call `Live.stop` when a timeout is reached instead of `Live.terminate` to close the stream more gracefully

## 0.28.0 - 2024-02-01

#### Enhancements
Expand Down
22 changes: 20 additions & 2 deletions databento/common/dbnstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytz
import zstandard
from databento_dbn import FIXED_PRICE_SCALE
from databento_dbn import Compression
Expand All @@ -47,6 +48,7 @@
from databento.common.error import BentoError
from databento.common.symbology import InstrumentMap
from databento.common.types import DBNRecord
from databento.common.types import Default
from databento.common.validation import validate_enum
from databento.common.validation import validate_file_write_path
from databento.common.validation import validate_maybe_enum
Expand Down Expand Up @@ -830,6 +832,7 @@ def to_df(
pretty_ts: bool = ...,
map_symbols: bool = ...,
schema: Schema | str | None = ...,
tz: pytz.BaseTzInfo | str = ...,
count: None = ...,
) -> pd.DataFrame:
...
Expand All @@ -841,6 +844,7 @@ def to_df(
pretty_ts: bool = ...,
map_symbols: bool = ...,
schema: Schema | str | None = ...,
tz: pytz.BaseTzInfo | str = ...,
count: int = ...,
) -> DataFrameIterator:
...
Expand All @@ -851,6 +855,7 @@ def to_df(
pretty_ts: bool = True,
map_symbols: bool = True,
schema: Schema | str | None = None,
tz: pytz.BaseTzInfo | str | Default[pytz.BaseTzInfo] = Default[pytz.BaseTzInfo](pytz.UTC),
count: int | None = None,
) -> pd.DataFrame | DataFrameIterator:
"""
Expand All @@ -865,14 +870,16 @@ def to_df(
If "decimal", prices will be instances of `decimal.Decimal`.
pretty_ts : bool, default True
If all timestamp columns should be converted from UNIX nanosecond
`int` to tz-aware UTC `pd.Timestamp`.
`int` to tz-aware `pd.Timestamp`. The timezone can be specified using the `tz` parameter.
map_symbols : bool, default True
If symbology mappings from the metadata should be used to create
a 'symbol' column, mapping the instrument ID to its requested symbol for
every record.
schema : Schema or str, optional
The DBN schema for the dataframe.
This is only required when reading a DBN stream with mixed record types.
tz : pytz.BaseTzInfo or str, default UTC
If `pretty_ts` is `True`, all timestamps will be converted to the specified timezone.
count : int, optional
If set, instead of returning a single `DataFrame` a `DataFrameIterator`
instance will be returned. When iterated, this object will yield
Expand All @@ -892,6 +899,14 @@ def to_df(
"""
schema = validate_maybe_enum(schema, Schema, "schema")

if isinstance(tz, Default):
tz = tz.value # consume default
elif not pretty_ts:
raise ValueError("A timezone was specified when `pretty_ts` is `False`. Did you mean to set `pretty_ts=True`?")

if not isinstance(tz, pytz.BaseTzInfo):
tz = pytz.timezone(tz)
if schema is None:
if self.schema is None:
raise ValueError("a schema must be specified for mixed DBN data")
Expand All @@ -910,6 +925,7 @@ def to_df(
count=count,
struct_type=self._schema_struct_map[schema],
instrument_map=self._instrument_map,
tz=tz,
price_type=price_type,
pretty_ts=pretty_ts,
map_symbols=map_symbols,
Expand Down Expand Up @@ -1334,6 +1350,7 @@ def __init__(
count: int | None,
struct_type: type[DBNRecord],
instrument_map: InstrumentMap,
tz: pytz.BaseTzInfo,
price_type: Literal["fixed", "float", "decimal"] = "float",
pretty_ts: bool = True,
map_symbols: bool = True,
Expand All @@ -1345,6 +1362,7 @@ def __init__(
self._pretty_ts = pretty_ts
self._map_symbols = map_symbols
self._instrument_map = instrument_map
self._tz = tz

def __iter__(self) -> DataFrameIterator:
return self
Expand Down Expand Up @@ -1411,7 +1429,7 @@ def _format_px(

def _format_pretty_ts(self, df: pd.DataFrame) -> None:
for field in self._struct_type._timestamp_fields:
df[field] = pd.to_datetime(df[field], utc=True, errors="coerce")
df[field] = pd.to_datetime(df[field], utc=True, errors="coerce").dt.tz_convert(self._tz)

def _format_set_index(self, df: pd.DataFrame) -> None:
index_column = self._struct_type._ordered_fields[0]
Expand Down
33 changes: 32 additions & 1 deletion databento/common/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Union
from typing import Callable, Generic, TypeVar, Union

import databento_dbn

Expand All @@ -21,3 +21,34 @@

RecordCallback = Callable[[DBNRecord], None]
ExceptionCallback = Callable[[Exception], None]

_T = TypeVar("_T")


class Default(Generic[_T]):
    """
    A container for a default value. This is to be used when a callable wants
    to detect if a default parameter value is being used.

    Wrapping the default in this container lets the callee tell "the caller
    passed nothing" apart from "the caller explicitly passed the same value".

    Parameters
    ----------
    value : _T
        The default value to wrap.

    Example
    -------
        def foo(param=Default[int](10)):
            if isinstance(param, Default):
                print(f"param={param.value} (default)")
            else:
                # An explicitly passed argument is a plain value, not a
                # `Default`, so it has no `.value` attribute.
                print(f"param={param}")

    """

    def __init__(self, value: _T):
        self._value = value

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._value!r})"

    @property
    def value(self) -> _T:
        """
        The default value.

        Returns
        -------
        _T

        """
        return self._value
17 changes: 11 additions & 6 deletions databento/live/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,7 @@ def subscribe(
stype_in : SType or str, default 'raw_symbol'
The input symbology type to resolve from.
start : str or int, optional
UNIX nanosecond epoch timestamp to start streaming from. Must be
within 24 hours.
UNIX nanosecond epoch timestamp to start streaming from (inclusive), based on `ts_event`. Must be within 24 hours except when requesting the mbo or definition schemas.
Raises
------
Expand Down Expand Up @@ -538,6 +537,9 @@ def block_for_close(
Block until the session closes or a timeout is reached. A session will
close after `Live.stop` is called or the remote gateway disconnects.
If a `timeout` is specified, `Live.stop` will be called when the
timeout is reached.
Parameters
----------
timeout : float, optional
Expand All @@ -562,8 +564,8 @@ def block_for_close(
loop=Live._loop,
).result(timeout=timeout)
except (futures.TimeoutError, KeyboardInterrupt) as exc:
logger.info("terminating session due to %s", type(exc).__name__)
self.terminate()
logger.info("closing session due to %s", type(exc).__name__)
self.stop()
if isinstance(exc, KeyboardInterrupt):
raise
except BentoError:
Expand All @@ -582,6 +584,9 @@ async def wait_for_close(
session will close after `Live.stop` is called or the remote gateway
disconnects.
If a `timeout` is specified, `Live.stop` will be called when the
timeout is reached.
Parameters
----------
timeout : float, optional
Expand Down Expand Up @@ -610,8 +615,8 @@ async def wait_for_close(
try:
await asyncio.wait_for(waiter, timeout=timeout)
except (asyncio.TimeoutError, KeyboardInterrupt) as exc:
logger.info("terminating session due to %s", type(exc).__name__)
self.terminate()
logger.info("closing session due to %s", type(exc).__name__)
self.stop()
if isinstance(exc, KeyboardInterrupt):
raise
except BentoError:
Expand Down
2 changes: 1 addition & 1 deletion databento/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.28.0"
__version__ = "0.29.0"
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "databento"
version = "0.28.0"
version = "0.29.0"
description = "Official Python client library for Databento"
authors = [
"Databento <[email protected]>",
Expand Down Expand Up @@ -51,6 +51,7 @@ ruff = "^0.0.291"
types-requests = "^2.30.0.0"
tomli = "^2.0.1"
teamcity-messages = "^1.32"
types-pytz = "^2024.1.0.20240203"

[build-system]
requires = ["poetry-core"]
Expand Down
52 changes: 30 additions & 22 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
import random
import string
import threading
from collections.abc import AsyncGenerator
from collections.abc import Generator
from collections.abc import Iterable
from typing import Callable

import pytest
import pytest_asyncio
from databento import historical
from databento import live
from databento.common.publishers import Dataset
Expand Down Expand Up @@ -200,13 +198,35 @@ def fixture_test_api_key() -> str:
return f"db-{random_str}"


@pytest_asyncio.fixture(name="mock_live_server")
async def fixture_mock_live_server(
@pytest.fixture(name="thread_loop", scope="session")
def fixture_thread_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
    """
    Fixture for a threaded event loop.

    A new event loop is created and run forever on a daemon thread. On
    teardown the loop is asked to stop from within its own thread, the
    thread is joined, and the loop is closed.

    Yields
    ------
    asyncio.AbstractEventLoop

    """
    loop = asyncio.new_event_loop()
    thread = threading.Thread(
        name="MockLiveServer",
        target=loop.run_forever,
        args=(),
        daemon=True,
    )
    thread.start()

    yield loop

    # Event loop methods are not thread-safe: calling `loop.stop()` directly
    # from this (pytest) thread does not wake a loop that is blocked waiting
    # for I/O. Schedule the stop on the loop's own thread instead.
    loop.call_soon_threadsafe(loop.stop)

    # Bounded join so a misbehaving loop cannot hang the test session; the
    # thread is a daemon, so it will not block interpreter exit either.
    thread.join(timeout=5.0)

    # Closing a running loop raises RuntimeError, so only close once stopped.
    if not loop.is_running():
        loop.close()

@pytest.fixture(name="mock_live_server")
def fixture_mock_live_server(
thread_loop: asyncio.AbstractEventLoop,
test_api_key: str,
caplog: pytest.LogCaptureFixture,
unused_tcp_port: int,
monkeypatch: pytest.MonkeyPatch,
) -> AsyncGenerator[MockLiveServer, None]:
) -> Generator[MockLiveServer, None, None]:
"""
Fixture for a MockLiveServer instance.
Expand All @@ -229,34 +249,22 @@ async def fixture_mock_live_server(
"CONNECT_TIMEOUT_SECONDS",
1,
)

loop = asyncio.new_event_loop()
thread = threading.Thread(
name="MockLiveServer",
target=loop.run_forever,
args=(),
daemon=True,
)
thread.start()

with caplog.at_level("DEBUG"):
mock_live_server = asyncio.run_coroutine_threadsafe(
coro=MockLiveServer.create(
host="127.0.0.1",
port=unused_tcp_port,
dbn_path=TESTS_ROOT / "data",
),
loop=loop,
loop=thread_loop,
).result()

yield mock_live_server

loop.run_in_executor(
None,
loop.stop,
)
thread.join()

asyncio.run_coroutine_threadsafe(
coro=mock_live_server.stop(),
loop=thread_loop,
).result()

@pytest.fixture(name="historical_client")
def fixture_historical_client(
Expand Down
1 change: 0 additions & 1 deletion tests/mock_live_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,6 @@ async def stop(self) -> None:
)

self.server.close()
await self.server.wait_closed()


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit b8a1227

Please sign in to comment.