Skip to content

Commit

Permalink
MOD: Improve DBNStore handling of truncated DBN
Browse files Browse the repository at this point in the history
  • Loading branch information
nmacholl committed Sep 24, 2024
1 parent 14c452f commit 1952440
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode
- Added `compression` parameter to `DBNStore.to_file` which controls the output compression format
- Added new consolidated publisher values for `XNAS.BASIC` and `DBEQ.MAX`
- Changed `DBNStore` to be more tolerant of truncated DBN streams

#### Breaking changes
- Changed default write mode for `DBNStore.to_csv` to overwrite ("w")
Expand Down
56 changes: 33 additions & 23 deletions databento/common/dbnstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import decimal
import itertools
import logging
import warnings
from collections.abc import Generator
from collections.abc import Iterator
from collections.abc import Mapping
from io import BufferedReader
from io import BytesIO
from os import PathLike
from pathlib import Path
Expand Down Expand Up @@ -46,6 +48,7 @@
from databento.common.constants import SCHEMA_STRUCT_MAP
from databento.common.constants import SCHEMA_STRUCT_MAP_V1
from databento.common.error import BentoError
from databento.common.error import BentoWarning
from databento.common.symbology import InstrumentMap
from databento.common.types import DBNRecord
from databento.common.types import Default
Expand Down Expand Up @@ -150,7 +153,7 @@ def __init__(self, source: PathLike[str] | str):
)

self._name = self._path.name
self.__buffer: IO[bytes] | None = None
self.__buffer: BufferedReader | None = None

@property
def name(self) -> str:
Expand Down Expand Up @@ -189,13 +192,13 @@ def path(self) -> Path:
return self._path

@property
def reader(self) -> IO[bytes]:
def reader(self) -> BufferedReader:
"""
Return a reader for this file.
Returns
-------
IO
BufferedReader
"""
if self.__buffer is None:
Expand Down Expand Up @@ -259,14 +262,14 @@ def nbytes(self) -> int:
return self.__buffer.getbuffer().nbytes

@property
def reader(self) -> IO[bytes]:
def reader(self) -> BytesIO:
"""
Return a reader for this buffer. The reader begins at the start of the
buffer.
Returns
-------
IO
BytesIO
"""
self.__buffer.seek(0)
Expand Down Expand Up @@ -391,8 +394,8 @@ def __iter__(self) -> Generator[DBNRecord, None, None]:
yield record
else:
if len(decoder.buffer()) > 0:
raise BentoError(
"DBN file is truncated or contains an incomplete record",
warnings.warn(
BentoWarning("DBN file is truncated or contains an incomplete record"),
)
break

Expand Down Expand Up @@ -516,21 +519,18 @@ def reader(self) -> IO[bytes]:
Returns
-------
BinaryIO
IO[bytes]
See Also
--------
DBNStore.raw
"""
if self.compression == Compression.ZSTD:
reader: IO[bytes] = zstandard.ZstdDecompressor().stream_reader(
return zstandard.ZstdDecompressor().stream_reader(
self._data_source.reader,
)
else:
reader = self._data_source.reader

return reader
return self._data_source.reader

@property
def schema(self) -> Schema | None:
Expand Down Expand Up @@ -1281,8 +1281,10 @@ def _transcode(
transcoder.write(byte_chunk)

if transcoder.buffer():
raise BentoError(
"DBN file is truncated or contains an incomplete record",
warnings.warn(
BentoWarning(
"DBN file is truncated or contains an incomplete record",
),
)

transcoder.flush()
Expand Down Expand Up @@ -1327,28 +1329,38 @@ def __init__(
self._dtype = np.dtype(dtype)
self._offset = offset
self._count = count
self._close_on_next = False

self._reader.seek(offset)

def __iter__(self) -> NDArrayStreamIterator:
return self

def __next__(self) -> np.ndarray[Any, Any]:
    """
    Read the next chunk of bytes from the underlying reader and decode it
    into a numpy array of DBN records.

    Returns
    -------
    np.ndarray[Any, Any]

    Raises
    ------
    StopIteration
        If the reader is exhausted, or after the final (truncated) buffer
        has been decoded.
    BentoError
        If the bytes cannot be decoded into an array of `self._dtype`.
    """
    if self._close_on_next:
        # The previous read ended on a partial record; that buffer has
        # already been decoded, so the stream is finished.
        raise StopIteration

    if self._count is None:
        read_size = -1  # read the remainder of the stream in one call
    else:
        read_size = self._dtype.itemsize * max(self._count, 1)

    if buffer := self._reader.read(read_size):
        # Any bytes beyond the last whole record indicate a truncated
        # DBN stream; warn, drop the partial tail, and stop after this
        # final buffer is decoded.
        loose_bytes = len(buffer) % self._dtype.itemsize
        if loose_bytes != 0:
            warnings.warn(
                BentoWarning("DBN file is truncated or contains an incomplete record"),
            )
            buffer = buffer[:-loose_bytes]
            self._close_on_next = True  # decode one more buffer before stopping

        try:
            return np.frombuffer(
                buffer=buffer,
                dtype=self._dtype,
            )
        except ValueError as exc:
            raise BentoError("Cannot decode DBN stream") from exc

    raise StopIteration

Expand Down Expand Up @@ -1393,10 +1405,8 @@ def __next__(self) -> np.ndarray[Any, Any]:
dtype=self._dtype,
count=num_records,
)
except ValueError:
raise BentoError(
"DBN file is truncated or contains an incomplete record",
) from None
except ValueError as exc:
raise BentoError("Cannot decode DBN stream") from exc


class DataFrameIterator:
Expand Down
Loading

0 comments on commit 1952440

Please sign in to comment.