VER: Release 0.26.0
See release notes.
nmacholl authored Jan 16, 2024
2 parents 32e6ea1 + 89abe36 commit 18f45c2
Showing 17 changed files with 225 additions and 93 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# Changelog

## 0.26.0 - 2024-01-16

This release adds support for transcoding DBN data into Apache Parquet.

#### Enhancements
- Added `DBNStore.to_parquet` for transcoding DBN data into Apache Parquet using `pyarrow`
- Upgraded `databento-dbn` to 0.15.0

## 0.25.0 - 2024-01-09

#### Breaking changes
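A minimal usage sketch of the new method (the file paths here are illustrative, not from this commit):

import databento as db

store = db.DBNStore.from_file("example.dbn.zst")
store.to_parquet("example.parquet", price_type="float", pretty_ts=True)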
3 changes: 2 additions & 1 deletion README.md
@@ -32,9 +32,10 @@ The library is fully compatible with the latest distribution of Anaconda 3.8 and
The minimum dependencies as found in the `pyproject.toml` are also listed below:
- python = "^3.8"
- aiohttp = "^3.8.3"
- databento-dbn = "0.14.2"
- databento-dbn = "0.15.0"
- numpy = ">=1.23.5"
- pandas = ">=1.5.3"
- pyarrow = ">=13.0.0"
- requests = ">=2.24.0"
- zstandard = ">=0.21.0"

105 changes: 88 additions & 17 deletions databento/common/dbnstore.py
@@ -15,6 +15,7 @@
Any,
BinaryIO,
Callable,
Final,
Literal,
Protocol,
overload,
@@ -23,6 +24,8 @@
import databento_dbn
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import zstandard
from databento_dbn import FIXED_PRICE_SCALE
from databento_dbn import Compression
@@ -51,6 +54,8 @@

logger = logging.getLogger(__name__)

PARQUET_CHUNK_SIZE: Final = 2**16

if TYPE_CHECKING:
from databento.historical.client import Historical

@@ -791,18 +796,14 @@ def to_csv(
compression : Compression or str, default `Compression.NONE`
The output compression for writing.
schema : Schema or str, optional
The schema for the csv.
The DBN schema for the csv.
This is only required when reading a DBN stream with mixed record types.

Raises
------
ValueError
If the schema for the array cannot be determined.

Notes
-----
Requires all the data to be brought up into memory to then be written.
"""
compression = validate_enum(compression, Compression, "compression")
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -870,7 +871,7 @@ def to_df(
a 'symbol' column, mapping the instrument ID to its requested symbol for
every record.
schema : Schema or str, optional
The schema for the dataframe.
The DBN schema for the dataframe.
This is only required when reading a DBN stream with mixed record types.
count : int, optional
If set, instead of returning a single `DataFrame` a `DataFrameIterator`
@@ -887,7 +888,7 @@
Raises
------
ValueError
If the schema for the array cannot be determined.
If the DBN schema is unspecified and cannot be determined.
"""
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -919,6 +920,81 @@ def to_df(

return df_iter
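# A brief usage sketch (not part of this diff): passing `count` makes to_df
# return a DataFrameIterator that yields chunks of at most `count` records,
# which is how to_parquet below bounds memory usage; the path is illustrative.
#
#     store = DBNStore.from_file("example.dbn.zst")
#     for frame in store.to_df(schema="trades", count=65_536):
#         ...  # handle each chunk without materializing the full dataset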

def to_parquet(
    self,
    path: Path | str,
    price_type: Literal["fixed", "float"] = "float",
    pretty_ts: bool = True,
    map_symbols: bool = True,
    schema: Schema | str | None = None,
    **kwargs: Any,
) -> None:
    """
    Write the data to a parquet file at the given path.

    Parameters
    ----------
    path : Path or str
        The file path to write the parquet file to.
    price_type : str, default "float"
        The price type to use for price fields.
        If "fixed", prices will have a type of `int` in fixed decimal format; each unit representing 1e-9 or 0.000000001.
        If "float", prices will have a type of `float`.
        The "decimal" price type is not supported at this time.
    pretty_ts : bool, default True
        If all timestamp columns should be converted from UNIX nanosecond
        `int` to tz-aware UTC `pyarrow.TimestampType`.
    map_symbols : bool, default True
        If symbology mappings from the metadata should be used to create
        a 'symbol' column, mapping the instrument ID to its requested symbol for
        every record.
    schema : Schema or str, optional
        The DBN schema for the parquet file.
        This is only required when reading a DBN stream with mixed record types.

    Raises
    ------
    ValueError
        If an incorrect price type is specified.
        If the DBN schema is unspecified and cannot be determined.

    """
    if price_type == "decimal":
        raise ValueError("the 'decimal' price type is not currently supported")

    schema = validate_maybe_enum(schema, Schema, "schema")
    if schema is None:
        if self.schema is None:
            raise ValueError("a schema must be specified for mixed DBN data")
        schema = self.schema

    dataframe_iter = self.to_df(
        price_type=price_type,
        pretty_ts=pretty_ts,
        map_symbols=map_symbols,
        schema=schema,
        count=PARQUET_CHUNK_SIZE,
    )

    writer = None
    try:
        for frame in dataframe_iter:
            if writer is None:
                # Initialize the writer using the first DataFrame
                parquet_schema = pa.Schema.from_pandas(frame)
                writer = pq.ParquetWriter(
                    where=path,
                    schema=parquet_schema,
                    **kwargs,
                )
            writer.write_table(
                pa.Table.from_pandas(
                    frame,
                    schema=parquet_schema,
                ),
            )
    finally:
        if writer is not None:
            writer.close()

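# A brief sketch (not part of this diff) of reading the output back with
# pyarrow; "example.parquet" is an illustrative path:
#
#     import pyarrow.parquet as pq
#     table = pq.read_table("example.parquet")
#     df = table.to_pandas()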
def to_file(self, path: Path | str) -> None:
"""
Write the data to a DBN file at the given path.
@@ -972,18 +1048,14 @@ def to_json(
compression : Compression or str, default `Compression.NONE`
The output compression for writing.
schema : Schema or str, optional
The schema for the json.
The DBN schema for the json.
This is only required when reading a DBN stream with mixed record types.

Raises
------
ValueError
If the schema for the array cannot be determined.

Notes
-----
Requires all the data to be brought up into memory to then be written.
"""
compression = validate_enum(compression, Compression, "compression")
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -1030,7 +1102,7 @@ def to_ndarray(
Parameters
----------
schema : Schema or str, optional
The schema for the array.
The DBN schema for the array.
This is only required when reading a DBN stream with mixed record types.
count : int, optional
If set, instead of returning a single `np.ndarray` a `NDArrayIterator`
@@ -1047,7 +1119,7 @@
Raises
------
ValueError
If the schema for the array cannot be determined.
If the DBN schema is unspecified and cannot be determined.
"""
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -1120,7 +1192,7 @@ def _transcode(
pretty_ts=pretty_ts,
has_metadata=True,
map_symbols=map_symbols,
symbol_interval_map=symbol_map, # type: ignore [arg-type]
symbol_interval_map=symbol_map,
schema=schema,
)

@@ -1329,8 +1401,7 @@ def _format_px(
if price_type == "decimal":
for field in px_fields:
df[field] = (
df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal)
/ FIXED_PRICE_SCALE
df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal) / FIXED_PRICE_SCALE
)
elif price_type == "float":
for field in px_fields:
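The fixed-price handling in `_format_px` divides by `FIXED_PRICE_SCALE`; a small sketch of the arithmetic, assuming the 1e-9 unit size stated in the `to_parquet` docstring (the raw price value is hypothetical):

from decimal import Decimal

FIXED_PRICE_SCALE = 1_000_000_000  # one unit = 1e-9 of a currency unit

raw_px = 4_512_750_000_000  # hypothetical fixed-decimal price from a DBN record
assert Decimal(raw_px) / FIXED_PRICE_SCALE == Decimal("4512.75")  # "decimal"
assert raw_px / FIXED_PRICE_SCALE == 4512.75  # "float"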
17 changes: 9 additions & 8 deletions databento/common/parsing.py
@@ -4,7 +4,8 @@
from datetime import date
from functools import partial
from functools import singledispatch
from numbers import Number
from numbers import Integral
from typing import Any

import pandas as pd
from databento_dbn import SType
Expand Down Expand Up @@ -59,7 +60,7 @@ def optional_values_list_to_string(

@singledispatch
def optional_symbols_list_to_list(
symbols: Iterable[str] | Iterable[Number] | str | Number | None,
symbols: Iterable[str | int | Integral] | str | int | Integral | None,
stype_in: SType,
) -> list[str]:
"""
@@ -68,7 +69,7 @@
Parameters
----------
symbols : iterable of str, iterable of Number, str, or Number optional
symbols : Iterable of str or int or Number, or str or int or Number, optional
The symbols to concatenate.
stype_in : SType
The input symbology type for the request.
@@ -84,7 +85,7 @@
"""
raise TypeError(
f"`{symbols}` is not a valid type for symbol input; "
"allowed types are Iterable[str], Iterable[int], str, int, and None.",
"allowed types are Iterable[str | int], str, int, and None.",
)


@@ -102,10 +103,10 @@ def _(_: None, __: SType) -> list[str]:
return [ALL_SYMBOLS]


@optional_symbols_list_to_list.register(cls=Number)
def _(symbols: Number, stype_in: SType) -> list[str]:
@optional_symbols_list_to_list.register(cls=Integral)
def _(symbols: Integral, stype_in: SType) -> list[str]:
"""
Dispatch method for optional_symbols_list_to_list. Handles numerical types,
Dispatch method for optional_symbols_list_to_list. Handles integral types,
alerting when an integer is given for STypes that expect strings.

See Also
@@ -147,7 +148,7 @@ def _(symbols: str, stype_in: SType) -> list[str]:


@optional_symbols_list_to_list.register(cls=Iterable)
def _(symbols: Iterable[str] | Iterable[int], stype_in: SType) -> list[str]:
def _(symbols: Iterable[Any], stype_in: SType) -> list[str]:
"""
Dispatch method for optional_symbols_list_to_list. Handles Iterables by
dispatching the individual members.
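The switch from `Number` to `Integral` narrows which scalar types the integer branch accepts; a minimal sketch of the same `functools.singledispatch` pattern (the `describe` function is hypothetical):

from functools import singledispatch
from numbers import Integral


@singledispatch
def describe(symbols) -> str:
    raise TypeError(f"`{symbols}` is not a valid type for symbol input")


@describe.register(cls=Integral)
def _(symbols: Integral) -> str:
    # int and NumPy integer scalars are Integral; float is a Number but
    # not Integral, so it now falls through to the TypeError base case.
    return str(int(symbols))


print(describe(42))   # "42"
# describe(3.5) now raises TypeError instead of being treated as a symbol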
5 changes: 3 additions & 2 deletions databento/historical/api/batch.py
Expand Up @@ -2,6 +2,7 @@

import logging
import os
from collections.abc import Iterable
from datetime import date
from os import PathLike
from pathlib import Path
@@ -48,7 +49,7 @@ def __init__(self, key: str, gateway: str) -> None:
def submit_job(
self,
dataset: Dataset | str,
symbols: list[str] | str,
symbols: Iterable[str | int] | str | int,
schema: Schema | str,
start: pd.Timestamp | date | str | int,
end: pd.Timestamp | date | str | int | None = None,
@@ -75,7 +76,7 @@
----------
dataset : Dataset or str
The dataset code (string identifier) for the request.
symbols : list[str | int] or str
symbols : Iterable[str | int] or str or int
The instrument symbols to filter for. Takes up to 2,000 symbols per request.
If more than 1 symbol is specified, the data is merged and sorted by time.
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
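A sketch of the widened `symbols` parameter on `submit_job`: any iterable of str or int is now accepted, not just `list[str]` (the API key and request values are illustrative):

import databento as db

client = db.Historical("YOUR_API_KEY")
job = client.batch.submit_job(
    dataset="GLBX.MDP3",
    symbols=("ES.FUT", "NQ.FUT"),  # a tuple or other iterable now type-checks
    schema="trades",
    stype_in="parent",
    start="2024-01-08",
    end="2024-01-09",
)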
13 changes: 7 additions & 6 deletions databento/historical/api/metadata.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from collections.abc import Iterable
from datetime import date
from typing import Any

@@ -261,7 +262,7 @@ def get_record_count(
dataset: Dataset | str,
start: pd.Timestamp | date | str | int,
end: pd.Timestamp | date | str | int | None = None,
symbols: list[str] | str | None = None,
symbols: Iterable[str | int] | str | int | None = None,
schema: Schema | str = "trades",
stype_in: SType | str = "raw_symbol",
limit: int | None = None,
@@ -285,7 +286,7 @@
If an integer is passed, then this represents nanoseconds since the UNIX epoch.
Values are forward filled based on the resolution provided.
Defaults to the same value as `start`.
symbols : list[str | int] or str, optional
symbols : Iterable[str | int] or str or int, optional
The instrument symbols to filter for. Takes up to 2,000 symbols per request.
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
@@ -329,7 +330,7 @@ def get_billable_size(
dataset: Dataset | str,
start: pd.Timestamp | date | str | int,
end: pd.Timestamp | date | str | int | None = None,
symbols: list[str] | str | None = None,
symbols: Iterable[str | int] | str | int | None = None,
schema: Schema | str = "trades",
stype_in: SType | str = "raw_symbol",
limit: int | None = None,
@@ -354,7 +355,7 @@
If an integer is passed, then this represents nanoseconds since the UNIX epoch.
Values are forward filled based on the resolution provided.
Defaults to the same value as `start`.
symbols : list[str | int] or str, optional
symbols : Iterable[str | int] or str or int, optional
The instrument symbols to filter for. Takes up to 2,000 symbols per request.
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
@@ -399,7 +400,7 @@ def get_cost(
start: pd.Timestamp | date | str | int,
end: pd.Timestamp | date | str | int | None = None,
mode: FeedMode | str = "historical-streaming",
symbols: list[str] | str | None = None,
symbols: Iterable[str | int] | str | int | None = None,
schema: Schema | str = "trades",
stype_in: SType | str = "raw_symbol",
limit: int | None = None,
@@ -426,7 +427,7 @@
Defaults to the same value as `start`.
mode : FeedMode or str {'live', 'historical-streaming', 'historical'}, default 'historical-streaming'
The data feed mode for the request.
symbols : list[str | int] or str, optional
symbols : Iterable[str | int] or str or int, optional
The instrument symbols to filter for. Takes up to 2,000 symbols per request.
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
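The metadata endpoints gain the same widening; a sketch passing integer instrument IDs, reusing the `client` from the batch sketch above (values are illustrative):

count = client.metadata.get_record_count(
    dataset="GLBX.MDP3",
    start="2024-01-15",
    symbols=[42, 43],  # ints are valid when stype_in is "instrument_id"
    stype_in="instrument_id",
    schema="trades",
)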
