From 90e5d39426af75c173a4af50ea17dbc763a622a4 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 6 Sep 2024 08:56:28 -0700 Subject: [PATCH 01/10] ADD: Add mode parameter for DBNStore exports --- CHANGELOG.md | 12 ++++ databento/common/dbnstore.py | 23 +++++-- tests/test_historical_bento.py | 120 +++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8eeff13..1037464 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 0.42.0 - TBD + +#### Enhancements +- Added `mode` parameter to `DBNStore.to_csv` to control the file writing mode +- Added `mode` parameter to `DBNStore.to_json` to control the file writing mode +- Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode + +#### Breaking changes +- Changed default write mode for `DBNStore.to_csv` to overwrite ("w") +- Changed default write mode for `DBNStore.to_json` to overwrite ("w") +- Changed default write mode for `DBNStore.to_parquet` to overwrite ("w") + ## 0.41.0 - 2024-09-03 #### Enhancements diff --git a/databento/common/dbnstore.py b/databento/common/dbnstore.py index c716d56..3db33f7 100644 --- a/databento/common/dbnstore.py +++ b/databento/common/dbnstore.py @@ -792,6 +792,7 @@ def to_csv( map_symbols: bool = True, compression: Compression | str = Compression.NONE, schema: Schema | str | None = None, + mode: Literal["w", "x"] = "w", ) -> None: """ Write the data to a file in CSV format. @@ -816,6 +817,8 @@ def to_csv( schema : Schema or str, optional The DBN schema for the csv. This is only required when reading a DBN stream with mixed record types. + mode : str, default "w" + The file write mode to use, either "x" or "w". Raises ------ @@ -825,14 +828,15 @@ def to_csv( """ compression = validate_enum(compression, Compression, "compression") schema = validate_maybe_enum(schema, Schema, "schema") + file_path = validate_file_write_path(path, "path", exist_ok=mode == "w") if schema is None: if self.schema is None: raise ValueError("a schema must be specified for mixed DBN data") schema = self.schema - with open(path, "xb") as output: + with open(file_path, f"{mode}b") as output: self._transcode( - output=output, + output=output, # type: ignore [arg-type] encoding=Encoding.CSV, pretty_px=pretty_px, pretty_ts=pretty_ts, @@ -961,6 +965,7 @@ def to_parquet( pretty_ts: bool = True, map_symbols: bool = True, schema: Schema | str | None = None, + mode: Literal["w", "x"] = "w", **kwargs: Any, ) -> None: """ @@ -983,6 +988,8 @@ def to_parquet( schema : Schema or str, optional The DBN schema for the parquet file. This is only required when reading a DBN stream with mixed record types. + mode : str, default "w" + The file write mode to use, either "x" or "w". 
Raises ------ @@ -994,6 +1001,7 @@ def to_parquet( if price_type == "decimal": raise ValueError("the 'decimal' price type is not currently supported") + file_path = validate_file_write_path(path, "path", exist_ok=mode == "w") schema = validate_maybe_enum(schema, Schema, "schema") if schema is None: if self.schema is None: @@ -1015,7 +1023,7 @@ def to_parquet( # Initialize the writer using the first DataFrame parquet_schema = pa.Schema.from_pandas(frame) writer = pq.ParquetWriter( - where=path, + where=file_path, schema=parquet_schema, **kwargs, ) @@ -1066,6 +1074,7 @@ def to_json( map_symbols: bool = True, compression: Compression | str = Compression.NONE, schema: Schema | str | None = None, + mode: Literal["w", "x"] = "w", ) -> None: """ Write the data to a file in JSON format. @@ -1089,6 +1098,8 @@ def to_json( schema : Schema or str, optional The DBN schema for the json. This is only required when reading a DBN stream with mixed record types. + mode : str, default "w" + The file write mode to use, either "x" or "w". Raises ------ @@ -1098,14 +1109,16 @@ def to_json( """ compression = validate_enum(compression, Compression, "compression") schema = validate_maybe_enum(schema, Schema, "schema") + file_path = validate_file_write_path(path, "path", exist_ok=mode == "w") + if schema is None: if self.schema is None: raise ValueError("a schema must be specified for mixed DBN data") schema = self.schema - with open(path, "xb") as output: + with open(file_path, f"{mode}b") as output: self._transcode( - output=output, + output=output, # type: ignore [arg-type] encoding=Encoding.JSON, pretty_px=pretty_px, pretty_ts=pretty_ts, diff --git a/tests/test_historical_bento.py b/tests/test_historical_bento.py index c09b424..35609de 100644 --- a/tests/test_historical_bento.py +++ b/tests/test_historical_bento.py @@ -243,6 +243,126 @@ def test_to_file_exclusive( dbnstore.to_file(path=dbn_path, mode="x") +def test_to_csv_overwrite( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the default write mode allows files to be overwritten. + """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + csv_path = tmp_path / "my_test.csv" + dbnstore.to_csv(path=csv_path) + assert csv_path.stat().st_size == 623 + + # Act + dbnstore.to_csv(path=csv_path) + + # Assert + assert csv_path.exists() + assert csv_path.stat().st_size == 623 + + +def test_to_csv_exclusive( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the exclusive write mode correctly rejects an existing file path. + """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + csv_path = tmp_path / "my_test.csv" + dbnstore.to_csv(path=csv_path) + + # Act, Assert + with pytest.raises(FileExistsError): + dbnstore.to_csv(path=csv_path, mode="x") + + +def test_to_json_overwrite( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the default write mode allows files to be overwritten. 
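A minimal sketch of how the new `mode` parameter behaves once PATCH 01/10 is applied; the input file and output paths are illustrative, not part of the change:

```python
from pathlib import Path

from databento.common.dbnstore import DBNStore

# Assumes a DBN stream obtained elsewhere; the path is a placeholder.
raw = Path("glbx-mdp3-mbo.dbn").read_bytes()
store = DBNStore.from_bytes(data=raw)

out = Path("trades.csv")

# The default mode is now "w": an existing file is silently overwritten.
store.to_csv(path=out)
store.to_csv(path=out)  # the second call replaces the first file

# Passing mode="x" restores the previous exclusive behavior and raises
# FileExistsError when the target already exists.
try:
    store.to_csv(path=out, mode="x")
except FileExistsError:
    print(f"{out} already exists; refusing to overwrite")

# to_json and to_parquet accept the same parameter.
store.to_json(path="trades.json", mode="x")
store.to_parquet(path="trades.parquet", mode="w")
```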
+ """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + json_path = tmp_path / "my_test.json" + dbnstore.to_json(path=json_path) + assert json_path.stat().st_size == 1216 + + # Act + dbnstore.to_json(path=json_path) + + # Assert + assert json_path.exists() + assert json_path.stat().st_size == 1216 + + +def test_to_json_exclusive( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the exclusive write mode correctly rejects an existing file path. + """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + json_path = tmp_path / "my_test.json" + dbnstore.to_json(path=json_path) + + # Act, Assert + with pytest.raises(FileExistsError): + dbnstore.to_json(path=json_path, mode="x") + + +def test_to_parquet_overwrite( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the default write mode allows files to be overwritten. + """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + parquet_path = tmp_path / "my_test.parquet" + dbnstore.to_parquet(path=parquet_path) + assert parquet_path.stat().st_size == 9888 + + # Act + dbnstore.to_parquet(path=parquet_path) + + # Assert + assert parquet_path.exists() + assert parquet_path.stat().st_size == 9888 + + +def test_to_parquet_exclusive( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the exclusive write mode correctly rejects an existing file path. + """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + parquet_path = tmp_path / "my_test.parquet" + dbnstore.to_parquet(path=parquet_path) + + # Act, Assert + with pytest.raises(FileExistsError): + dbnstore.to_parquet(path=parquet_path, mode="x") + + def test_to_ndarray_with_stub_data_returns_expected_array( test_data: Callable[[Dataset, Schema], bytes], ) -> None: From a89433a8bb851d8d6a9446f1f19de455b9b72fe7 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Thu, 12 Sep 2024 12:14:08 +1000 Subject: [PATCH 02/10] DOC: Clarify input range for batch split_size --- databento/historical/api/batch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/databento/historical/api/batch.py b/databento/historical/api/batch.py index 9d99b29..47f4d57 100644 --- a/databento/historical/api/batch.py +++ b/databento/historical/api/batch.py @@ -124,6 +124,7 @@ def submit_job( A week starts on Sunday UTC. split_size : int, optional The maximum size (bytes) of each batched data file before being split. + Must be an integer between 1e9 and 10e9 inclusive (1GB - 10GB). packaging : Packaging or str {'none', 'zip', 'tar'}, optional The archive type to package all batched data files in. 
delivery : Delivery or str {'download', 's3', 'disk'}, default 'download' From 4d75909a14acfa49058f35005b41fc2080ba1f92 Mon Sep 17 00:00:00 2001 From: Luca Lin Date: Thu, 12 Sep 2024 06:05:48 -0400 Subject: [PATCH 03/10] MOD: Update to new Slack HQ invite link --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1f8abb3..3bdca7b 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![pypi-version](https://img.shields.io/pypi/v/databento)](https://pypi.org/project/databento) [![license](https://img.shields.io/github/license/databento/databento-python?color=blue)](./LICENSE) [![code-style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) -[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://join.slack.com/t/databento-hq/shared_invite/zt-24oqyrub9-MellISM2cdpQ7s_7wcXosw) +[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://join.slack.com/t/databento-hq/shared_invite/zt-1ryj8bb50-aASfjMVzoFbABHauQ335zg) The official Python client library for [Databento](https://databento.com). From cfc67a5202d93d3e28ab7d786e29ae29aaba2d2d Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Fri, 13 Sep 2024 10:47:06 +1000 Subject: [PATCH 04/10] MOD: Update public Slack link --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3bdca7b..cedd0da 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![pypi-version](https://img.shields.io/pypi/v/databento)](https://pypi.org/project/databento) [![license](https://img.shields.io/github/license/databento/databento-python?color=blue)](./LICENSE) [![code-style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) -[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://join.slack.com/t/databento-hq/shared_invite/zt-1ryj8bb50-aASfjMVzoFbABHauQ335zg) +[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://to.dbn.to/slack) The official Python client library for [Databento](https://databento.com). 
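To illustrate the `split_size` range documented in PATCH 02/10 above, a hedged sketch of a batch submission; the API key, dataset, symbols, and dates are placeholders, and the keyword arguments follow the `submit_job` parameters shown in that docstring:

```python
import databento as db

client = db.Historical("YOUR_API_KEY")  # placeholder key

# split_size must be between 1e9 and 10e9 bytes inclusive (1GB - 10GB);
# here each batched data file is capped at roughly 2 GB before being split.
job = client.batch.submit_job(
    dataset="GLBX.MDP3",
    symbols=["ESM4"],
    schema="trades",
    start="2024-06-03",
    end="2024-06-07",
    split_size=2_000_000_000,
    packaging="zip",
    delivery="download",
)
print(job)
```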
From 59acae25117f05b6a18e7e68ad9df753282bc82f Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 13 Sep 2024 10:41:31 -0700 Subject: [PATCH 05/10] FIX: Fix test to use new alias --- tests/test_historical_bento.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_historical_bento.py b/tests/test_historical_bento.py index 35609de..c0c9ab7 100644 --- a/tests/test_historical_bento.py +++ b/tests/test_historical_bento.py @@ -707,14 +707,14 @@ def test_from_file_given_various_paths_returns_expected_metadata( assert data.schema == expected_schema -def test_from_dbn_alias( +def test_read_dbn_alias( test_data_path: Callable[[Dataset, Schema], Path], ) -> None: # Arrange path = test_data_path(Dataset.GLBX_MDP3, Schema.MBO) # Act - data = databento.from_dbn(path=path) + data = databento.read_dbn(path=path) # Assert assert data.schema == Schema.MBO From f80864bdb84da94544504f06b9568018de4a1acb Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 13 Sep 2024 14:04:37 -0700 Subject: [PATCH 06/10] ADD: Add compression parameter to DBNStore.to_file --- CHANGELOG.md | 1 + databento/common/dbnstore.py | 33 +++++++++++++++++++++++++++++++-- tests/test_historical_bento.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1037464..426172b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Added `mode` parameter to `DBNStore.to_csv` to control the file writing mode - Added `mode` parameter to `DBNStore.to_json` to control the file writing mode - Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode +- Added `compression` parameter to `DBNStore.to_file` which controls the output compression format #### Breaking changes - Changed default write mode for `DBNStore.to_csv` to overwrite ("w") diff --git a/databento/common/dbnstore.py b/databento/common/dbnstore.py index 3db33f7..0675dd3 100644 --- a/databento/common/dbnstore.py +++ b/databento/common/dbnstore.py @@ -1041,6 +1041,7 @@ def to_file( self, path: PathLike[str] | str, mode: Literal["w", "x"] = "w", + compression: Compression | str | None = None, ) -> None: """ Write the data to a DBN file at the given path. @@ -1051,6 +1052,8 @@ def to_file( The file path to write to. mode : str, default "w" The file write mode to use, either "x" or "w". + compression : Compression or str, optional + The compression format to write. If `None`, uses the same compression as the underlying data. Raises ------ @@ -1062,9 +1065,35 @@ def to_file( If path is not writable. 
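Ahead of the `DBNStore.to_file` implementation that follows, a usage sketch for the new `compression` parameter; the file paths are illustrative:

```python
from databento.common.dbnstore import DBNStore
from databento_dbn import Compression

# Assumes an uncompressed DBN file at this illustrative path.
store = DBNStore.from_file(path="glbx-mdp3-mbo.dbn")

# Write a Zstandard-compressed copy; omitting `compression` (or passing
# None) keeps whatever compression the underlying data already uses.
store.to_file(path="glbx-mdp3-mbo.dbn.zst", compression=Compression.ZSTD)

# Reading the copy back reflects the new compression, and writing it out
# again with Compression.NONE produces a decompressed file.
zstd_store = DBNStore.from_file(path="glbx-mdp3-mbo.dbn.zst")
assert zstd_store.compression == Compression.ZSTD
zstd_store.to_file(path="copy.dbn", compression=Compression.NONE)
```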
""" + compression = validate_maybe_enum(compression, Compression, "compression") file_path = validate_file_write_path(path, "path", exist_ok=mode == "w") - file_path.write_bytes(self._data_source.reader.read()) - self._data_source = FileDataSource(file_path) + + writer: IO[bytes] | zstandard.ZstdCompressionWriter + if compression is None or compression == self.compression: + # Handle trivial case + with open(file_path, mode=f"{mode}b") as writer: + reader = self._data_source.reader + while chunk := reader.read(2**16): + writer.write(chunk) + return + + if compression == Compression.ZSTD: + writer = zstandard.ZstdCompressor( + write_checksum=True, + ).stream_writer( + open(file_path, mode=f"{mode}b"), + closefd=True, + ) + else: + writer = open(file_path, mode=f"{mode}b") + + try: + reader = self.reader + + while chunk := reader.read(2**16): + writer.write(chunk) + finally: + writer.close() def to_json( self, diff --git a/tests/test_historical_bento.py b/tests/test_historical_bento.py index c0c9ab7..0d14c4d 100644 --- a/tests/test_historical_bento.py +++ b/tests/test_historical_bento.py @@ -22,6 +22,7 @@ from databento.common.error import BentoError from databento.common.publishers import Dataset from databento.common.types import DBNRecord +from databento_dbn import Compression from databento_dbn import MBOMsg from databento_dbn import Schema from databento_dbn import SType @@ -243,6 +244,36 @@ def test_to_file_exclusive( dbnstore.to_file(path=dbn_path, mode="x") +@pytest.mark.parametrize( + "compression", + [ + Compression.NONE, + Compression.ZSTD, + ], +) +def test_to_file_compression( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, + compression: Compression, +) -> None: + """ + Test that specifying a compression for DBNStore.to_file writes the desired + compression mode. + """ + # Arrange + stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO) + dbnstore = DBNStore.from_bytes(data=stub_data) + dbn_path = tmp_path / "my_test.dbn" + dbnstore.to_file( + path=dbn_path, + compression=compression, + ) + + # Act, Assert + new_store = databento.read_dbn(dbn_path) + assert new_store.compression == compression + + def test_to_csv_overwrite( test_data: Callable[[Dataset, Schema], bytes], tmp_path: Path, From 75d0f75056b95b826018ae117d2d9aad2e669d73 Mon Sep 17 00:00:00 2001 From: Renan Gemignani Date: Thu, 19 Sep 2024 16:18:56 +0200 Subject: [PATCH 07/10] ADD: DBN updates to support XNAS.BASIC --- CHANGELOG.md | 1 + databento/common/publishers.py | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 426172b..f54b547 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Added `mode` parameter to `DBNStore.to_json` to control the file writing mode - Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode - Added `compression` parameter to `DBNStore.to_file` which controls the output compression format +- Added new consolidated publisher values for `XNAS.BASIC` and `DBEQ.MAX` #### Breaking changes - Changed default write mode for `DBNStore.to_csv` to overwrite ("w") diff --git a/databento/common/publishers.py b/databento/common/publishers.py index dbc998b..813019b 100644 --- a/databento/common/publishers.py +++ b/databento/common/publishers.py @@ -955,6 +955,10 @@ class Publisher(StringyMixin, str, Enum): NYSE National BBO and Trades. XNYS_BBOTRADES_XNYS NYSE BBO and Trades. + XNAS_BASIC_DBEQ + Nasdaq Basic - Consolidated. + DBEQ_MAX_DBEQ + DBEQ Max - Consolidated. 
""" @@ -1050,6 +1054,8 @@ class Publisher(StringyMixin, str, Enum): DBEQ_SUMMARY_DBEQ = "DBEQ.SUMMARY.DBEQ" XCIS_BBOTRADES_XCIS = "XCIS.BBOTRADES.XCIS" XNYS_BBOTRADES_XNYS = "XNYS.BBOTRADES.XNYS" + XNAS_BASIC_DBEQ = "XNAS.BASIC.DBEQ" + DBEQ_MAX_DBEQ = "DBEQ.MAX.DBEQ" @classmethod def from_int(cls, value: int) -> Publisher: @@ -1240,6 +1246,10 @@ def from_int(cls, value: int) -> Publisher: return Publisher.XCIS_BBOTRADES_XCIS if value == 92: return Publisher.XNYS_BBOTRADES_XNYS + if value == 93: + return Publisher.XNAS_BASIC_DBEQ + if value == 94: + return Publisher.DBEQ_MAX_DBEQ raise ValueError(f"Integer value {value} does not correspond with any Publisher variant") def to_int(self) -> int: @@ -1430,6 +1440,10 @@ def to_int(self) -> int: return 91 if self == Publisher.XNYS_BBOTRADES_XNYS: return 92 + if self == Publisher.XNAS_BASIC_DBEQ: + return 93 + if self == Publisher.DBEQ_MAX_DBEQ: + return 94 raise ValueError("Invalid Publisher") @property @@ -1621,6 +1635,10 @@ def venue(self) -> Venue: return Venue.XCIS if self == Publisher.XNYS_BBOTRADES_XNYS: return Venue.XNYS + if self == Publisher.XNAS_BASIC_DBEQ: + return Venue.DBEQ + if self == Publisher.DBEQ_MAX_DBEQ: + return Venue.DBEQ raise ValueError("Unexpected Publisher value") @property @@ -1812,6 +1830,10 @@ def dataset(self) -> Dataset: return Dataset.XCIS_BBOTRADES if self == Publisher.XNYS_BBOTRADES_XNYS: return Dataset.XNYS_BBOTRADES + if self == Publisher.XNAS_BASIC_DBEQ: + return Dataset.XNAS_BASIC + if self == Publisher.DBEQ_MAX_DBEQ: + return Dataset.DBEQ_MAX raise ValueError("Unexpected Publisher value") @property @@ -2003,4 +2025,8 @@ def description(self) -> str: return "NYSE National BBO and Trades" if self == Publisher.XNYS_BBOTRADES_XNYS: return "NYSE BBO and Trades" + if self == Publisher.XNAS_BASIC_DBEQ: + return "Nasdaq Basic - Consolidated" + if self == Publisher.DBEQ_MAX_DBEQ: + return "DBEQ Max - Consolidated" raise ValueError("Unexpected Publisher value") From 14c452fb64708cb4392056e97512bea56b50a66c Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Mon, 23 Sep 2024 12:31:00 -0700 Subject: [PATCH 08/10] VER: Release 0.42.0 --- CHANGELOG.md | 2 +- databento/version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f54b547..5b62d5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.42.0 - TBD +## 0.42.0 - 2024-09-23 #### Enhancements - Added `mode` parameter to `DBNStore.to_csv` to control the file writing mode diff --git a/databento/version.py b/databento/version.py index 22ffde2..92717f7 100644 --- a/databento/version.py +++ b/databento/version.py @@ -1 +1 @@ -__version__ = "0.41.0" +__version__ = "0.42.0" diff --git a/pyproject.toml b/pyproject.toml index 5bb5f0e..6f627e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databento" -version = "0.41.0" +version = "0.42.0" description = "Official Python client library for Databento" authors = [ "Databento ", From 1952440e83297f40e87d535a057a1b8fb6735704 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Fri, 13 Sep 2024 12:04:50 -0700 Subject: [PATCH 09/10] MOD: Improve DBNStore handling of truncated DBN --- CHANGELOG.md | 1 + databento/common/dbnstore.py | 56 ++++++---- tests/test_historical_bento.py | 193 +++++++++++++++++++++++++++++++-- 3 files changed, 215 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b62d5e..63dde2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Added 
`mode` parameter to `DBNStore.to_parquet` to control the file writing mode - Added `compression` parameter to `DBNStore.to_file` which controls the output compression format - Added new consolidated publisher values for `XNAS.BASIC` and `DBEQ.MAX` +- Changed `DBNStore` to be more tolerant of truncated DBN streams #### Breaking changes - Changed default write mode for `DBNStore.to_csv` to overwrite ("w") diff --git a/databento/common/dbnstore.py b/databento/common/dbnstore.py index 0675dd3..6c25ec1 100644 --- a/databento/common/dbnstore.py +++ b/databento/common/dbnstore.py @@ -4,9 +4,11 @@ import decimal import itertools import logging +import warnings from collections.abc import Generator from collections.abc import Iterator from collections.abc import Mapping +from io import BufferedReader from io import BytesIO from os import PathLike from pathlib import Path @@ -46,6 +48,7 @@ from databento.common.constants import SCHEMA_STRUCT_MAP from databento.common.constants import SCHEMA_STRUCT_MAP_V1 from databento.common.error import BentoError +from databento.common.error import BentoWarning from databento.common.symbology import InstrumentMap from databento.common.types import DBNRecord from databento.common.types import Default @@ -150,7 +153,7 @@ def __init__(self, source: PathLike[str] | str): ) self._name = self._path.name - self.__buffer: IO[bytes] | None = None + self.__buffer: BufferedReader | None = None @property def name(self) -> str: @@ -189,13 +192,13 @@ def path(self) -> Path: return self._path @property - def reader(self) -> IO[bytes]: + def reader(self) -> BufferedReader: """ Return a reader for this file. Returns ------- - IO + BufferedReader """ if self.__buffer is None: @@ -259,14 +262,14 @@ def nbytes(self) -> int: return self.__buffer.getbuffer().nbytes @property - def reader(self) -> IO[bytes]: + def reader(self) -> BytesIO: """ Return a reader for this buffer. The reader beings at the start of the buffer. 
Returns ------- - IO + BytesIO """ self.__buffer.seek(0) @@ -391,8 +394,8 @@ def __iter__(self) -> Generator[DBNRecord, None, None]: yield record else: if len(decoder.buffer()) > 0: - raise BentoError( - "DBN file is truncated or contains an incomplete record", + warnings.warn( + BentoWarning("DBN file is truncated or contains an incomplete record"), ) break @@ -516,7 +519,7 @@ def reader(self) -> IO[bytes]: Returns ------- - BinaryIO + IO[bytes] See Also -------- @@ -524,13 +527,10 @@ def reader(self) -> IO[bytes]: """ if self.compression == Compression.ZSTD: - reader: IO[bytes] = zstandard.ZstdDecompressor().stream_reader( + return zstandard.ZstdDecompressor().stream_reader( self._data_source.reader, ) - else: - reader = self._data_source.reader - - return reader + return self._data_source.reader @property def schema(self) -> Schema | None: @@ -1281,8 +1281,10 @@ def _transcode( transcoder.write(byte_chunk) if transcoder.buffer(): - raise BentoError( - "DBN file is truncated or contains an incomplete record", + warnings.warn( + BentoWarning( + "DBN file is truncated or contains an incomplete record", + ), ) transcoder.flush() @@ -1327,6 +1329,7 @@ def __init__( self._dtype = np.dtype(dtype) self._offset = offset self._count = count + self._close_on_next = False self._reader.seek(offset) @@ -1334,21 +1337,30 @@ def __iter__(self) -> NDArrayStreamIterator: return self def __next__(self) -> np.ndarray[Any, Any]: + if self._close_on_next: + raise StopIteration + if self._count is None: read_size = -1 else: read_size = self._dtype.itemsize * max(self._count, 1) if buffer := self._reader.read(read_size): + loose_bytes = len(buffer) % self._dtype.itemsize + if loose_bytes != 0: + warnings.warn( + BentoWarning("DBN file is truncated or contains an incomplete record"), + ) + buffer = buffer[:-loose_bytes] + self._close_on_next = True # decode one more buffer before stopping + try: return np.frombuffer( buffer=buffer, dtype=self._dtype, ) - except ValueError: - raise BentoError( - "DBN file is truncated or contains an incomplete record", - ) + except ValueError as exc: + raise BentoError("Cannot decode DBN stream") from exc raise StopIteration @@ -1393,10 +1405,8 @@ def __next__(self) -> np.ndarray[Any, Any]: dtype=self._dtype, count=num_records, ) - except ValueError: - raise BentoError( - "DBN file is truncated or contains an incomplete record", - ) from None + except ValueError as exc: + raise BentoError("Cannot decode DBN stream") from exc class DataFrameIterator: diff --git a/tests/test_historical_bento.py b/tests/test_historical_bento.py index 0d14c4d..7090a0a 100644 --- a/tests/test_historical_bento.py +++ b/tests/test_historical_bento.py @@ -20,6 +20,7 @@ from databento.common.constants import SCHEMA_STRUCT_MAP from databento.common.dbnstore import DBNStore from databento.common.error import BentoError +from databento.common.error import BentoWarning from databento.common.publishers import Dataset from databento.common.types import DBNRecord from databento_dbn import Compression @@ -1086,7 +1087,7 @@ def test_dbnstore_buffer_short( tmp_path: Path, ) -> None: """ - Test that creating a DBNStore with missing bytes raises a BentoError when + Test that creating a DBNStore with missing bytes emits a BentoWarning when decoding. 
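A sketch of how the more tolerant behavior looks from user code; the truncated file path is illustrative, and the warning handling is plain stdlib `warnings` usage:

```python
import warnings

from databento.common.dbnstore import DBNStore
from databento.common.error import BentoWarning

# Assumes a DBN file whose final record was cut short, e.g. by an
# interrupted download; the path is a placeholder.
store = DBNStore.from_file(path="truncated.dbn")

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always", BentoWarning)
    records = list(store)  # decodes every complete record

# The partial trailing record is dropped and reported as a BentoWarning
# instead of aborting the whole decode with a BentoError.
for warning in caught:
    print(warning.message)
print(f"decoded {len(records)} complete records")
```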
""" # Arrange @@ -1098,28 +1099,31 @@ def test_dbnstore_buffer_short( dbnstore = DBNStore.from_bytes(data=dbn_stub_data[:-2]) # Assert - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): list(dbnstore) - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_ndarray() - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_df() - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_csv(tmp_path / "test.csv") - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_json(tmp_path / "test.json") + with pytest.warns(BentoWarning): + dbnstore.to_parquet(tmp_path / "test.parquet") + def test_dbnstore_buffer_long( test_data: Callable[[Dataset, Schema], bytes], tmp_path: Path, ) -> None: """ - Test that creating a DBNStore with excess bytes raises a BentoError when + Test that creating a DBNStore with excess bytes emits a BentoWarning when decoding. """ # Arrange @@ -1132,21 +1136,24 @@ def test_dbnstore_buffer_long( dbnstore = DBNStore.from_bytes(data=dbn_stub_data) # Assert - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): list(dbnstore) - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_ndarray() - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_df() - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_csv(tmp_path / "test.csv") - with pytest.raises(BentoError): + with pytest.warns(BentoWarning): dbnstore.to_json(tmp_path / "test.json") + with pytest.warns(BentoWarning): + dbnstore.to_parquet(tmp_path / "test.parquet") + def test_dbnstore_buffer_rewind( test_data: Callable[[Dataset, Schema], bytes], @@ -1574,3 +1581,165 @@ def test_dbnstore_to_df_with_timezone_map_symbols( # Assert assert df["symbol"].notna().all() + + +def test_dbnstore_iterate_truncated_dbn( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the DBNStore makes a "best-effort" attempt to iterate a DBN + stream, even if it is corrupted/truncated. + """ + # Arrange + dbn_stub_data = ( + zstandard.ZstdDecompressor().stream_reader(test_data(Dataset.GLBX_MDP3, Schema.MBO)).read() + ) + truncated = tmp_path / "truncated.dbn" + truncated.write_bytes(dbn_stub_data[:-8]) # leave out 8 bytes of data + + # Act + complete_store = DBNStore.from_bytes(dbn_stub_data) + complete_records = list(complete_store) + truncated_store = DBNStore.from_file(path=truncated) + + # Assert + with pytest.warns(BentoWarning): + truncated_records = list(truncated_store) + + assert len(truncated_records) == len(complete_records) - 1 + + +def test_dbnstore_iterate_truncated_live_dbn( + live_test_data: bytes, + tmp_path: Path, +) -> None: + """ + Test that the DBNStore makes a "best-effort" attempt to iterate a live DBN + stream, even if it is corrupted/truncated. 
+ """ + # Arrange + dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read() + truncated = tmp_path / "truncated.dbn" + truncated.write_bytes(dbn_stub_data[:-8]) # leave out 8 bytes of data + + # Act + complete_store = DBNStore.from_bytes(dbn_stub_data) + complete_records = list(complete_store) + truncated_store = DBNStore.from_file(path=truncated) + + # Assert + with pytest.warns(BentoWarning): + truncated_records = list(truncated_store) + + assert len(truncated_records) == len(complete_records) - 1 + + +def test_dbnstore_to_df_truncated_dbn( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the DBNStore makes a "best-effort" attempt to create a DataFrame + from a DBN stream, even if it is corrupted/truncated. + """ + # Arrange + dbn_stub_data = ( + zstandard.ZstdDecompressor().stream_reader(test_data(Dataset.GLBX_MDP3, Schema.MBO)).read() + ) + truncated = tmp_path / "truncated.dbn" + truncated.write_bytes(dbn_stub_data[:-8]) # leave out 8 bytes of data + + # Act + complete_store = DBNStore.from_bytes(dbn_stub_data) + complete_df = complete_store.to_df() + truncated_store = DBNStore.from_file(path=truncated) + + # Assert + with pytest.warns(BentoWarning): + truncated_df = truncated_store.to_df() + + assert len(truncated_df) == len(complete_df) - 1 + + +def test_dbnstore_to_df_truncated_live_dbn( + live_test_data: bytes, + tmp_path: Path, +) -> None: + """ + Test that the DBNStore makes a "best-effort" attempt to create a DataFrame + from a live DBN stream, even if it is corrupted/truncated. + """ + # Arrange + dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read() + truncated = tmp_path / "truncated.dbn" + truncated.write_bytes(dbn_stub_data[:-8]) # leave out 8 bytes of data + + # Act + complete_store = DBNStore.from_bytes(dbn_stub_data) + complete_df = complete_store.to_df( + schema=Schema.STATISTICS, + ) # Statistics is required because it is the last record + truncated_store = DBNStore.from_file(path=truncated) + + # Assert + with pytest.warns(BentoWarning): + truncated_df = truncated_store.to_df(schema=Schema.STATISTICS) + + assert len(truncated_df) == len(complete_df) - 1 + + +def test_dbnstore_transcode_truncated_dbn( + test_data: Callable[[Dataset, Schema], bytes], + tmp_path: Path, +) -> None: + """ + Test that the DBNStore makes a "best-effort" attempt to transocode from DBN + data, even if it is corrupted/truncated. + """ + # Arrange + dbn_stub_data = ( + zstandard.ZstdDecompressor().stream_reader(test_data(Dataset.GLBX_MDP3, Schema.MBO)).read() + ) + truncated = tmp_path / "truncated.dbn" + truncated.write_bytes(dbn_stub_data[:-8]) # leave out 8 bytes of data + + # Act + truncated_store = DBNStore.from_file(path=truncated) + + # Assert + with pytest.warns(BentoWarning): + truncated_store.to_csv(tmp_path / "truncated.csv") + + with pytest.warns(BentoWarning): + truncated_store.to_json(tmp_path / "truncated.json") + + with pytest.warns(BentoWarning): + truncated_store.to_parquet(tmp_path / "truncated.parquet") + + +def test_dbnstore_transcode_truncated_live_dbn( + live_test_data: bytes, + tmp_path: Path, +) -> None: + """ + Test that the DBNStore makes a "best-effort" attempt to transocode from + live DBN data, even if it is corrupted/truncated. 
+ """ + # Arrange + dbn_stub_data = zstandard.ZstdDecompressor().stream_reader(live_test_data).read() + truncated = tmp_path / "truncated.dbn" + truncated.write_bytes(dbn_stub_data[:-8]) # leave out 8 bytes of data + + # Act + truncated_store = DBNStore.from_file(path=truncated) + + # Assert + with pytest.warns(BentoWarning): + truncated_store.to_csv(tmp_path / "truncated.csv", schema=Schema.STATISTICS) + + with pytest.warns(BentoWarning): + truncated_store.to_json(tmp_path / "truncated.json", schema=Schema.STATISTICS) + + with pytest.warns(BentoWarning): + truncated_store.to_parquet(tmp_path / "truncated.parquet", schema=Schema.STATISTICS) From 07b3b9cb7ae0627c33f0f39c3030643347217d30 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Tue, 24 Sep 2024 14:24:00 -0700 Subject: [PATCH 10/10] MOD: Re-enable release workflow --- .github/workflows/release.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4765980..e2bcd06 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,12 +1,12 @@ name: release -# on: -# workflow_run: -# workflows: -# - test -# branches: [main] -# types: -# - completed +on: + workflow_run: + workflows: + - test + branches: [main] + types: + - completed jobs: release: