Skip to content

Commit

Permalink
VER: Release 0.11.2
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen authored Jun 25, 2024
2 parents 7b93164 + 3a5ec2e commit 7390020
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 21 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 0.11.2 - 2024-06-25

#### Enhancements
- Added `historical::timeseries::get_range_to_file` method to persist the data stream to
a given path before returning an `AsyncDbnDecoder`
- Upgraded DBN version to 0.18.2

## 0.11.1 - 2024-06-11

#### Enhancements
Expand Down
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "databento"
authors = ["Databento <[email protected]>"]
version = "0.11.1"
version = "0.11.2"
edition = "2021"
repository = "https://github.com/databento/databento-rs"
description = "Official Databento client library"
Expand All @@ -24,7 +24,7 @@ live = ["dep:hex", "dep:sha2", "tokio/net"]

[dependencies]
# binary encoding
dbn = { version = "0.18.1", features = ["async", "serde"] }
dbn = { version = "0.18.2", features = ["async", "serde"] }
# Async stream trait
futures = { version = "0.3", optional = true }
# Used for Live authentication
Expand All @@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["io"], optional = true }
typed-builder = "0.18"

[dev-dependencies]
env_logger = "0.11.2"
env_logger = "0.11.3"
tempfile = "3.10.1"
tokio = { version = "1.38", features = ["full"] }
wiremock = "0.6"
217 changes: 201 additions & 16 deletions src/historical/timeseries.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
//! The historical timeseries API.
use std::num::NonZeroU64;
use std::{num::NonZeroU64, path::PathBuf};

use dbn::{Compression, Encoding, SType, Schema, VersionUpgradePolicy};
use futures::TryStreamExt;
use dbn::{encode::AsyncDbnEncoder, Compression, Encoding, SType, Schema, VersionUpgradePolicy};
use futures::{Stream, TryStreamExt};
use reqwest::{header::ACCEPT, RequestBuilder};
use tokio::io::AsyncReadExt;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
};
use tokio_util::{bytes::Bytes, io::StreamReader};
use typed_builder::TypedBuilder;

use crate::Symbols;
Expand Down Expand Up @@ -34,17 +38,79 @@ impl TimeseriesClient<'_> {
&mut self,
params: &GetRangeParams,
) -> crate::Result<AsyncDbnDecoder<impl AsyncReadExt>> {
let reader = self
.get_range_impl(
&params.dataset,
params.schema,
params.stype_in,
params.stype_out,
&params.symbols,
&params.date_time_range,
params.limit,
)
.await?;
let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?;
decoder.set_upgrade_policy(params.upgrade_policy);
Ok(decoder)
}

/// Streams a timeseries range from Databento, persisting the raw DBN data to
/// `params.path` on disk before returning a decoder reading from that file.
///
/// For larger requests, consider using
/// [`BatchClient::submit_job()`](super::batch::BatchClient::submit_job()).
///
/// # Errors
/// This function returns an error when it fails to communicate with the Databento API
/// or the API indicates there's an issue with the request. An error will also be returned
/// if it fails to create a new file at `path`.
pub async fn get_range_to_file(
    &mut self,
    params: &GetRangeToFileParams,
) -> crate::Result<AsyncDbnDecoder<impl AsyncReadExt>> {
    // Kick off the streaming HTTP request for the requested range.
    let body = self
        .get_range_impl(
            &params.dataset,
            params.schema,
            params.stype_in,
            params.stype_out,
            &params.symbols,
            &params.date_time_range,
            params.limit,
        )
        .await?;
    // Decode the zstd-compressed response stream, applying the caller's
    // policy for upgrading records from prior DBN versions.
    let mut stream_decoder = AsyncDbnDecoder::with_zstd_buffer(body).await?;
    stream_decoder.set_upgrade_policy(params.upgrade_policy);
    // Re-encode every record into a zstd-compressed DBN file at the given path.
    let out_file = BufWriter::new(File::create(&params.path).await?);
    let mut file_encoder = AsyncDbnEncoder::with_zstd(out_file, stream_decoder.metadata()).await?;
    while let Some(record) = stream_decoder.decode_record_ref().await? {
        file_encoder.encode_record_ref(record).await?;
    }
    // Shut down the buffered writer so all data reaches disk before the
    // file is reopened for decoding.
    file_encoder.get_mut().shutdown().await?;
    Ok(AsyncDbnDecoder::from_zstd_file(&params.path).await?)
}

