From c84a06e252b1feb6fea13f68656eba1b834ccf14 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Wed, 11 Oct 2023 11:43:32 -0700 Subject: [PATCH 1/8] DEL: Remove DBN tasks from Live client --- databento/live/session.py | 104 ++++++++++++++------------------------ 1 file changed, 38 insertions(+), 66 deletions(-) diff --git a/databento/live/session.py b/databento/live/session.py index ee5c316..4bfc602 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -134,7 +134,6 @@ def __init__( self._dbn_queue = dbn_queue self._loop = loop self._metadata: SessionMetadata = metadata - self._tasks: set[asyncio.Task[None]] = set() self._user_callbacks = user_callbacks self._user_streams = user_streams @@ -142,29 +141,52 @@ def received_metadata(self, metadata: databento_dbn.Metadata) -> None: if not self._metadata: self._metadata.data = metadata for stream, exc_callback in self._user_streams.items(): - task = self._loop.create_task( - self._stream_task(stream, metadata, exc_callback), - ) - task.add_done_callback(self._tasks.remove) - self._tasks.add(task) + try: + stream.write(bytes(metadata)) + except Exception as exc: + stream_name = getattr(stream, "name", str(stream)) + logger.error( + "error writing %s to `%s` stream", + type(metadata).__name__, + stream_name, + exc_info=exc, + ) + if exc_callback is not None: + exc_callback(exc) else: self._metadata.check(metadata) return super().received_metadata(metadata) def received_record(self, record: DBNRecord) -> None: for callback, exc_callback in self._user_callbacks.items(): - task = self._loop.create_task( - self._callback_task(callback, record, exc_callback), - ) - task.add_done_callback(self._tasks.remove) - self._tasks.add(task) + try: + callback(record) + except Exception as exc: + logger.error( + "error dispatching %s to `%s` callback", + type(record).__name__, + getattr(callback, "__name__", str(callback)), + exc_info=exc, + ) + if exc_callback is not None: + exc_callback(exc) + has_ts_out = 
self._metadata.data and self._metadata.data.ts_out for stream, exc_callback in self._user_streams.items(): - task = self._loop.create_task( - self._stream_task(stream, record, exc_callback), - ) - task.add_done_callback(self._tasks.remove) - self._tasks.add(task) + try: + stream.write(bytes(record)) + if not isinstance(record, databento_dbn.Metadata) and has_ts_out: + stream.write(struct.pack("Q", record.ts_out)) + except Exception as exc: + stream_name = getattr(stream, "name", str(stream)) + logger.error( + "error writing %s to `%s` stream", + type(record).__name__, + stream_name, + exc_info=exc, + ) + if exc_callback is not None: + exc_callback(exc) if self._dbn_queue.enabled: try: @@ -185,55 +207,6 @@ def received_record(self, record: DBNRecord) -> None: return super().received_record(record) - async def _callback_task( - self, - record_callback: RecordCallback, - record: DBNRecord, - exception_callback: ExceptionCallback | None, - ) -> None: - try: - record_callback(record) - except Exception as exc: - logger.error( - "error dispatching %s to `%s` callback", - type(record).__name__, - getattr(record_callback, "__name__", str(record_callback)), - exc_info=exc, - ) - if exception_callback is not None: - self._loop.call_soon_threadsafe(exception_callback, exc) - - async def _stream_task( - self, - stream: IO[bytes], - record: databento_dbn.Metadata | DBNRecord, - exc_callback: ExceptionCallback | None, - ) -> None: - has_ts_out = self._metadata.data and self._metadata.data.ts_out - try: - stream.write(bytes(record)) - if not isinstance(record, databento_dbn.Metadata) and has_ts_out: - stream.write(struct.pack("Q", record.ts_out)) - except Exception as exc: - stream_name = getattr(stream, "name", str(stream)) - logger.error( - "error writing %s to `%s` stream", - type(record).__name__, - stream_name, - exc_info=exc, - ) - if exc_callback is not None: - self._loop.call_soon_threadsafe(exc_callback, exc) - - async def wait_for_processing(self) -> None: - while 
self._tasks: - logger.info( - "waiting for %d record(s) to process", - len(self._tasks), - ) - await asyncio.gather(*self._tasks) - - class Session: """ Parameters @@ -446,7 +419,6 @@ async def wait_for_close(self) -> None: await self._protocol.authenticated await self._protocol.disconnected - await self._protocol.wait_for_processing() try: self._protocol.authenticated.result() From d6f4892626d5aee8e701a7877b1ee70c052ac661 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 13 Oct 2023 15:08:21 -0700 Subject: [PATCH 2/8] ADD: Python support for decimal.Decimal --- CHANGELOG.md | 8 ++ databento/common/dbnstore.py | 168 ++++++++++++++++++++++----------- tests/test_historical_bento.py | 30 ++++-- 3 files changed, 142 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf8dfe9..f37a8c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 0.22.0 - TBD + +#### Enhancements +- Added `price_type` argument for `DBNStore.to_df` to specify if price fields should be `fixed`, `float` or `decimal.Decimal` + +#### Deprecations +- Deprecated `pretty_px` argument for `DBNStore.to_df` to be removed in a future release; the default `pretty_px=True` is now equivalent to `price_type="float"` and `pretty_px=False` is now equivalent to `price_type="fixed"` + ## 0.21.0 - 2023-10-11 #### Enhancements diff --git a/databento/common/dbnstore.py b/databento/common/dbnstore.py index 6d50c46..6eba231 100644 --- a/databento/common/dbnstore.py +++ b/databento/common/dbnstore.py @@ -1,14 +1,17 @@ from __future__ import annotations import abc +import decimal import itertools import logging +import warnings from collections.abc import Generator from collections.abc import Iterator +from functools import partial from io import BytesIO from os import PathLike from pathlib import Path -from typing import IO, TYPE_CHECKING, Any, Callable, overload +from typing import IO, TYPE_CHECKING, Any, Callable, Literal, overload import databento_dbn import numpy 
as np @@ -44,7 +47,6 @@ INT64_NULL = 9223372036854775807 - logger = logging.getLogger(__name__) if TYPE_CHECKING: @@ -92,37 +94,6 @@ def is_dbn(reader: IO[bytes]) -> bool: return reader.read(3) == b"DBN" -def format_dataframe( - df: pd.DataFrame, - schema: Schema, - pretty_px: bool, - pretty_ts: bool, -) -> pd.DataFrame: - struct = SCHEMA_STRUCT_MAP[schema] - - if schema == Schema.DEFINITION: - for column, type_max in DEFINITION_TYPE_MAX_MAP.items(): - if column in df.columns: - df[column] = df[column].where(df[column] != type_max, np.nan) - - if pretty_px: - for px_field in struct._price_fields: - df[px_field] = df[px_field].replace(INT64_NULL, np.nan) / FIXED_PRICE_SCALE - - if pretty_ts: - for ts_field in struct._timestamp_fields: - df[ts_field] = pd.to_datetime(df[ts_field], errors="coerce", utc=True) - - for column, dtype in SCHEMA_DTYPES_MAP[schema]: - if dtype.startswith("S") and column not in struct._hidden_fields: - df[column] = df[column].str.decode("utf-8") - - index_column = "ts_event" if schema.value.startswith("ohlcv") else "ts_recv" - df.set_index(index_column, inplace=True) - - return df - - class DataSource(abc.ABC): """ Abstract base class for backing DBNStore instances with data. @@ -791,7 +762,7 @@ def to_csv( path: Path | str, pretty_px: bool = True, pretty_ts: bool = True, - map_symbols: bool | None = None, + map_symbols: bool = True, schema: Schema | str | None = None, ) -> None: """ @@ -826,8 +797,12 @@ def to_csv( Requires all the data to be brought up into memory to then be written. 
""" + price_type: Literal["fixed", "float"] = "fixed" + if pretty_px is True: + price_type = "float" + df_iter = self.to_df( - pretty_px=pretty_px, + price_type=price_type, pretty_ts=pretty_ts, map_symbols=map_symbols, schema=schema, @@ -844,7 +819,8 @@ def to_csv( @overload def to_df( self, - pretty_px: bool = ..., + pretty_px: bool | None = ..., + price_type: Literal["fixed", "float", "decimal"] = ..., pretty_ts: bool = ..., map_symbols: bool = ..., schema: Schema | str | None = ..., @@ -855,7 +831,8 @@ def to_df( @overload def to_df( self, - pretty_px: bool = ..., + pretty_px: bool | None = ..., + price_type: Literal["fixed", "float", "decimal"] = ..., pretty_ts: bool = ..., map_symbols: bool = ..., schema: Schema | str | None = ..., @@ -865,7 +842,8 @@ def to_df( def to_df( self, - pretty_px: bool = True, + pretty_px: bool | None = None, + price_type: Literal["fixed", "float", "decimal"] = "float", pretty_ts: bool = True, map_symbols: bool = True, schema: Schema | str | None = None, @@ -877,9 +855,15 @@ def to_df( Parameters ---------- pretty_px : bool, default True + This parameter is deprecated and will be removed in a future release. If all price columns should be converted from `int` to `float` at the correct scale (using the fixed-precision scalar 1e-9). Null prices are replaced with NaN. + price_type : str, default "float" + The price type to use for price fields. + If "fixed", prices will have a type of `int` in fixed decimal format; each unit representing 1e-9 or 0.000000001. + If "float", prices will have a type of `float`. + If "decimal", prices will be instances of `decimal.Decimal`. pretty_ts : bool, default True If all timestamp columns should be converted from UNIX nanosecond `int` to tz-aware UTC `pd.Timestamp`. @@ -908,6 +892,20 @@ def to_df( If the schema for the array cannot be determined. 
""" + if pretty_px is True: + warnings.warn( + 'The argument `pretty_px` is deprecated and will be removed in a future release; `price_type="float"` can be used instead.', + DeprecationWarning, + stacklevel=2, + ) + elif pretty_px is False: + price_type = "fixed" + warnings.warn( + 'The argument `pretty_px` is deprecated and will be removed in a future release; `price_type="fixed"` can be used instead.', + DeprecationWarning, + stacklevel=2, + ) + schema = validate_maybe_enum(schema, Schema, "schema") if schema is None: if self.schema is None: @@ -927,7 +925,7 @@ def to_df( schema=schema, count=count, instrument_map=self._instrument_map, - pretty_px=pretty_px, + price_type=price_type, pretty_ts=pretty_ts, map_symbols=map_symbols, ) @@ -966,7 +964,7 @@ def to_json( path: Path | str, pretty_px: bool = True, pretty_ts: bool = True, - map_symbols: bool | None = None, + map_symbols: bool = True, schema: Schema | str | None = None, ) -> None: """ @@ -1000,8 +998,12 @@ def to_json( Requires all the data to be brought up into memory to then be written. 
""" + price_type: Literal["fixed", "float"] = "fixed" + if pretty_px is True: + price_type = "float" + df_iter = self.to_df( - pretty_px=pretty_px, + price_type=price_type, pretty_ts=pretty_ts, map_symbols=map_symbols, schema=schema, @@ -1126,37 +1128,91 @@ def __init__( count: int | None, schema: Schema, instrument_map: InstrumentMap, - pretty_px: bool = True, + price_type: Literal["fixed", "float", "decimal"] = "float", pretty_ts: bool = True, map_symbols: bool = True, ): self._records = records self._schema = schema self._count = count - self._pretty_px = pretty_px + self._price_type = price_type self._pretty_ts = pretty_ts self._map_symbols = map_symbols self._instrument_map = instrument_map + self._struct = SCHEMA_STRUCT_MAP[schema] def __iter__(self) -> DataFrameIterator: return self def __next__(self) -> pd.DataFrame: - df = format_dataframe( - pd.DataFrame( - next(self._records), - columns=SCHEMA_COLUMNS[self._schema], - ), - schema=self._schema, - pretty_px=self._pretty_px, - pretty_ts=self._pretty_ts, + df = pd.DataFrame( + next(self._records), + columns=SCHEMA_COLUMNS[self._schema], ) + if self._schema == Schema.DEFINITION: + self._format_definition_fields(df) + + self._format_hidden_fields(df) + + self._format_px(df, self._price_type) + + if self._pretty_ts: + self._format_pretty_ts(df) + + self._format_set_index(df) + if self._map_symbols: - df_index = df.index if self._pretty_ts else pd.to_datetime(df.index, utc=True) - dates = [ts.date() for ts in df_index] - df["symbol"] = [ - self._instrument_map.resolve(inst, dates[i]) for i, inst in enumerate(df["instrument_id"]) - ] + self._format_map_symbols(df) return df + + def _format_definition_fields(self, df: pd.DataFrame) -> None: + for column, type_max in DEFINITION_TYPE_MAX_MAP.items(): + if column in df.columns: + df[column] = df[column].where(df[column] != type_max, np.nan) + + def _format_hidden_fields(self, df: pd.DataFrame) -> None: + for column, dtype in SCHEMA_DTYPES_MAP[self._schema]: + 
hidden_fields = self._struct._hidden_fields + if dtype.startswith("S") and column not in hidden_fields: + df[column] = df[column].str.decode("utf-8") + + def _format_map_symbols(self, df: pd.DataFrame) -> None: + df_index = df.index if self._pretty_ts else pd.to_datetime(df.index, utc=True) + dates = [ts.date() for ts in df_index] + df["symbol"] = [ + self._instrument_map.resolve(inst, dates[i]) + for i, inst in enumerate(df["instrument_id"]) + ] + + def _format_px( + self, + df: pd.DataFrame, + price_type: Literal["fixed", "float", "decimal"], + ) -> None: + px_fields = self._struct._price_fields + + if price_type == "decimal": + for field in px_fields: + df[field] = ( + df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal) + / FIXED_PRICE_SCALE + ) + elif price_type == "float": + for field in px_fields: + df[field] = df[field].replace(INT64_NULL, np.nan) / FIXED_PRICE_SCALE + else: + return # do nothing + + def _format_pretty_ts(self, df: pd.DataFrame) -> None: + for field in self._struct._timestamp_fields: + df[field] = df[field].apply( + partial(pd.to_datetime, utc=True, errors="coerce"), + ) + + def _format_set_index(self, df: pd.DataFrame) -> None: + index_column = ( + "ts_event" if self._schema.value.startswith("ohlcv") else "ts_recv" + ) + df.set_index(index_column, inplace=True) diff --git a/tests/test_historical_bento.py b/tests/test_historical_bento.py index 9ccd3f1..c017673 100644 --- a/tests/test_historical_bento.py +++ b/tests/test_historical_bento.py @@ -2,9 +2,10 @@ import collections import datetime as dt +import decimal from io import BytesIO from pathlib import Path -from typing import Any, Callable +from typing import Any, Callable, Literal from unittest.mock import MagicMock import databento @@ -105,6 +106,7 @@ def test_sources_metadata_returns_expected_json_as_dict( ], } + def test_dbnstore_given_initial_nbytes_returns_expected_metadata( test_data: Callable[[Schema], bytes], ) -> None: @@ -390,26 +392,35 @@ def 
test_to_df_with_pretty_ts_converts_timestamps_as_expected( ], ], ) -def test_to_df_with_pretty_px_with_various_schemas_converts_prices_as_expected( +@pytest.mark.parametrize( + "price_type, expected_type", + [ + ("fixed", np.integer), + ("decimal", decimal.Decimal), + ("float", np.floating), + ], +) +def test_to_df_with_price_type_with_various_schemas_converts_prices_as_expected( test_data: Callable[[Schema], bytes], schema: Schema, columns: list[str], + price_type: Literal["float", "decimal"], + expected_type: type, ) -> None: # Arrange stub_data = test_data(schema) data = DBNStore.from_bytes(data=stub_data) # Act - df = data.to_df(pretty_px=True) + df = data.to_df(price_type=price_type) # Assert assert len(df) == 4 for column in columns: - assert isinstance(df[column].iloc(0)[1], float) - # TODO(cs): Check float values once display factor fixed + assert isinstance(df[column].iloc(0)[1], expected_type) -def test_to_df_with_pretty_px_handles_null( +def test_to_df_with_price_type_handles_null( test_data: Callable[[Schema], bytes], ) -> None: # Arrange @@ -417,8 +428,8 @@ def test_to_df_with_pretty_px_handles_null( data = DBNStore.from_bytes(data=stub_data) # Act - df_plain = data.to_df(pretty_px=False) - df_pretty = data.to_df(pretty_px=True) + df_plain = data.to_df(price_type="fixed") + df_pretty = data.to_df(price_type="float") # Assert assert all(df_plain["strike_price"] == 9223372036854775807) @@ -906,6 +917,7 @@ def test_dbnstore_buffer_long( with pytest.raises(BentoError): dbnstore.to_json(tmp_path / "test.json") + def test_dbnstore_buffer_rewind( test_data: Callable[[Schema], bytes], tmp_path: Path, @@ -925,6 +937,7 @@ def test_dbnstore_buffer_rewind( assert len(dbnstore.to_df()) == 4 + @pytest.mark.parametrize( "schema", [pytest.param(schema, id=str(schema)) for schema in Schema.variants()], @@ -1142,6 +1155,7 @@ def test_dbnstore_to_df_with_count_empty( # Assert assert next(df_iter).empty + def test_dbnstore_to_df_cannot_map_symbols_default_to_false( 
test_data: Callable[[Schema], bytes], monkeypatch: pytest.MonkeyPatch, From 686ce41ab55d126d364336370ed386132c33bf5b Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Mon, 16 Oct 2023 08:46:02 -0700 Subject: [PATCH 3/8] MOD: Refresh examples and notebook --- examples/historical_timeseries_to_df.py | 2 +- notebooks/quickstart.ipynb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/historical_timeseries_to_df.py b/examples/historical_timeseries_to_df.py index b9caa74..faf8a6d 100644 --- a/examples/historical_timeseries_to_df.py +++ b/examples/historical_timeseries_to_df.py @@ -18,4 +18,4 @@ ) # Convert to pandas dataframe - pprint(data.to_df(map_symbols=True)) + pprint(data.to_df()) diff --git a/notebooks/quickstart.ipynb b/notebooks/quickstart.ipynb index c8d8b0c..54c4d7f 100644 --- a/notebooks/quickstart.ipynb +++ b/notebooks/quickstart.ipynb @@ -1749,7 +1749,7 @@ "\n", "pd.set_option('display.max_columns', None)\n", "\n", - "df = data.to_df(pretty_px=True, pretty_ts=True)\n", + "df = data.to_df()\n", "df.head(20)" ] }, From c1749aac7bd704c2d12883a1325fa66f25d62b33 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 13 Oct 2023 14:16:05 -0700 Subject: [PATCH 4/8] MOD: Use Transcoder for DBNStore encoders --- CHANGELOG.md | 4 + README.md | 2 +- databento/common/dbnstore.py | 127 +++++++++++++++++++++--------- databento/common/iterator.py | 36 +++++++++ databento/live/protocol.py | 34 +-------- pyproject.toml | 2 +- tests/test_common_iterator.py | 33 ++++++++ tests/test_historical_bento.py | 136 +++++++++------------------------ 8 files changed, 202 insertions(+), 172 deletions(-) create mode 100644 databento/common/iterator.py create mode 100644 tests/test_common_iterator.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f37a8c9..bbbb3fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ #### Enhancements - Added `price_type` argument for `DBNStore.to_df` to specify if price fields should be `fixed`, `float` or 
`decimal.Decimal` +- Upgraded `databento-dbn` to 0.12.0 + +#### Breaking Changes +- Changed outputs of `DBNStore.to_csv` and `DBNStore.to_json` to match the encoding formats from the Databento API #### Deprecations - Deprecated `pretty_px` argument for `DBNStore.to_df` to be removed in a future release; the default `pretty_px=True` is now equivalent to `price_type="float"` and `pretty_px=False` is now equivalent to `price_type="fixed"` diff --git a/README.md b/README.md index 94f185b..df496a3 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ The library is fully compatible with the latest distribution of Anaconda 3.8 and The minimum dependencies as found in the `pyproject.toml` are also listed below: - python = "^3.8" - aiohttp = "^3.8.3" -- databento-dbn = "0.11.1" +- databento-dbn = "0.12.0" - numpy= ">=1.23.5" - pandas = ">=1.5.3" - requests = ">=2.24.0" diff --git a/databento/common/dbnstore.py b/databento/common/dbnstore.py index 6eba231..eba994b 100644 --- a/databento/common/dbnstore.py +++ b/databento/common/dbnstore.py @@ -11,7 +11,7 @@ from io import BytesIO from os import PathLike from pathlib import Path -from typing import IO, TYPE_CHECKING, Any, Callable, Literal, overload +from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Literal, overload import databento_dbn import numpy as np @@ -20,19 +20,24 @@ from databento_dbn import FIXED_PRICE_SCALE from databento_dbn import Compression from databento_dbn import DBNDecoder +from databento_dbn import Encoding from databento_dbn import ErrorMsg from databento_dbn import Metadata from databento_dbn import Schema from databento_dbn import SType from databento_dbn import SymbolMappingMsg from databento_dbn import SystemMsg +from databento_dbn import Transcoder from databento.common.data import DEFINITION_TYPE_MAX_MAP from databento.common.data import SCHEMA_COLUMNS from databento.common.data import SCHEMA_DTYPES_MAP from databento.common.data import SCHEMA_STRUCT_MAP from databento.common.error 
import BentoError +from databento.common.iterator import chunk from databento.common.symbology import InstrumentMap +from databento.common.symbology import SymbolInterval +from databento.common.validation import validate_enum from databento.common.validation import validate_file_write_path from databento.common.validation import validate_maybe_enum from databento.live import DBNRecord @@ -763,6 +768,7 @@ def to_csv( pretty_px: bool = True, pretty_ts: bool = True, map_symbols: bool = True, + compression: Compression | str = Compression.NONE, schema: Schema | str | None = None, ) -> None: """ @@ -783,6 +789,8 @@ def to_csv( If symbology mappings from the metadata should be used to create a 'symbol' column, mapping the instrument ID to its requested symbol for every record. + compression : Compression or str, default `Compression.NONE` + The output compression for writing. schema : Schema or str, optional The schema for the csv. This is only required when reading a DBN stream with mixed record types. @@ -797,24 +805,33 @@ def to_csv( Requires all the data to be brought up into memory to then be written. 
""" - price_type: Literal["fixed", "float"] = "fixed" - if pretty_px is True: - price_type = "float" + compression = validate_enum(compression, Compression, "compression") + schema = validate_maybe_enum(schema, Schema, "schema") + if schema is None: + if self.schema is None: + raise ValueError("a schema must be specified for mixed DBN data") + schema = self.schema - df_iter = self.to_df( - price_type=price_type, - pretty_ts=pretty_ts, - map_symbols=map_symbols, - schema=schema, - count=2**16, - ) + record_type = SCHEMA_STRUCT_MAP[schema] + record_iter = filter(lambda r: isinstance(r, record_type), self) - with open(path, "x", newline="") as csv_file: - for i, frame in enumerate(df_iter): - frame.to_csv( - csv_file, - header=(i == 0), - ) + if map_symbols: + self._instrument_map.insert_metadata(self.metadata) + symbol_map = self._instrument_map._data + else: + symbol_map = None + + with open(path, "xb") as output: + self._transcode( + output=output, + records_iter=record_iter, + encoding=Encoding.CSV, + pretty_px=pretty_px, + pretty_ts=pretty_ts, + symbol_map=symbol_map, + compression=compression, + schema=schema, + ) @overload def to_df( @@ -965,6 +982,7 @@ def to_json( pretty_px: bool = True, pretty_ts: bool = True, map_symbols: bool = True, + compression: Compression | str = Compression.NONE, schema: Schema | str | None = None, ) -> None: """ @@ -984,6 +1002,8 @@ def to_json( If symbology mappings from the metadata should be used to create a 'symbol' column, mapping the instrument ID to its requested symbol for every record. + compression : Compression or str, default `Compression.NONE` + The output compression for writing. schema : Schema or str, optional The schema for the json. This is only required when reading a DBN stream with mixed record types. @@ -998,27 +1018,33 @@ def to_json( Requires all the data to be brought up into memory to then be written. 
""" - price_type: Literal["fixed", "float"] = "fixed" - if pretty_px is True: - price_type = "float" + compression = validate_enum(compression, Compression, "compression") + schema = validate_maybe_enum(schema, Schema, "schema") + if schema is None: + if self.schema is None: + raise ValueError("a schema must be specified for mixed DBN data") + schema = self.schema - df_iter = self.to_df( - price_type=price_type, - pretty_ts=pretty_ts, - map_symbols=map_symbols, - schema=schema, - count=2**16, - ) + record_type = SCHEMA_STRUCT_MAP[schema] + record_iter = filter(lambda r: isinstance(r, record_type), self) - with open(path, "x") as json_path: - for frame in df_iter: - frame.reset_index(inplace=True) - frame.to_json( - json_path, - orient="records", - date_unit="ns", - lines=True, - ) + if map_symbols: + self._instrument_map.insert_metadata(self.metadata) + symbol_map = self._instrument_map._data + else: + symbol_map = None + + with open(path, "xb") as output: + self._transcode( + output=output, + records_iter=record_iter, + encoding=Encoding.JSON, + pretty_px=pretty_px, + pretty_ts=pretty_ts, + symbol_map=symbol_map, + compression=compression, + schema=schema, + ) @overload def to_ndarray( # type: ignore [misc] @@ -1085,6 +1111,35 @@ def to_ndarray( return ndarray_iter + def _transcode( + self, + output: BinaryIO, + records_iter: Iterator[DBNRecord], + encoding: Encoding, + pretty_px: bool, + pretty_ts: bool, + symbol_map: dict[int, list[SymbolInterval]] | None, + compression: Compression, + schema: Schema, + ) -> None: + transcoder = Transcoder( + file=output, + encoding=encoding, + compression=compression, + pretty_px=pretty_px, + pretty_ts=pretty_ts, + has_metadata=True, + input_compression=Compression.NONE, + symbol_map=symbol_map, # type: ignore [arg-type] + schema=schema, + ) + + transcoder.write(bytes(self.metadata)) + for records in chunk(records_iter, 2**16): + for record in records: + transcoder.write(bytes(record)) + transcoder.flush() + class 
NDArrayIterator: def __init__( diff --git a/databento/common/iterator.py b/databento/common/iterator.py new file mode 100644 index 0000000..36c3563 --- /dev/null +++ b/databento/common/iterator.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import itertools +from collections.abc import Iterable +from typing import TypeVar + + +_C = TypeVar("_C") + + +def chunk(iterable: Iterable[_C], size: int) -> Iterable[tuple[_C, ...]]: + """ + Break an iterable into chunks with a length of at most `size`. + + Parameters + ---------- + iterable: Iterable[_C] + The iterable to break up. + size : int + The maximum size of each chunk. + + Returns + ------- + Iterable[_C] + + Raises + ------ + ValueError + If `size` is less than 1. + + """ + if size < 1: + raise ValueError("size must be at least 1") + + it = iter(iterable) + return iter(lambda: tuple(itertools.islice(it, size)), ()) diff --git a/databento/live/protocol.py b/databento/live/protocol.py index fdeb8d5..e71a415 100644 --- a/databento/live/protocol.py +++ b/databento/live/protocol.py @@ -1,12 +1,10 @@ from __future__ import annotations import asyncio -import itertools import logging from collections.abc import Iterable from functools import singledispatchmethod from numbers import Number -from typing import TypeVar import databento_dbn from databento_dbn import Schema @@ -14,6 +12,7 @@ from databento.common import cram from databento.common.error import BentoError +from databento.common.iterator import chunk from databento.common.parsing import optional_datetime_to_unix_nanoseconds from databento.common.parsing import optional_symbols_list_to_list from databento.common.publishers import Dataset @@ -36,37 +35,6 @@ logger = logging.getLogger(__name__) -_C = TypeVar("_C") - - -def chunk(iterable: Iterable[_C], size: int) -> Iterable[tuple[_C, ...]]: - """ - Break an iterable into chunks with a length of at most `size`. - - Parameters - ---------- - iterable: Iterable[_C] - The iterable to break up. 
- size : int - The maximum size of each chunk. - - Returns - ------- - Iterable[_C] - - Raises - ------ - ValueError - If `size` is less than 1. - - """ - if size < 1: - raise ValueError("size must be at least 1") - - it = iter(iterable) - return iter(lambda: tuple(itertools.islice(it, size)), ()) - - class DatabentoLiveProtocol(asyncio.BufferedProtocol): """ A BufferedProtocol implementation for the Databento live subscription diff --git a/pyproject.toml b/pyproject.toml index b57df3b..d975959 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ repository = "https://github.com/databento/databento-python" [tool.poetry.dependencies] python = "^3.8" aiohttp = "^3.8.3" -databento-dbn = "0.11.1" +databento-dbn = "0.12.0" numpy = ">=1.23.5" pandas = ">=1.5.3" requests = ">=2.24.0" diff --git a/tests/test_common_iterator.py b/tests/test_common_iterator.py new file mode 100644 index 0000000..80ffa27 --- /dev/null +++ b/tests/test_common_iterator.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from collections.abc import Iterable + +import pytest +from databento.common import iterator + + +@pytest.mark.parametrize( + "things, size, expected", + [ + ( + "abcdefg", + 2, + [ + ("a", "b"), + ("c", "d"), + ("e", "f"), + ("g",), + ], + ), + ], +) +def test_chunk( + things: Iterable[object], + size: int, + expected: Iterable[tuple[object]], +) -> None: + """ + Test that an iterable is chunked properly. 
+ """ + chunks = [chunk for chunk in iterator.chunk(things, size)] + assert chunks == expected diff --git a/tests/test_historical_bento.py b/tests/test_historical_bento.py index c017673..ed16863 100644 --- a/tests/test_historical_bento.py +++ b/tests/test_historical_bento.py @@ -488,13 +488,11 @@ def test_mbo_to_csv_writes_expected_file_to_disk( # Assert written = path.read_text() expected = ( - "ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,price,size,channel_id," - "order_id,flags,ts_in_delta,sequence\n1609160400000704060,1609160400000429831,160," - "1,5482,C,A,3722750000000,1,0,647784973705,128,22993,1170352\n1609160400000711344," - "1609160400000431665,160,1,5482,C,A,3723000000000,1,0,647784973631,128,19621,1170" - "353\n1609160400000728600,1609160400000433051,160,1,5482,C,A,3723250000000,1,0,647" - "784973427,128,16979,1170354\n1609160400000740248,1609160400000434353,160,1,5482,C" - ",A,3723500000000,1,0,647784973094,128,17883,1170355\n" + "ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,price,size,channel_id,order_id,flags,ts_in_delta,sequence,symbol\n" + "1609160400000704060,1609160400000429831,160,1,5482,C,A,3722750000000,1,0,647784973705,128,22993,1170352,ESH1\n" + "1609160400000711344,1609160400000431665,160,1,5482,C,A,3723000000000,1,0,647784973631,128,19621,1170353,ESH1\n" + "1609160400000728600,1609160400000433051,160,1,5482,C,A,3723250000000,1,0,647784973427,128,16979,1170354,ESH1\n" + "1609160400000740248,1609160400000434353,160,1,5482,C,A,3723500000000,1,0,647784973094,128,17883,1170355,ESH1\n" ) assert written == expected @@ -520,15 +518,11 @@ def test_mbp_1_to_csv_with_no_options_writes_expected_file_to_disk( # Assert written = path.read_text() expected = ( - "ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,depth,price,size,flags" - ",ts_in_delta,sequence,bid_px_00,ask_px_00,bid_sz_00,ask_sz_00,bid_ct_00,ask_ct_0" - "0\n1609160400006136329,1609160400006001487,1,1,5482,A,A,0,3720500000000,1,128,172" - 
"14,1170362,3720250000000,3720500000000,24,11,15,9\n1609160400006246513,1609160400" - "006146661,1,1,5482,A,A,0,3720500000000,1,128,18858,1170364,3720250000000,37205000000" - "00,24,12,15,10\n1609160400007159323,1609160400007044577,1,1,5482,A,B,0,3720250000" - "000,2,128,18115,1170365,3720250000000,3720500000000,26,12,16,10\n1609160400007260" - "967,1609160400007169135,1,1,5482,C,A,0,3720500000000,1,128,17361,1170366,37202500000" - "00,3720500000000,26,11,16,9\n" + "ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,depth,price,size,flags,ts_in_delta,sequence,bid_px_00,ask_px_00,bid_sz_00,ask_sz_00,bid_ct_00,ask_ct_00,symbol\n" + "1609160400006136329,1609160400006001487,1,1,5482,A,A,0,3720500000000,1,128,17214,1170362,3720250000000,3720500000000,24,11,15,9,ESH1\n" + "1609160400006246513,1609160400006146661,1,1,5482,A,A,0,3720500000000,1,128,18858,1170364,3720250000000,3720500000000,24,12,15,10,ESH1\n" + "1609160400007159323,1609160400007044577,1,1,5482,A,B,0,3720250000000,2,128,18115,1170365,3720250000000,3720500000000,26,12,16,10,ESH1\n" + "1609160400007260967,1609160400007169135,1,1,5482,C,A,0,3720500000000,1,128,17361,1170366,3720250000000,3720500000000,26,11,16,9,ESH1\n" ) assert written == expected @@ -554,17 +548,11 @@ def test_mbp_1_to_csv_with_all_options_writes_expected_file_to_disk( # Assert written = path.read_text() expected = ( - "ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,depth,price,size,flags" - ",ts_in_delta,sequence,bid_px_00,ask_px_00,bid_sz_00,ask_sz_00,bid_ct_00,ask_ct_00,sy" - "mbol\n2020-12-28 13:00:00.006136329+00:00,2020-12-28 13:00:00.006001487+00:00,1,1" - ",5482,A,A,0,3720.5,1,128,17214,1170362,3720.25,3720.5" - ",24,11,15,9,ESH1\n2020-12-28 13:00:00.006246513+00:00,2020-12-28 13:00:00.006" - "146661+00:00,1,1,5482,A,A,0,3720.5,1,128,18858,1170364,3720.25" - ",3720.5,24,12,15,10,ESH1\n2020-12-28 13:00:00.007159323+00:00,2020-1" - "2-28 13:00:00.007044577+00:00,1,1,5482,A,B,0,3720.25,2,128,18115,1170365," - 
"3720.25,3720.5,26,12,16,10,ESH1\n2020-12-28 13:00:00.00726" - "0967+00:00,2020-12-28 13:00:00.007169135+00:00,1,1,5482,C,A,0,3720.5,1,1" - "28,17361,1170366,3720.25,3720.5,26,11,16,9,ESH1\n" + "ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,depth,price,size,flags,ts_in_delta,sequence,bid_px_00,ask_px_00,bid_sz_00,ask_sz_00,bid_ct_00,ask_ct_00,symbol\n" + "2020-12-28T13:00:00.006136329Z,2020-12-28T13:00:00.006001487Z,1,1,5482,A,A,0,3720.500000000,1,128,17214,1170362,3720.250000000,3720.500000000,24,11,15,9,ESH1\n" + "2020-12-28T13:00:00.006246513Z,2020-12-28T13:00:00.006146661Z,1,1,5482,A,A,0,3720.500000000,1,128,18858,1170364,3720.250000000,3720.500000000,24,12,15,10,ESH1\n" + "2020-12-28T13:00:00.007159323Z,2020-12-28T13:00:00.007044577Z,1,1,5482,A,B,0,3720.250000000,2,128,18115,1170365,3720.250000000,3720.500000000,26,12,16,10,ESH1\n" + "2020-12-28T13:00:00.007260967Z,2020-12-28T13:00:00.007169135Z,1,1,5482,C,A,0,3720.500000000,1,128,17361,1170366,3720.250000000,3720.500000000,26,11,16,9,ESH1\n" ) assert written == expected @@ -589,24 +577,11 @@ def test_mbo_to_json_with_no_options_writes_expected_file_to_disk( # Assert written = path.read_text() - assert written.strip() == ( - '{"ts_recv":1609160400000704060,"ts_event":1609160400000429831,"rtype":160,' - '"publisher_id":1,"instrument_id":5482,"action":"C","side":"A","price":3722750000000,' - '"size":1,"channel_id":0,"order_id":647784973705,"flags":128,"ts_in_delta":22993,"sequence":1170352}' - "\n" - '{"ts_recv":1609160400000711344,"ts_event":1609160400000431665,"rtype":160,"publisher_id":1,"instrument_id":5482,' - '"action":"C","side":"A","price":3723000000000,"size":1,"channel_id":0,"order_id":647784973631,' - '"flags":128,"ts_in_delta":19621,"sequence":1170353}' - "\n" - '{"ts_recv":1609160400000728600,"ts_event":1609160400000433051,"rtype":' - '160,"publisher_id":1,"instrument_id":5482,"action":"C","side":"A","price":3723250000' - 
'000,"size":1,"channel_id":0,"order_id":647784973427,"flags":128,"ts_in_delta":16979,' - '"sequence":1170354}' - "\n" - '{"ts_recv":1609160400000740248,"ts_event":1609160400000434353,"rtype":160,"publisher_id":1' - ',"instrument_id":5482,"action":"C","side":"A","price":3723500000000,"size":1,"channe' - 'l_id":0,"order_id":647784973094,"flags":128,"ts_in_delta":17883,"sequence":11703' - "55}" + assert written == ( + '{"ts_recv":"1609160400000704060","hd":{"ts_event":"1609160400000429831","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3722750000000","size":1,"channel_id":0,"order_id":"647784973705","flags":128,"ts_in_delta":22993,"sequence":1170352,"symbol":"ESH1"}\n' + '{"ts_recv":"1609160400000711344","hd":{"ts_event":"1609160400000431665","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3723000000000","size":1,"channel_id":0,"order_id":"647784973631","flags":128,"ts_in_delta":19621,"sequence":1170353,"symbol":"ESH1"}\n' + '{"ts_recv":"1609160400000728600","hd":{"ts_event":"1609160400000433051","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3723250000000","size":1,"channel_id":0,"order_id":"647784973427","flags":128,"ts_in_delta":16979,"sequence":1170354,"symbol":"ESH1"}\n' + '{"ts_recv":"1609160400000740248","hd":{"ts_event":"1609160400000434353","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3723500000000","size":1,"channel_id":0,"order_id":"647784973094","flags":128,"ts_in_delta":17883,"sequence":1170355,"symbol":"ESH1"}\n' ) @@ -629,22 +604,11 @@ def test_mbo_to_json_with_all_options_writes_expected_file_to_disk( # Assert written = path.read_text() - assert written.strip() == ( - '{"ts_recv":1609160400000704000,"ts_event":1609160400000429000,"rtype":160,"publisher_id":1,"instrument_id":5482,' - '"action":"C","side":"A","price":3722.75,"size":1,"channel_id":0,"order_id":647784973705,"flags":128,' - 
'"ts_in_delta":22993,"sequence":1170352,"symbol":"ESH1"}' - "\n" - '{"ts_recv":1609160400000711000,"ts_event":1609160400000431000,"rtype":160,"publisher_id":1,"instrument_id":5482,' - '"action":"C","side":"A","price":3723.0,"size":1,"channel_id":0,"order_id":647784973631,"flags":128,' - '"ts_in_delta":19621,"sequence":1170353,"symbol":"ESH1"}' - "\n" - '{"ts_recv":1609160400000728000,"ts_event":1609160400000433000,"rtype":160,"publisher_id":1,"instrument_id":5482,' - '"action":"C","side":"A","price":3723.25,"size":1,"channel_id":0,"order_id":647784973427,"flags":128,' - '"ts_in_delta":16979,"sequence":1170354,"symbol":"ESH1"}' - "\n" - '{"ts_recv":1609160400000740000,"ts_event":1609160400000434000,"rtype":160,"publisher_id":1,"instrument_id":5482,' - '"action":"C","side":"A","price":3723.5,"size":1,"channel_id":0,"order_id":647784973094,"flags":128,' - '"ts_in_delta":17883,"sequence":1170355,"symbol":"ESH1"}' + assert written == ( + '{"ts_recv":"2020-12-28T13:00:00.000704060Z","hd":{"ts_event":"2020-12-28T13:00:00.000429831Z","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3722.750000000","size":1,"channel_id":0,"order_id":"647784973705","flags":128,"ts_in_delta":22993,"sequence":1170352,"symbol":"ESH1"}\n' + '{"ts_recv":"2020-12-28T13:00:00.000711344Z","hd":{"ts_event":"2020-12-28T13:00:00.000431665Z","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3723.000000000","size":1,"channel_id":0,"order_id":"647784973631","flags":128,"ts_in_delta":19621,"sequence":1170353,"symbol":"ESH1"}\n' + '{"ts_recv":"2020-12-28T13:00:00.000728600Z","hd":{"ts_event":"2020-12-28T13:00:00.000433051Z","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3723.250000000","size":1,"channel_id":0,"order_id":"647784973427","flags":128,"ts_in_delta":16979,"sequence":1170354,"symbol":"ESH1"}\n' + 
'{"ts_recv":"2020-12-28T13:00:00.000740248Z","hd":{"ts_event":"2020-12-28T13:00:00.000434353Z","rtype":160,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","price":"3723.500000000","size":1,"channel_id":0,"order_id":"647784973094","flags":128,"ts_in_delta":17883,"sequence":1170355,"symbol":"ESH1"}\n' ) @@ -667,26 +631,11 @@ def test_mbp_1_to_json_with_no_options_writes_expected_file_to_disk( # Assert written = path.read_text() - assert written.strip() == ( - '{"ts_recv":1609160400006136329,"ts_event":1609160400006001487,"rtype":1,"publisher_id":1,"instrument_id":5482,"action":"A",' - '"side":"A","depth":0,"price":3720500000000,"size":1,"flags":128,"ts_in_delta":17214,"sequence":1170362,' - '"bid_px_00":3720250000000,"ask_px_00":3720500000000,"bid_sz_00":24,"ask_sz_00":11,"bid_ct_00":15,' - '"ask_ct_00":9}' - "\n" - '{"ts_recv":1609160400006246513,"ts_event":1609160400006146661,"rtype":1,"publisher_id":1,"instrument_id":5482,"action":"A",' - '"side":"A","depth":0,"price":3720500000000,"size":1,"flags":128,"ts_in_delta":18858,"sequence":1170364,' - '"bid_px_00":3720250000000,"ask_px_00":3720500000000,"bid_sz_00":24,"ask_sz_00":12,"bid_ct_00":15,' - '"ask_ct_00":10}' - "\n" - '{"ts_recv":1609160400007159323,"ts_event":1609160400007044577,"rtype":1,"publisher_id":1,"instrument_id":5482,"action":"A",' - '"side":"B","depth":0,"price":3720250000000,"size":2,"flags":128,"ts_in_delta":18115,"sequence":1170365,' - '"bid_px_00":3720250000000,"ask_px_00":3720500000000,"bid_sz_00":26,"ask_sz_00":12,"bid_ct_00":16,' - '"ask_ct_00":10}' - "\n" - '{"ts_recv":1609160400007260967,"ts_event":1609160400007169135,"rtype":1,"publisher_id":1,"instrument_id":5482,"action":"C",' - '"side":"A","depth":0,"price":3720500000000,"size":1,"flags":128,"ts_in_delta":17361,"sequence":1170366,' - '"bid_px_00":3720250000000,"ask_px_00":3720500000000,"bid_sz_00":26,"ask_sz_00":11,"bid_ct_00":16,' - '"ask_ct_00":9}' + assert written == ( + 
'{"ts_recv":"1609160400006136329","hd":{"ts_event":"1609160400006001487","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"A","side":"A","depth":0,"price":"3720500000000","size":1,"flags":128,"ts_in_delta":17214,"sequence":1170362,"levels":[{"bid_px":"3720250000000","ask_px":"3720500000000","bid_sz":24,"ask_sz":11,"bid_ct":15,"ask_ct":9}],"symbol":"ESH1"}\n' + '{"ts_recv":"1609160400006246513","hd":{"ts_event":"1609160400006146661","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"A","side":"A","depth":0,"price":"3720500000000","size":1,"flags":128,"ts_in_delta":18858,"sequence":1170364,"levels":[{"bid_px":"3720250000000","ask_px":"3720500000000","bid_sz":24,"ask_sz":12,"bid_ct":15,"ask_ct":10}],"symbol":"ESH1"}\n' + '{"ts_recv":"1609160400007159323","hd":{"ts_event":"1609160400007044577","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"A","side":"B","depth":0,"price":"3720250000000","size":2,"flags":128,"ts_in_delta":18115,"sequence":1170365,"levels":[{"bid_px":"3720250000000","ask_px":"3720500000000","bid_sz":26,"ask_sz":12,"bid_ct":16,"ask_ct":10}],"symbol":"ESH1"}\n' + '{"ts_recv":"1609160400007260967","hd":{"ts_event":"1609160400007169135","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","depth":0,"price":"3720500000000","size":1,"flags":128,"ts_in_delta":17361,"sequence":1170366,"levels":[{"bid_px":"3720250000000","ask_px":"3720500000000","bid_sz":26,"ask_sz":11,"bid_ct":16,"ask_ct":9}],"symbol":"ESH1"}\n' ) @@ -709,26 +658,11 @@ def test_mbp_1_to_json_with_all_options_writes_expected_file_to_disk( # Assert written = path.read_text() - assert written.strip() == ( - '{"ts_recv":1609160400006136000,"ts_event":1609160400006001000,"rtype":1,"publisher_id":1,"instrument_id":5482,' - '"action":"A","side":"A","depth":0,"price":3720.5,"size":1,"flags":128,"ts_in_delta":17214,' - '"sequence":1170362,"bid_px_00":3720.25,"ask_px_00":3720.5,"bid_sz_00":24,"ask_sz_00":11,' - 
'"bid_ct_00":15,"ask_ct_00":9,"symbol":"ESH1"}' - "\n" - '{"ts_recv":1609160400006246000,"ts_event":1609160400006146000,"rtype":1,"publisher_id":1,"instrument_id":5482,' - '"action":"A","side":"A","depth":0,"price":3720.5,"size":1,"flags":128,"ts_in_delta":18858,' - '"sequence":1170364,"bid_px_00":3720.25,"ask_px_00":3720.5,"bid_sz_00":24,"ask_sz_00":12,' - '"bid_ct_00":15,"ask_ct_00":10,"symbol":"ESH1"}' - "\n" - '{"ts_recv":1609160400007159000,"ts_event":1609160400007044000,"rtype":1,"publisher_id":1,"instrument_id":5482,' - '"action":"A","side":"B","depth":0,"price":3720.25,"size":2,"flags":128,"ts_in_delta":18115,' - '"sequence":1170365,"bid_px_00":3720.25,"ask_px_00":3720.5,"bid_sz_00":26,"ask_sz_00":12,' - '"bid_ct_00":16,"ask_ct_00":10,"symbol":"ESH1"}' - "\n" - '{"ts_recv":1609160400007260000,"ts_event":1609160400007169000,"rtype":1,"publisher_id":1,"instrument_id":5482,' - '"action":"C","side":"A","depth":0,"price":3720.5,"size":1,"flags":128,"ts_in_delta":17361,' - '"sequence":1170366,"bid_px_00":3720.25,"ask_px_00":3720.5,"bid_sz_00":26,"ask_sz_00":11,' - '"bid_ct_00":16,"ask_ct_00":9,"symbol":"ESH1"}' + assert written == ( + '{"ts_recv":"2020-12-28T13:00:00.006136329Z","hd":{"ts_event":"2020-12-28T13:00:00.006001487Z","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"A","side":"A","depth":0,"price":"3720.500000000","size":1,"flags":128,"ts_in_delta":17214,"sequence":1170362,"levels":[{"bid_px":"3720.250000000","ask_px":"3720.500000000","bid_sz":24,"ask_sz":11,"bid_ct":15,"ask_ct":9}],"symbol":"ESH1"}\n' + '{"ts_recv":"2020-12-28T13:00:00.006246513Z","hd":{"ts_event":"2020-12-28T13:00:00.006146661Z","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"A","side":"A","depth":0,"price":"3720.500000000","size":1,"flags":128,"ts_in_delta":18858,"sequence":1170364,"levels":[{"bid_px":"3720.250000000","ask_px":"3720.500000000","bid_sz":24,"ask_sz":12,"bid_ct":15,"ask_ct":10}],"symbol":"ESH1"}\n' + 
'{"ts_recv":"2020-12-28T13:00:00.007159323Z","hd":{"ts_event":"2020-12-28T13:00:00.007044577Z","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"A","side":"B","depth":0,"price":"3720.250000000","size":2,"flags":128,"ts_in_delta":18115,"sequence":1170365,"levels":[{"bid_px":"3720.250000000","ask_px":"3720.500000000","bid_sz":26,"ask_sz":12,"bid_ct":16,"ask_ct":10}],"symbol":"ESH1"}\n' + '{"ts_recv":"2020-12-28T13:00:00.007260967Z","hd":{"ts_event":"2020-12-28T13:00:00.007169135Z","rtype":1,"publisher_id":1,"instrument_id":5482},"action":"C","side":"A","depth":0,"price":"3720.500000000","size":1,"flags":128,"ts_in_delta":17361,"sequence":1170366,"levels":[{"bid_px":"3720.250000000","ask_px":"3720.500000000","bid_sz":26,"ask_sz":11,"bid_ct":16,"ask_ct":9}],"symbol":"ESH1"}\n' ) From 34c94caf70bdfae2db88b661e7f5a7e34ac62e98 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Wed, 11 Oct 2023 12:28:51 -0700 Subject: [PATCH 5/8] MOD: Add client key for Live auth request --- databento/common/system.py | 7 +++++++ databento/historical/http.py | 8 ++------ databento/live/gateway.py | 9 +++++---- tests/test_live_gateway_messages.py | 7 +++++-- 4 files changed, 19 insertions(+), 12 deletions(-) create mode 100644 databento/common/system.py diff --git a/databento/common/system.py b/databento/common/system.py new file mode 100644 index 0000000..e447305 --- /dev/null +++ b/databento/common/system.py @@ -0,0 +1,7 @@ +import sys + +from databento.version import __version__ + + +PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}" +USER_AGENT = f"Databento/{__version__} Python/{PYTHON_VERSION}" diff --git a/databento/historical/http.py b/databento/historical/http.py index dc946b6..7293d50 100644 --- a/databento/historical/http.py +++ b/databento/historical/http.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import sys import warnings from collections.abc import Iterable from io import BytesIO @@ -21,7 +20,7 @@ from databento.common.error 
import BentoDeprecationWarning from databento.common.error import BentoServerError from databento.common.error import BentoWarning -from databento.version import __version__ +from databento.common.system import USER_AGENT _32KIB = 1024 * 32 # 32_768 @@ -36,12 +35,9 @@ class BentoHttpAPI: TIMEOUT = 100 def __init__(self, key: str, gateway: str): - python_version = f"{sys.version_info.major}.{sys.version_info.minor}" - user_agent = f"Databento/{__version__} Python/{python_version}" - self._key = key self._gateway = gateway - self._headers = {"accept": "application/json", "user-agent": user_agent} + self._headers = {"accept": "application/json", "user-agent": USER_AGENT} def _check_api_key(self) -> None: if self._key == "YOUR_API_KEY": diff --git a/databento/live/gateway.py b/databento/live/gateway.py index 5a89d24..e3e78a6 100644 --- a/databento/live/gateway.py +++ b/databento/live/gateway.py @@ -2,7 +2,6 @@ import dataclasses import logging -from functools import partial from io import BytesIO from operator import attrgetter from typing import TypeVar @@ -12,12 +11,14 @@ from databento_dbn import SType from databento.common.publishers import Dataset +from databento.common.system import USER_AGENT logger = logging.getLogger(__name__) T = TypeVar("T", bound="GatewayControl") + @dataclasses.dataclass class GatewayControl: """ @@ -42,9 +43,8 @@ def parse(cls: type[T], line: str) -> T: if not line.endswith("\n"): raise ValueError(f"`{line.strip()}` does not end with a newline") - tokens = line[:-1].split("|") # split excluding trailing new line - splitter = partial(str.split, sep="=", maxsplit=1) - data_dict = {k: v for k, v in map(splitter, tokens)} + split_tokens = [t.partition("=") for t in line[:-1].split("|")] + data_dict = {k: v for k, _, v in split_tokens} try: return cls(**data_dict) @@ -108,6 +108,7 @@ class AuthenticationRequest(GatewayControl): encoding: Encoding = Encoding.DBN details: str | None = None ts_out: str = "0" + client: str = USER_AGENT 
@dataclasses.dataclass diff --git a/tests/test_live_gateway_messages.py b/tests/test_live_gateway_messages.py index 18d2c77..e07a4ab 100644 --- a/tests/test_live_gateway_messages.py +++ b/tests/test_live_gateway_messages.py @@ -86,16 +86,18 @@ def test_parse_authentication_request( AuthenticationRequest( auth="abcd1234", dataset=Dataset.GLBX_MDP3, + client="unittest", ), - b"auth=abcd1234|dataset=GLBX.MDP3|encoding=dbn|ts_out=0\n", + b"auth=abcd1234|dataset=GLBX.MDP3|encoding=dbn|ts_out=0|client=unittest\n", ), pytest.param( AuthenticationRequest( auth="abcd1234", dataset=Dataset.XNAS_ITCH, ts_out="1", + client="unittest", ), - b"auth=abcd1234|dataset=XNAS.ITCH|encoding=dbn|ts_out=1\n", + b"auth=abcd1234|dataset=XNAS.ITCH|encoding=dbn|ts_out=1|client=unittest\n", ), ], ) @@ -243,6 +245,7 @@ def test_serialize_greeting( "line, expected", [ pytest.param("start_session=0\n", "0"), + pytest.param("start_session\n", "", id="no_value"), pytest.param("start_session=0", ValueError, id="no_newline"), pytest.param("start_session=0|extra=key\n", ValueError, id="extra_key"), ], From 9e22ab2e1e1c5c51887c28cc93e25b2906ea38fb Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 20 Oct 2023 08:50:43 -0700 Subject: [PATCH 6/8] MOD: Upgrade databento-dbn version to 0.13.0 --- CHANGELOG.md | 2 +- README.md | 2 +- databento/common/dbnstore.py | 1 - pyproject.toml | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbbb3fb..b221e94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ #### Enhancements - Added `price_type` argument for `DBNStore.to_df` to specify if price fields should be `fixed`, `float` or `decimal.Decimal` -- Upgraded `databento-dbn` to 0.12.0 +- Upgraded `databento-dbn` to 0.13.0 #### Breaking Changes - Changed outputs of `DBNStore.to_csv` and `DBNStore.to_json` to match the encoding formats from the Databento API diff --git a/README.md b/README.md index df496a3..c082e32 100644 --- a/README.md +++ 
b/README.md @@ -32,7 +32,7 @@ The library is fully compatible with the latest distribution of Anaconda 3.8 and The minimum dependencies as found in the `pyproject.toml` are also listed below: - python = "^3.8" - aiohttp = "^3.8.3" -- databento-dbn = "0.12.0" +- databento-dbn = "0.13.0" - numpy= ">=1.23.5" - pandas = ">=1.5.3" - requests = ">=2.24.0" diff --git a/databento/common/dbnstore.py b/databento/common/dbnstore.py index eba994b..f7fef5e 100644 --- a/databento/common/dbnstore.py +++ b/databento/common/dbnstore.py @@ -1129,7 +1129,6 @@ def _transcode( pretty_px=pretty_px, pretty_ts=pretty_ts, has_metadata=True, - input_compression=Compression.NONE, symbol_map=symbol_map, # type: ignore [arg-type] schema=schema, ) diff --git a/pyproject.toml b/pyproject.toml index d975959..7d85ebf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ repository = "https://github.com/databento/databento-python" [tool.poetry.dependencies] python = "^3.8" aiohttp = "^3.8.3" -databento-dbn = "0.12.0" +databento-dbn = "0.13.0" numpy = ">=1.23.5" pandas = ">=1.5.3" requests = ">=2.24.0" From 175112d6dba2bb9eb7f8139a2438fdee787d52b3 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Mon, 23 Oct 2023 08:47:51 -0700 Subject: [PATCH 7/8] VER: Release 0.22.0 --- CHANGELOG.md | 2 +- databento/version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b221e94..ed0adf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.22.0 - TBD +## 0.22.0 - 2023-10-23 #### Enhancements - Added `price_type` argument for `DBNStore.to_df` to specify if price fields should be `fixed`, `float` or `decimal.Decimal` diff --git a/databento/version.py b/databento/version.py index 6a726d8..5963297 100644 --- a/databento/version.py +++ b/databento/version.py @@ -1 +1 @@ -__version__ = "0.21.0" +__version__ = "0.22.0" diff --git a/pyproject.toml b/pyproject.toml index 7d85ebf..9fe6e30 100644 --- 
a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databento" -version = "0.21.0" +version = "0.22.0" description = "Official Python client library for Databento" authors = [ "Databento ", From d6c53066fb982351a0717666a7132cfcb0f8dedd Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Mon, 23 Oct 2023 09:35:41 -0700 Subject: [PATCH 8/8] ADD: Marker file py.typed for databento-python --- CHANGELOG.md | 1 + databento/py.typed | 0 pyproject.toml | 5 ++++- 3 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 databento/py.typed diff --git a/CHANGELOG.md b/CHANGELOG.md index ed0adf6..ef89d34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ #### Enhancements - Added `price_type` argument for `DBNStore.to_df` to specify if price fields should be `fixed`, `float` or `decimal.Decimal` +- Added `py.typed` marker file - Upgraded `databento-dbn` to 0.13.0 #### Breaking Changes diff --git a/databento/py.typed b/databento/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml index 9fe6e30..41186d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,10 @@ authors = [ "Databento ", ] license = "Apache License 2.0" -packages = [{include = "databento"}] +packages = [ + {include = "databento"}, + {include = "databento/py.typed"}, +] classifiers = [ "Development Status :: 4 - Beta", "Operating System :: OS Independent",