#[allow(clippy::too_many_arguments)] // private method
async fn get_range_impl(
&mut self,
dataset: &str,
schema: Schema,
stype_in: SType,
stype_out: SType,
symbols: &Symbols,
date_time_range: &DateTimeRange,
limit: Option<NonZeroU64>,
) -> crate::Result<StreamReader<impl Stream<Item = std::io::Result<Bytes>>, Bytes>> {
let mut form = vec![
("dataset", params.dataset.to_string()),
("schema", params.schema.to_string()),
("dataset", dataset.to_owned()),
("schema", schema.to_string()),
("encoding", Encoding::Dbn.to_string()),
("compression", Compression::ZStd.to_string()),
("stype_in", params.stype_in.to_string()),
("stype_out", params.stype_out.to_string()),
("symbols", params.symbols.to_api_string()),
("stype_in", stype_in.to_string()),
("stype_out", stype_out.to_string()),
("symbols", symbols.to_api_string()),
];
params.date_time_range.add_to_form(&mut form);
if let Some(limit) = params.limit {
date_time_range.add_to_form(&mut form);
if let Some(limit) = limit {
form.push(("limit", limit.to_string()));
}
let resp = self
Expand All @@ -59,10 +125,7 @@ impl TimeseriesClient<'_> {
.error_for_status()?
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let reader = tokio_util::io::StreamReader::new(stream);
let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?;
decoder.set_upgrade_policy(params.upgrade_policy);
Ok(decoder)
Ok(tokio_util::io::StreamReader::new(stream))
}

fn post(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
Expand Down Expand Up @@ -101,9 +164,76 @@ pub struct GetRangeParams {
pub upgrade_policy: VersionUpgradePolicy,
}

/// The parameters for [`TimeseriesClient::get_range_to_file()`]. Use
/// [`GetRangeToFileParams::builder()`] to get a builder type with all the preset defaults.
/// The parameters for [`TimeseriesClient::get_range_to_file()`]. Use
/// [`GetRangeToFileParams::builder()`] to get a builder type with all the preset defaults.
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct GetRangeToFileParams {
    /// The dataset code.
    #[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
    pub dataset: String,
    /// The symbols to filter for.
    #[builder(setter(into))]
    pub symbols: Symbols,
    /// The data record schema.
    pub schema: Schema,
    /// The request time range.
    #[builder(setter(into))]
    pub date_time_range: DateTimeRange,
    /// The symbology type of the input `symbols`. Defaults to
    /// [`RawSymbol`](dbn::enums::SType::RawSymbol).
    #[builder(default = SType::RawSymbol)]
    pub stype_in: SType,
    /// The symbology type of the output `symbols`. Defaults to
    /// [`InstrumentId`](dbn::enums::SType::InstrumentId).
    #[builder(default = SType::InstrumentId)]
    pub stype_out: SType,
    /// The optional maximum number of records to return. Defaults to no limit.
    #[builder(default)]
    pub limit: Option<NonZeroU64>,
    /// How to decode DBN from prior versions. Defaults to upgrade.
    #[builder(default = VersionUpgradePolicy::Upgrade)]
    pub upgrade_policy: VersionUpgradePolicy,
    /// The file path to persist the stream data to.
    // NOTE(review): `#[builder(default)]` makes `path` default to an empty
    // `PathBuf`, so a builder that never calls `.path(..)` would attempt to
    // create a file at "" — confirm whether `path` should be a required field.
    #[builder(default, setter(transform = |p: impl Into<PathBuf>| p.into()))]
    pub path: PathBuf,
}

impl From<GetRangeToFileParams> for GetRangeParams {
fn from(value: GetRangeToFileParams) -> Self {
Self {
dataset: value.dataset,
symbols: value.symbols,
schema: value.schema,
date_time_range: value.date_time_range,
stype_in: value.stype_in,
stype_out: value.stype_out,
limit: value.limit,
upgrade_policy: value.upgrade_policy,
}
}
}

impl GetRangeParams {
    /// Converts these parameters into a request that will be persisted to a file
    /// at `path`. Used in conjunction with [`TimeseriesClient::get_range_to_file()`].
    // (Fixed a stray double backtick that broke the intra-doc link above.)
    pub fn with_path(self, path: impl Into<PathBuf>) -> GetRangeToFileParams {
        GetRangeToFileParams {
            dataset: self.dataset,
            symbols: self.symbols,
            schema: self.schema,
            date_time_range: self.date_time_range,
            stype_in: self.stype_in,
            stype_out: self.stype_out,
            limit: self.limit,
            upgrade_policy: self.upgrade_policy,
            path: path.into(),
        }
    }
}

#[cfg(test)]
mod tests {
use dbn::record::TradeMsg;
use dbn::{record::TradeMsg, Dataset};
use reqwest::StatusCode;
use time::macros::datetime;
use wiremock::{
Expand Down Expand Up @@ -169,4 +299,59 @@ mod tests {
decoder.decode_record::<TradeMsg>().await.unwrap().unwrap();
assert!(decoder.decode_record::<TradeMsg>().await.unwrap().is_none());
}

// Verifies that `get_range_to_file` streams the mocked HTTP response into a
// file on disk and that the returned decoder reads that file back correctly.
#[tokio::test]
async fn test_get_range_to_file() {
    const START: time::OffsetDateTime = datetime!(2024 - 05 - 17 00:00 UTC);
    const END: time::OffsetDateTime = datetime!(2024 - 05 - 18 00:00 UTC);
    const SCHEMA: Schema = Schema::Trades;
    const DATASET: &str = Dataset::IfeuImpact.as_str();

    let mock_server = MockServer::start().await;
    let temp_dir = tempfile::TempDir::new().unwrap();
    // Canned zstd-compressed DBN payload served as the mock response body.
    let bytes = tokio::fs::read(zst_test_data_path(SCHEMA)).await.unwrap();
    Mock::given(method("POST"))
        .and(basic_auth(API_KEY, ""))
        .and(path(format!("/v{API_VERSION}/timeseries.get_range")))
        .and(body_contains("dataset", DATASET))
        .and(body_contains("schema", "trades"))
        .and(body_contains("symbols", "BRN.FUT"))
        .and(body_contains(
            "start",
            START.unix_timestamp_nanos().to_string(),
        ))
        .and(body_contains("end", END.unix_timestamp_nanos().to_string()))
        // stype_in is set explicitly on the params below; stype_out is left
        // unset and should fall back to its default of instrument_id
        .and(body_contains("stype_in", "parent"))
        .and(body_contains("stype_out", "instrument_id"))
        .respond_with(ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_bytes(bytes))
        .mount(&mock_server)
        .await;
    let mut target = HistoricalClient::with_url(
        mock_server.uri(),
        API_KEY.to_owned(),
        HistoricalGateway::Bo1,
    )
    .unwrap();
    // Persist the stream to a temp file, then decode from that file.
    let path = temp_dir.path().join("test.dbn.zst");
    let mut decoder = target
        .timeseries()
        .get_range_to_file(
            &GetRangeToFileParams::builder()
                .dataset(DATASET)
                .schema(SCHEMA)
                .symbols(vec!["BRN.FUT"])
                .stype_in(SType::Parent)
                .date_time_range((START, END))
                .path(path.clone())
                .build(),
        )
        .await
        .unwrap();
    assert_eq!(decoder.metadata().schema.unwrap(), SCHEMA);
    // The canned payload holds exactly two records; a third read yields None
    decoder.decode_record::<TradeMsg>().await.unwrap().unwrap();
    decoder.decode_record::<TradeMsg>().await.unwrap().unwrap();
    assert!(decoder.decode_record::<TradeMsg>().await.unwrap().is_none());
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! It provides clients for fast, safe streaming of both real-time and historical market data through
//! similar interfaces.
//! The library is built on top of the tokio asynchronous runtime and
//! [Databento's efficient binary encoding](https://databento.com/docs/knowledge-base/new-users/dbn-encoding).
//! [Databento's efficient binary encoding](https://databento.com/docs/standards-and-conventions/databento-binary-encoding).
//!
//! You can find getting started tutorials, full API method documentation, examples
//! with output on the [Databento docs site](https://databento.com/docs/?historical=rust&live=rust).
Expand Down
2 changes: 1 addition & 1 deletion src/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct Subscription {
#[builder(default, setter(strip_option))]
pub start: Option<OffsetDateTime>,
#[doc(hidden)]
/// Reserved for future use.
/// Request subscription with snapshot. Defaults to `false`. Conflicts with the `start` parameter.
#[builder(setter(strip_bool))]
pub use_snapshot: bool,
}
Expand Down

0 comments on commit 7390020

Please sign in to comment.