From b4366921c32310da762f3c28528691e7fd5fc81c Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 24 Oct 2023 08:19:54 -0500 Subject: [PATCH 01/10] DOC: Update Rust client README --- README.md | 18 +++++++++++------- tests/historical-example.rs | 8 ++++++-- tests/live-example.rs | 10 +++++----- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index da42c42..88409dd 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,8 @@ Here is a simple program that fetches the next ES mini futures trade: use std::error::Error; use databento::{ - dbn::{Dataset, SType, Schema, TradeMsg}, - live::{Subscription, SymbolMap}, + dbn::{Dataset, PitSymbolMap, SType, Schema, TradeMsg}, + live::Subscription, LiveClient, }; @@ -51,13 +51,13 @@ async fn main() -> Result<(), Box> { .unwrap(); client.start().await?; - let mut symbol_map = SymbolMap::new(); + let mut symbol_map = PitSymbolMap::new(); // Get the next trade loop { let rec = client.next_record().await?.unwrap(); if let Some(trade) = rec.get::() { - let symbol = &symbol_map[trade.hd.instrument_id]; - println!("Received trade for {symbol}: {trade:?}",); + let symbol = &symbol_map[trade]; + println!("Received trade for {symbol}: {trade:?}"); break; } symbol_map.on_record(rec)?; @@ -78,7 +78,7 @@ use databento::{ historical::timeseries::GetRangeParams, HistoricalClient, Symbols, }; -use time::macros::datetime; +use time::macros::{date, datetime}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -97,8 +97,12 @@ async fn main() -> Result<(), Box> { .build(), ) .await?; + let symbol_map = decoder + .metadata() + .symbol_map_for_date(date!(2022 - 06 - 10))?; while let Some(trade) = decoder.decode_record::().await? 
{ - println!("{trade:?}"); + let symbol = &symbol_map[trade]; + println!("Received trade for {symbol}: {trade:?}"); } Ok(()) } diff --git a/tests/historical-example.rs b/tests/historical-example.rs index d3b6c85..1af9f56 100644 --- a/tests/historical-example.rs +++ b/tests/historical-example.rs @@ -6,7 +6,7 @@ use databento::{ historical::timeseries::GetRangeParams, HistoricalClient, Symbols, }; -use time::macros::datetime; +use time::macros::{date, datetime}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -25,8 +25,12 @@ async fn main() -> Result<(), Box> { .build(), ) .await?; + let symbol_map = decoder + .metadata() + .symbol_map_for_date(date!(2022 - 06 - 10))?; while let Some(trade) = decoder.decode_record::().await? { - println!("{trade:?}"); + let symbol = &symbol_map[trade]; + println!("Received trade for {symbol}: {trade:?}"); } Ok(()) } diff --git a/tests/live-example.rs b/tests/live-example.rs index ec59c85..1b222ba 100644 --- a/tests/live-example.rs +++ b/tests/live-example.rs @@ -2,8 +2,8 @@ use std::error::Error; use databento::{ - dbn::{Dataset, SType, Schema, TradeMsg}, - live::{Subscription, SymbolMap}, + dbn::{Dataset, PitSymbolMap, SType, Schema, TradeMsg}, + live::Subscription, LiveClient, }; @@ -26,13 +26,13 @@ async fn main() -> Result<(), Box> { .unwrap(); client.start().await?; - let mut symbol_map = SymbolMap::new(); + let mut symbol_map = PitSymbolMap::new(); // Get the next trade loop { let rec = client.next_record().await?.unwrap(); if let Some(trade) = rec.get::() { - let symbol = &symbol_map[trade.hd.instrument_id]; - println!("Received trade for {symbol}: {trade:?}",); + let symbol = &symbol_map[trade]; + println!("Received trade for {symbol}: {trade:?}"); break; } symbol_map.on_record(rec)?; From 3b9c807e00df319a314849c1bf23595f5d3da908 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 31 Oct 2023 11:06:08 -0500 Subject: [PATCH 02/10] DOC: Add links to DBN introduction --- README.md | 1 + src/lib.rs | 4 ++-- 2 files 
changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 88409dd..77eae1b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # databento-rs [![build](https://github.com/databento/databento-rs/actions/workflows/build.yaml/badge.svg)](https://github.com/databento/dbn/actions/workflows/build.yaml) +[![Documentation](https://img.shields.io/docsrs/databento)](https://docs.rs/databento/latest/databento/) [![license](https://img.shields.io/github/license/databento/databento-rs?color=blue)](./LICENSE) [![Current Crates.io Version](https://img.shields.io/crates/v/databento.svg)](https://crates.io/crates/databento) [![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://join.slack.com/t/databento-hq/shared_invite/zt-24oqyrub9-MellISM2cdpQ7s_7wcXosw) diff --git a/src/lib.rs b/src/lib.rs index 04ad681..57a2862 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,8 +4,8 @@ //! The library is built on top of the tokio asynchronous runtime and //! [Databento's efficient binary encoding](https://docs.databento.com/knowledge-base/new-users/dbn-encoding). //! -//! You can find getting started tutorials, full API method documentation, example -//! code and output on the [Databento docs site](https://docs.databento.com/?historical=rust&live=rust). +//! You can find getting started tutorials, full API method documentation, examples +//! with output on the [Databento docs site](https://docs.databento.com/?historical=rust&live=rust). //! //! # Feature flags //! By default both features are enabled. 
From 05f1b6b59e4ee9564cf4cc005d5e3c9d8f893a2d Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 2 Nov 2023 17:38:24 -0500 Subject: [PATCH 03/10] MOD: Improve historical error handling in Rust --- CHANGELOG.md | 8 ++++ src/error.rs | 36 ++++++++++++++++++ src/historical/batch.rs | 27 ++++++-------- src/historical/client.rs | 72 +++++++++++++++++++++++++++++++++++- src/historical/metadata.rs | 65 +++++++++++++------------------- src/historical/symbology.rs | 12 ++---- src/historical/timeseries.rs | 6 ++- 7 files changed, 158 insertions(+), 68 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b789e74..5f46fb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 0.5.0 - TBD + +#### Enhancements +- Improved error reporting for `HistoricalClient` when receiving an error from + Databento's API +- Added support for emitting warnings from historical API response headers, such as for + future deprecations + ## 0.4.2 - 2023-10-23 #### Enhancemets diff --git a/src/error.rs b/src/error.rs index a651f56..23c4458 100644 --- a/src/error.rs +++ b/src/error.rs @@ -19,6 +19,10 @@ pub enum Error { #[cfg(feature = "historical")] #[error("HTTP error: {0:?}")] Http(#[from] reqwest::Error), + /// An error from the Databento API. + #[cfg(feature = "historical")] + #[error("API error: {0}")] + Api(ApiError), /// An error internal to the client. #[error("internal error: {0}")] Internal(String), @@ -32,6 +36,20 @@ pub enum Error { /// An alias for a `Result` with [`databento::Error`](crate::Error) as the error type. pub type Result = std::result::Result; +/// An error from the Databento API. +#[cfg(feature = "historical")] +#[derive(Debug)] +pub struct ApiError { + /// The request ID. + pub request_id: Option, + /// The HTTP status code of the response. + pub status_code: reqwest::StatusCode, + /// The message from the Databento API. + pub message: String, + /// The link to documentation related to the error. 
+ pub docs_url: Option, +} + impl Error { pub(crate) fn bad_arg(param_name: impl ToString, desc: impl ToString) -> Self { Self::BadArgument { @@ -54,3 +72,21 @@ impl From for Error { } } } + +#[cfg(feature = "historical")] +impl std::fmt::Display for ApiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let doc = if let Some(ref docs_url) = self.docs_url { + format!(" See {docs_url} for documentation.") + } else { + String::new() + }; + let status = self.status_code; + let msg = &self.message; + if let Some(ref request_id) = self.request_id { + write!(f, "{request_id} failed with {status} {msg}{doc}") + } else { + write!(f, "{status} {msg}{doc}") + } + } +} diff --git a/src/historical/batch.rs b/src/historical/batch.rs index a0c21d7..95795ce 100644 --- a/src/historical/batch.rs +++ b/src/historical/batch.rs @@ -18,9 +18,9 @@ use time::OffsetDateTime; use tokio::io::BufWriter; use typed_builder::TypedBuilder; -use crate::{Error, Symbols}; +use crate::{historical::check_http_error, Error, Symbols}; -use super::DateTimeRange; +use super::{handle_response, DateTimeRange}; /// A client for the batch group of Historical API endpoints. pub struct BatchClient<'a> { @@ -63,7 +63,8 @@ impl BatchClient<'_> { form.push(("limit", limit.to_string())); } let builder = self.post("submit_job")?.form(&form); - Ok(builder.send().await?.error_for_status()?.json().await?) + let resp = builder.send().await?; + handle_response(resp).await } /// Lists previous batch jobs with filtering by `params`. @@ -87,7 +88,8 @@ impl BatchClient<'_> { if let Some(ref since) = params.since { builder = builder.query(&[("since", &since.unix_timestamp_nanos().to_string())]); } - Ok(builder.send().await?.error_for_status()?.json().await?) + let resp = builder.send().await?; + handle_response(resp).await } /// Lists all files associated with the batch job with ID `job_id`. 
@@ -96,14 +98,12 @@ impl BatchClient<'_> { /// This function returns an error when it fails to communicate with the Databento API /// or the API indicates there's an issue with the request. pub async fn list_files(&mut self, job_id: &str) -> crate::Result> { - Ok(self + let resp = self .get("list_files")? .query(&[("job_id", job_id)]) .send() - .await? - .error_for_status()? - .json() - .await?) + .await?; + handle_response(resp).await } /// Downloads the file specified in `params` or all files associated with the job ID. @@ -163,13 +163,8 @@ impl BatchClient<'_> { async fn download_file(&mut self, url: &str, path: impl AsRef) -> crate::Result<()> { let url = reqwest::Url::parse(url) .map_err(|e| Error::internal(format!("Unable to parse URL: {e:?}")))?; - let mut stream = self - .inner - .get_with_path(url.path())? - .send() - .await? - .error_for_status()? - .bytes_stream(); + let resp = self.inner.get_with_path(url.path())?.send().await?; + let mut stream = check_http_error(resp).await?.bytes_stream(); info!("Saving {url} to {}", path.as_ref().display()); let mut output = BufWriter::new( tokio::fs::OpenOptions::new() diff --git a/src/historical/client.rs b/src/historical/client.rs index 5ee9428..1b383e7 100644 --- a/src/historical/client.rs +++ b/src/historical/client.rs @@ -1,6 +1,8 @@ +use log::warn; use reqwest::{header::ACCEPT, IntoUrl, RequestBuilder, Url}; +use serde::Deserialize; -use crate::Error; +use crate::{error::ApiError, Error}; use super::{ batch::BatchClient, metadata::MetadataClient, symbology::SymbologyClient, @@ -25,7 +27,22 @@ pub struct Client { client: reqwest::Client, } +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub(crate) enum ApiErrorResponse { + Simple { detail: String }, + Business { detail: BusinessErrorDetails }, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct BusinessErrorDetails { + message: String, + docs: String, +} + const USER_AGENT: &str = concat!("Databento/", env!("CARGO_PKG_VERSION"), " Rust"); +const 
WARNING_HEADER: &str = "X-Warning"; +const REQUEST_ID_HEADER: &str = "request-id"; impl Client { /// Returns a type-safe builder for setting the required parameters @@ -133,6 +150,59 @@ impl Client { } } +pub(crate) async fn check_http_error( + response: reqwest::Response, +) -> crate::Result { + if response.status().is_success() { + Ok(response) + } else { + let request_id = response + .headers() + .get(REQUEST_ID_HEADER) + .and_then(|header| header.to_str().ok().map(ToOwned::to_owned)); + let status_code = response.status(); + Err(Error::Api( + match response.json::().await? { + ApiErrorResponse::Simple { detail: message } => ApiError { + request_id, + status_code, + message, + docs_url: None, + }, + ApiErrorResponse::Business { detail } => ApiError { + request_id, + status_code, + message: detail.message, + docs_url: Some(detail.docs), + }, + }, + )) + } +} + +pub(crate) async fn handle_response( + response: reqwest::Response, +) -> crate::Result { + check_warnings(&response); + let response = check_http_error(response).await?; + Ok(response.json::().await?) +} + +fn check_warnings(response: &reqwest::Response) { + if let Some(header) = response.headers().get(WARNING_HEADER) { + match serde_json::from_slice::>(header.as_bytes()) { + Ok(warnings) => { + for warning in warnings { + warn!("{warning}"); + } + } + Err(err) => { + warn!("Failed to parse server warnings from HTTP header: {err:?}"); + } + }; + }; +} + #[doc(hidden)] pub struct Unset; diff --git a/src/historical/metadata.rs b/src/historical/metadata.rs index 3f6b3d5..0cdf367 100644 --- a/src/historical/metadata.rs +++ b/src/historical/metadata.rs @@ -9,7 +9,7 @@ use typed_builder::TypedBuilder; use crate::Symbols; -use super::{AddToQuery, DateRange, DateTimeRange}; +use super::{handle_response, AddToQuery, DateRange, DateTimeRange}; /// A client for the metadata group of Historical API endpoints. 
pub struct MetadataClient<'a> { @@ -22,13 +22,8 @@ impl MetadataClient<'_> { /// # Errors /// This function returns an error when it fails to communicate with the Databento API. pub async fn list_publishers(&mut self) -> crate::Result> { - Ok(self - .get("list_publishers")? - .send() - .await? - .error_for_status()? - .json() - .await?) + let resp = self.get("list_publishers")?.send().await?; + handle_response(resp).await } /// Lists all available dataset codes on Databento. @@ -44,7 +39,8 @@ impl MetadataClient<'_> { if let Some(date_range) = date_range { builder = builder.add_to_query(&date_range); } - Ok(builder.send().await?.error_for_status()?.json().await?) + let resp = builder.send().await?; + handle_response(resp).await } /// Lists all available schemas for the given `dataset`. @@ -53,14 +49,12 @@ impl MetadataClient<'_> { /// This function returns an error when it fails to communicate with the Databento API /// or the API indicates there's an issue with the request. pub async fn list_schemas(&mut self, dataset: &str) -> crate::Result> { - Ok(self + let resp = self .get("list_schemas")? .query(&[("dataset", dataset)]) .send() - .await? - .error_for_status()? - .json() - .await?) + .await?; + handle_response(resp).await } /// Lists all fields for a schema and encoding. @@ -76,7 +70,8 @@ impl MetadataClient<'_> { ("encoding", params.encoding.as_str()), ("schema", params.schema.as_str()), ]); - Ok(builder.send().await?.error_for_status()?.json().await?) + let resp = builder.send().await?; + handle_response(resp).await } /// Lists unit prices for each data schema and feed mode in US dollars per gigabyte. @@ -91,7 +86,8 @@ impl MetadataClient<'_> { let builder = self .get("list_unit_prices")? .query(&[("dataset", &dataset)]); - Ok(builder.send().await?.error_for_status()?.json().await?) + let resp = builder.send().await?; + handle_response(resp).await } /// Gets the dataset condition from Databento. 
@@ -111,7 +107,8 @@ impl MetadataClient<'_> { if let Some(ref date_range) = params.date_range { builder = builder.add_to_query(date_range); } - Ok(builder.send().await?.error_for_status()?.json().await?) + let resp = builder.send().await?; + handle_response(resp).await } /// Gets the available range for the dataset from Databento. @@ -122,14 +119,12 @@ impl MetadataClient<'_> { /// This function returns an error when it fails to communicate with the Databento API /// or the API indicates there's an issue with the request. pub async fn get_dataset_range(&mut self, dataset: &str) -> crate::Result { - Ok(self + let resp = self .get("get_dataset_range")? .query(&[("dataset", dataset)]) .send() - .await? - .error_for_status()? - .json() - .await?) + .await?; + handle_response(resp).await } /// Gets the record count of the time series data query. @@ -138,14 +133,12 @@ impl MetadataClient<'_> { /// This function returns an error when it fails to communicate with the Databento API /// or the API indicates there's an issue with the request. pub async fn get_record_count(&mut self, params: &GetRecordCountParams) -> crate::Result { - Ok(self + let resp = self .get("get_record_count")? .add_to_query(params) .send() - .await? - .error_for_status()? - .json() - .await?) + .await?; + handle_response(resp).await } /// Gets the billable uncompressed raw binary size for historical streaming or @@ -158,14 +151,12 @@ impl MetadataClient<'_> { &mut self, params: &GetBillableSizeParams, ) -> crate::Result { - Ok(self + let resp = self .get("get_billable_size")? .add_to_query(params) .send() - .await? - .error_for_status()? - .json() - .await?) + .await?; + handle_response(resp).await } /// Gets the cost in US dollars for a historical streaming or batch download @@ -175,14 +166,8 @@ impl MetadataClient<'_> { /// This function returns an error when it fails to communicate with the Databento API /// or the API indicates there's an issue with the request. 
pub async fn get_cost(&mut self, params: &GetCostParams) -> crate::Result { - Ok(self - .get("get_cost")? - .add_to_query(params) - .send() - .await? - .error_for_status()? - .json() - .await?) + let resp = self.get("get_cost")?.add_to_query(params).send().await?; + handle_response(resp).await } fn get(&mut self, slug: &str) -> crate::Result { diff --git a/src/historical/symbology.rs b/src/historical/symbology.rs index 79c4ec0..c3d2b34 100644 --- a/src/historical/symbology.rs +++ b/src/historical/symbology.rs @@ -9,7 +9,7 @@ use typed_builder::TypedBuilder; use crate::Symbols; -use super::DateRange; +use super::{handle_response, DateRange}; /// A client for the symbology group of Historical API endpoints. pub struct SymbologyClient<'a> { @@ -32,14 +32,8 @@ impl SymbologyClient<'_> { ("symbols", params.symbols.to_api_string()), ]; params.date_range.add_to_form(&mut form); - Ok(self - .post("resolve")? - .form(&form) - .send() - .await? - .error_for_status()? - .json() - .await?) + let resp = self.post("resolve")?.form(&form).send().await?; + handle_response(resp).await } fn post(&mut self, slug: &str) -> crate::Result { diff --git a/src/historical/timeseries.rs b/src/historical/timeseries.rs index b8f4570..ca46941 100644 --- a/src/historical/timeseries.rs +++ b/src/historical/timeseries.rs @@ -10,7 +10,7 @@ use typed_builder::TypedBuilder; use crate::Symbols; -use super::DateTimeRange; +use super::{check_http_error, DateTimeRange}; // Re-export because it's returned. pub use dbn::decode::AsyncDbnDecoder; @@ -46,12 +46,14 @@ impl TimeseriesClient<'_> { if let Some(limit) = params.limit { form.push(("limit", limit.to_string())); } - let stream = self + let resp = self .post("get_range")? // unlike almost every other request, it's not JSON .header(ACCEPT, "application/octet-stream") .form(&form) .send() + .await?; + let stream = check_http_error(resp) .await? .error_for_status()? 
.bytes_stream() From e44cebaeb6aa126553ef1e23a09630ae82f98663 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 6 Nov 2023 13:54:51 -0700 Subject: [PATCH 04/10] ADD: Implement PartialEq, Eq for param builders --- CHANGELOG.md | 1 + src/historical/batch.rs | 8 +++++--- src/historical/metadata.rs | 6 +++--- src/historical/symbology.rs | 2 +- src/historical/timeseries.rs | 2 +- src/live.rs | 2 +- 6 files changed, 12 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f46fb9..ddfa9c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Databento's API - Added support for emitting warnings from historical API response headers, such as for future deprecations +- Added `PartialEq` and `Eq` implementations for parameter builder classes ## 0.4.2 - 2023-10-23 diff --git a/src/historical/batch.rs b/src/historical/batch.rs index 95795ce..f407687 100644 --- a/src/historical/batch.rs +++ b/src/historical/batch.rs @@ -241,7 +241,7 @@ pub enum JobState { /// The parameters for [`BatchClient::submit_job()`]. Use [`SubmitJobParams::builder()`] to /// get a builder type with all the preset defaults. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct SubmitJobParams { /// The dataset code. #[builder(setter(transform = |dt: impl ToString| dt.to_string()))] @@ -384,7 +384,7 @@ pub struct BatchJob { /// The parameters for [`BatchClient::list_jobs()`]. Use [`ListJobsParams::builder()`] to /// get a builder type with all the preset defaults. -#[derive(Debug, Clone, Default, TypedBuilder)] +#[derive(Debug, Clone, Default, TypedBuilder, PartialEq, Eq)] pub struct ListJobsParams { /// The optional filter for job states. #[builder(default, setter(strip_option))] @@ -410,11 +410,13 @@ pub struct BatchFileDesc { /// The parameters for [`BatchClient::download()`]. Use [`DownloadParams::builder()`] to /// get a builder type with all the preset defaults. 
-#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct DownloadParams { /// The directory to download the file(s) to. + #[builder(setter(transform = |dt: impl Into| dt.into()))] pub output_dir: PathBuf, /// The batch job identifier. + #[builder(setter(transform = |dt: impl ToString| dt.to_string()))] pub job_id: String, /// `None` means all files associated with the job will be downloaded. #[builder(default, setter(strip_option))] diff --git a/src/historical/metadata.rs b/src/historical/metadata.rs index 0cdf367..c8e93d5 100644 --- a/src/historical/metadata.rs +++ b/src/historical/metadata.rs @@ -215,7 +215,7 @@ pub struct PublisherDetail { /// The parameters for [`MetadataClient::list_fields()`]. Use /// [`ListFieldsParams::builder()`] to get a builder type with all the preset defaults. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct ListFieldsParams { /// The encoding to request fields for. pub encoding: Encoding, @@ -245,7 +245,7 @@ pub struct UnitPricesForMode { /// The parameters for [`MetadataClient::get_dataset_condition()`]. Use /// [`GetDatasetConditionParams::builder()`] to get a builder type with all the preset /// defaults. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct GetDatasetConditionParams { /// The dataset code. #[builder(setter(transform = |dataset: impl ToString| dataset.to_string()))] @@ -282,7 +282,7 @@ pub struct DatasetRange { } /// The parameters for several metadata requests. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct GetQueryParams { /// The dataset code. 
#[builder(setter(transform = |dataset: impl ToString| dataset.to_string()))] diff --git a/src/historical/symbology.rs b/src/historical/symbology.rs index c3d2b34..dc1435d 100644 --- a/src/historical/symbology.rs +++ b/src/historical/symbology.rs @@ -43,7 +43,7 @@ impl SymbologyClient<'_> { /// The parameters for [`SymbologyClient::resolve()`]. Use [`ResolveParams::builder()`] /// to get a builder type with all the preset defaults. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct ResolveParams { /// The dataset code. #[builder(setter(transform = |dt: impl ToString| dt.to_string()))] diff --git a/src/historical/timeseries.rs b/src/historical/timeseries.rs index ca46941..cefbab5 100644 --- a/src/historical/timeseries.rs +++ b/src/historical/timeseries.rs @@ -69,7 +69,7 @@ impl TimeseriesClient<'_> { /// The parameters for [`TimeseriesClient::get_range()`]. Use /// [`GetRangeParams::builder()`] to get a builder type with all the preset defaults. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct GetRangeParams { /// The dataset code. #[builder(setter(transform = |dt: impl ToString| dt.to_string()))] diff --git a/src/live.rs b/src/live.rs index 12a57fb..239bb5c 100644 --- a/src/live.rs +++ b/src/live.rs @@ -264,7 +264,7 @@ impl Client { } /// A subscription for real-time or intraday historical data. -#[derive(Debug, Clone, TypedBuilder)] +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] pub struct Subscription { /// The symbols of the instruments to subscribe to. 
#[builder(setter(into))] From bb7b7bb94dc95b684437aa452dcd04de4e3d40d6 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 9 Nov 2023 10:08:51 -0700 Subject: [PATCH 05/10] MOD: Improve API key error messages --- CHANGELOG.md | 1 + src/error.rs | 2 +- src/lib.rs | 27 ++++++++++++++++++++++++--- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ddfa9c6..8e808a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Enhancements - Improved error reporting for `HistoricalClient` when receiving an error from Databento's API +- Improved error messages around API keys - Added support for emitting warnings from historical API response headers, such as for future deprecations - Added `PartialEq` and `Eq` implementations for parameter builder classes diff --git a/src/error.rs b/src/error.rs index 23c4458..6a796ee 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,7 +5,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum Error { /// An invalid argument was passed to a function. - #[error("bad argument {param_name}: {desc}")] + #[error("bad argument `{param_name}`: {desc}")] BadArgument { /// The name of the parameter to which the bad argument was passed. param_name: String, diff --git a/src/lib.rs b/src/lib.rs index 57a2862..9ceaedc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -161,8 +161,19 @@ impl From> for Symbols { } pub(crate) fn validate_key(key: String) -> crate::Result { - if key.len() != API_KEY_LENGTH { - Err(Error::bad_arg("key", "expected to be 32-characters long")) + if key == "$YOUR_API_KEY" { + Err(Error::bad_arg( + "key", + "got placeholder API key '$YOUR_API_KEY'. 
Please pass a real API key", + )) + } else if key.len() != API_KEY_LENGTH { + Err(Error::bad_arg( + "key", + format!( + "expected to be 32-characters long, got {} characters", + key.len() + ), + )) } else if !key.is_ascii() { error!("API key '{key}' contains non-ASCII characters"); Err(Error::bad_arg( @@ -175,7 +186,17 @@ pub(crate) fn validate_key(key: String) -> crate::Result { } pub(crate) fn key_from_env() -> crate::Result { - std::env::var("DATABENTO_API_KEY").map_err(|e| Error::bad_arg("key", format!("{e:?}"))) + std::env::var("DATABENTO_API_KEY").map_err(|e| { + Error::bad_arg( + "key", + match e { + std::env::VarError::NotPresent => "tried to read API key from environment variable DATABENTO_API_KEY but it is not set", + std::env::VarError::NotUnicode(_) => { + "environment variable DATABENTO_API_KEY contains invalid unicode" + } + }, + ) + }) } #[cfg(feature = "historical")] From 2c2940a879ed15aaf5eb299f1a72c3a61560f7f4 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 9 Nov 2023 12:25:05 -0700 Subject: [PATCH 06/10] OPT: Improve CSV and JSON encoding with itoa --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e808a9..77ad257 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Improved error reporting for `HistoricalClient` when receiving an error from Databento's API - Improved error messages around API keys +- Improved performance of CSV and JSON encoding - Added support for emitting warnings from historical API response headers, such as for future deprecations - Added `PartialEq` and `Eq` implementations for parameter builder classes From 65745ffbeb0d74a596e0c4997249b78d42d6565f Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 13 Nov 2023 08:19:03 -0600 Subject: [PATCH 07/10] ADD: Add symbol map method to `Resolution` --- CHANGELOG.md | 2 + src/historical/symbology.rs | 77 ++++++++++++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git 
a/CHANGELOG.md b/CHANGELOG.md index 77ad257..3d51514 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ - Improved performance of CSV and JSON encoding - Added support for emitting warnings from historical API response headers, such as for future deprecations +- Added `symbol_map` method to the `Resolution` struct returned by `symbology::resolve` + that returns a `TsSymbolMap` - Added `PartialEq` and `Eq` implementations for parameter builder classes ## 0.4.2 - 2023-10-23 diff --git a/src/historical/symbology.rs b/src/historical/symbology.rs index dc1435d..f2e8a32 100644 --- a/src/historical/symbology.rs +++ b/src/historical/symbology.rs @@ -1,8 +1,8 @@ //! The historical symbology API. -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; -use dbn::{MappingInterval, SType}; +use dbn::{MappingInterval, SType, TsSymbolMap}; use reqwest::RequestBuilder; use serde::Deserialize; use typed_builder::TypedBuilder; @@ -33,7 +33,18 @@ impl SymbologyClient<'_> { ]; params.date_range.add_to_form(&mut form); let resp = self.post("resolve")?.form(&form).send().await?; - handle_response(resp).await + let ResolutionResp { + mappings, + partial, + not_found, + } = handle_response(resp).await?; + Ok(Resolution { + mappings, + partial, + not_found, + stype_in: params.stype_in, + stype_out: params.stype_out, + }) } fn post(&mut self, slug: &str) -> crate::Result { @@ -65,17 +76,73 @@ pub struct ResolveParams { } /// A symbology resolution from one symbology type to another. -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone)] pub struct Resolution { /// A mapping from input symbol to a list of resolved symbols in the output /// symbology. - #[serde(rename = "result")] pub mappings: HashMap>, /// A list of symbols that were resolved for part, but not all of the date range /// from the request. pub partial: Vec, /// A list of symbols that were not resolved. pub not_found: Vec, + /// The input symbology type. 
+ pub stype_in: SType, + /// The output symbology type. + pub stype_out: SType, +} + +impl Resolution { + /// Creates a symbology mapping from instrument ID and date to text symbol. + /// + /// # Errors + /// This function returns an error if it's unable to parse a symbol into an + /// instrument ID. + pub fn symbol_map(&self) -> crate::Result { + let mut map = TsSymbolMap::new(); + if self.stype_in == SType::InstrumentId { + for (iid, intervals) in self.mappings.iter() { + let iid = iid.parse().map_err(|_| { + crate::Error::internal(format!("Unable to parse '{iid}' to an instrument ID",)) + })?; + for interval in intervals { + map.insert( + iid, + interval.start_date, + interval.end_date, + Arc::new(interval.symbol.clone()), + )?; + } + } + } else { + for (raw_symbol, intervals) in self.mappings.iter() { + let raw_symbol = Arc::new(raw_symbol.clone()); + for interval in intervals { + let iid = interval.symbol.parse().map_err(|_| { + crate::Error::internal(format!( + "Unable to parse '{}' to an instrument ID", + interval.symbol + )) + })?; + map.insert( + iid, + interval.start_date, + interval.end_date, + raw_symbol.clone(), + )?; + } + } + } + Ok(map) + } +} + +#[derive(Debug, Clone, Deserialize)] +struct ResolutionResp { + #[serde(rename = "result")] + pub mappings: HashMap>, + pub partial: Vec, + pub not_found: Vec, } #[cfg(test)] From a81ec46a5579195436dd32842d7e672ad8e09817 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 15 Nov 2023 09:26:47 -0600 Subject: [PATCH 08/10] ADD: Add test for cancellation safety --- src/live.rs | 67 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/src/live.rs b/src/live.rs index 239bb5c..169f015 100644 --- a/src/live.rs +++ b/src/live.rs @@ -437,16 +437,16 @@ impl std::ops::Index for SymbolMap { #[cfg(test)] mod tests { - use std::{ffi::c_char, fmt}; + use std::{ffi::c_char, fmt, time::Duration}; use dbn::{ encode::AsyncDbnMetadataEncoder, enums::rtype, 
publishers::Dataset, record::{HasRType, OhlcvMsg, RecordHeader, TradeMsg, WithTsOut}, - MetadataBuilder, UNDEF_TIMESTAMP, + Mbp10Msg, MetadataBuilder, Record, UNDEF_TIMESTAMP, }; - use tokio::{join, net::TcpListener, sync::mpsc::UnboundedSender, task::JoinHandle}; + use tokio::{join, net::TcpListener, select, sync::mpsc::UnboundedSender, task::JoinHandle}; use super::*; @@ -469,7 +469,9 @@ mod tests { } async fn accept(&mut self) { - self.stream = Some(BufReader::new(self.listener.accept().await.unwrap().0)); + let stream = self.listener.accept().await.unwrap().0; + stream.set_nodelay(true).unwrap(); + self.stream = Some(BufReader::new(stream)); } async fn authenticate(&mut self) { @@ -530,7 +532,11 @@ mod tests { async fn send_record(&mut self, record: Box + Send>) { let bytes = (*record).as_ref(); - self.stream().write_all(bytes).await.unwrap() + // test for partial read bugs + let half = bytes.len() / 2; + self.stream().write_all(&bytes[..half]).await.unwrap(); + self.stream().flush().await.unwrap(); + self.stream().write_all(&bytes[half..]).await.unwrap(); } async fn read_line(&mut self) -> String { @@ -693,7 +699,7 @@ mod tests { let (mut fixture, mut client) = setup(Dataset::GlbxMdp3, false).await; fixture.start(); let metadata = client.start().await.unwrap(); - assert_eq!(metadata.version, 1); + assert_eq!(metadata.version, dbn::DBN_VERSION); assert!(metadata.schema.is_none()); assert_eq!(metadata.dataset, Dataset::GlbxMdp3.as_str()); fixture.send_record(REC); @@ -722,7 +728,7 @@ mod tests { let (mut fixture, mut client) = setup(Dataset::GlbxMdp3, true).await; fixture.start(); let metadata = client.start().await.unwrap(); - assert_eq!(metadata.version, 1); + assert_eq!(metadata.version, dbn::DBN_VERSION); assert!(metadata.schema.is_none()); assert_eq!(metadata.dataset, Dataset::GlbxMdp3.as_str()); fixture.send_record(expected.clone()); @@ -842,4 +848,51 @@ mod tests { r1.unwrap(); r2.unwrap(); } + + #[tokio::test] + #[ignore = "waiting for new DBN release"] 
+ async fn test_cancellation_safety() { + let (mut fixture, mut client) = setup(Dataset::GlbxMdp3, true).await; + fixture.start(); + let metadata = client.start().await.unwrap(); + assert_eq!(metadata.version, dbn::DBN_VERSION); + assert!(metadata.schema.is_none()); + assert_eq!(metadata.dataset, Dataset::GlbxMdp3.as_str()); + fixture.send_record(Mbp10Msg::default()); + + let mut int_1 = tokio::time::interval(Duration::from_millis(1)); + let mut int_2 = tokio::time::interval(Duration::from_millis(1)); + let mut int_3 = tokio::time::interval(Duration::from_millis(1)); + let mut int_4 = tokio::time::interval(Duration::from_millis(1)); + let mut int_5 = tokio::time::interval(Duration::from_millis(1)); + let mut int_6 = tokio::time::interval(Duration::from_millis(1)); + for _ in 0..1_000 { + select! { + _ = int_1.tick() => { + fixture.send_record(Mbp10Msg::default()); + } + _ = int_2.tick() => { + fixture.send_record(Mbp10Msg::default()); + } + _ = int_3.tick() => { + fixture.send_record(Mbp10Msg::default()); + } + _ = int_4.tick() => { + fixture.send_record(Mbp10Msg::default()); + } + _ = int_5.tick() => { + fixture.send_record(Mbp10Msg::default()); + } + _ = int_6.tick() => { + fixture.send_record(Mbp10Msg::default()); + } + res = client.next_record() => { + let rec = res.unwrap().unwrap(); + dbg!(rec.header()); + assert_eq!(*rec.get::().unwrap(), Mbp10Msg::default()); + } + } + } + fixture.stop().await; + } } From 6d45356d1dbde76f0707f27cc4e1aec30de0694d Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 16 Nov 2023 18:52:37 -0600 Subject: [PATCH 09/10] FIX: Fix new clippy lints --- src/historical/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/historical/batch.rs b/src/historical/batch.rs index f407687..528cbd8 100644 --- a/src/historical/batch.rs +++ b/src/historical/batch.rs @@ -180,7 +180,7 @@ impl BatchClient<'_> { Ok(()) } - const PATH_PREFIX: &str = "batch"; + const PATH_PREFIX: &'static str = "batch"; fn get(&mut self, 
slug: &str) -> crate::Result { self.inner.get(&format!("{}.{slug}", Self::PATH_PREFIX)) From 4609c6b4e9c9b3e89dc132f93803d1240015b040 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Thu, 23 Nov 2023 14:33:44 +1100 Subject: [PATCH 10/10] MOD: Upgrade databento-rs to new DBN --- CHANGELOG.md | 41 ++++++++++-- Cargo.toml | 2 +- src/error.rs | 1 + src/historical/timeseries.rs | 9 ++- src/live.rs | 118 ++++++++++++++++++++++++++--------- 5 files changed, 136 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d51514..40dd9da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,22 @@ # Changelog -## 0.5.0 - TBD +## 0.5.0 - 2023-11-23 + +This release adds support for DBN v2. + +DBN v2 delivers improvements to the `Metadata` header symbology, new `stype_in` and `stype_out` +fields for `SymbolMappingMsg`, and extends the symbol field length for `SymbolMappingMsg` and +`InstrumentDefMsg`. The entire change notes are available [here](https://github.com/databento/dbn/releases/tag/v0.14.0). +Users who wish to convert DBN v1 files to v2 can use the `dbn-cli` tool available in the [databento-dbn](https://github.com/databento/dbn/) crate. +On a future date, the Databento live and historical APIs will stop serving DBN v1. + +This release is fully compatible with both DBN v1 and v2, and so should be seamless for most users. #### Enhancements +- Made `LiveClient::next_record`, `dbn::decode::AsyncDbnDecoder::decode_record` and + `decode_record_ref`, and `dbn::decode::AsyncRecordDecoder::decode` and `decode_ref` + cancel safe. 
This makes them safe to use within a + [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html) statement - Improved error reporting for `HistoricalClient` when receiving an error from Databento's API - Improved error messages around API keys @@ -12,6 +26,28 @@ - Added `symbol_map` method to the `Resolution` struct returned by `symbology::resolve` that returns a `TsSymbolMap` - Added `PartialEq` and `Eq` implementations for parameter builder classes +- Added `upgrade_policy` setter to the `LiveClient` builder and a getter to the + `LiveClient` +- Added `upgrade_policy` optional setter to the `timeseries::GetRangeParams` builder + +#### Breaking changes +- Upgraded `dbn` to 0.14.2. There are several breaking changes in this release as we + begin migrating to DBN encoding version 2 (DBNv2) in order to support the ICE + exchange: + - Renamed `dbn::InstrumentDefMsg` to `dbn::compat::InstrumentDefMsgV1` and added a + new `dbn::InstrumentDefMsg` with a longer `raw_symbol` field + - Renamed `dbn::SymbolMappingMsg` to `dbn::compat::SymbolMappingMsgV1` and added a + new `dbn::SymbolMappingMsg` with longer symbol fields and new `stype_in` and + `stype_out` fields + - Added `symbol_cstr_len` field to `dbn::Metadata` +- Made `Error` non-exhaustive, meaning it can no longer be exhaustively matched against, and + new variants can be added in the future without a breaking change +- Added an `upgrade_policy` parameter to `LiveClient::connect` and `connect_with_addr`.
+ The builder provides a more stable API since new parameters are usually introduced as + optional + +#### Deprecations +- Deprecated `live::SymbolMap` in favor of `databento::dbn::PitSymbolMap` ## 0.4.2 - 2023-10-23 @@ -20,9 +56,6 @@ - Upgraded `tokio` to 1.33 - Upgraded `typed-builder` to 0.17 -##### Deprecations -- Deprecated `live::SymbolMap` in favor of `databento::dbn::PitSymbolMap` - #### Bug fixes - Fixed panic in `LiveClient` when gateway returned an auth response without the `success` key diff --git a/Cargo.toml b/Cargo.toml index 797359d..7481157 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ live = ["dep:hex", "dep:sha2", "tokio/net"] [dependencies] # binary encoding -dbn = { version = "0.13.0", features = ["async", "serde"] } +dbn = { version = "0.14.2", features = ["async", "serde"] } # Async stream trait futures = { version = "0.3", optional = true } # Hex encoding used for Live authentication diff --git a/src/error.rs b/src/error.rs index 6a796ee..ede2628 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ use thiserror::Error; /// An error that can occur while working with Databento's API. #[derive(Debug, Error)] +#[non_exhaustive] pub enum Error { /// An invalid argument was passed to a function. #[error("bad argument `{param_name}`: {desc}")] diff --git a/src/historical/timeseries.rs b/src/historical/timeseries.rs index cefbab5..19ae5c9 100644 --- a/src/historical/timeseries.rs +++ b/src/historical/timeseries.rs @@ -2,7 +2,7 @@ use std::num::NonZeroU64; -use dbn::{Compression, Encoding, SType, Schema}; +use dbn::{Compression, Encoding, SType, Schema, VersionUpgradePolicy}; use futures::TryStreamExt; use reqwest::{header::ACCEPT, RequestBuilder}; use tokio::io::AsyncReadExt; @@ -59,7 +59,9 @@ impl TimeseriesClient<'_> { .bytes_stream() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); let reader = tokio_util::io::StreamReader::new(stream); - Ok(AsyncDbnDecoder::with_zstd_buffer(reader).await?) 
+ let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?; + decoder.set_upgrade_policy(params.upgrade_policy); + Ok(decoder) } fn post(&mut self, slug: &str) -> crate::Result { @@ -93,6 +95,9 @@ pub struct GetRangeParams { /// The optional maximum number of records to return. Defaults to no limit. #[builder(default)] pub limit: Option, + /// How to decode DBN from prior versions. Defaults to as-is. + #[builder(default)] + pub upgrade_policy: VersionUpgradePolicy, } #[cfg(test)] diff --git a/src/live.rs b/src/live.rs index 169f015..967dba9 100644 --- a/src/live.rs +++ b/src/live.rs @@ -3,8 +3,9 @@ use std::collections::HashMap; use dbn::{ + compat::SymbolMappingMsgV1, decode::dbn::{AsyncMetadataDecoder, AsyncRecordDecoder}, - Metadata, RecordRef, SType, Schema, SymbolMappingMsg, + Metadata, PitSymbolMap, RecordRef, SType, Schema, SymbolMappingMsg, VersionUpgradePolicy, }; use hex::ToHex; use log::{debug, error, info}; @@ -26,6 +27,7 @@ pub struct Client { key: String, dataset: String, send_ts_out: bool, + upgrade_policy: VersionUpgradePolicy, connection: WriteHalf, decoder: AsyncRecordDecoder>>, session_id: String, @@ -45,8 +47,20 @@ impl Client { /// # Errors /// This function returns an error when `key` is invalid or its unable to connect /// and authenticate with the Live gateway. - pub async fn connect(key: String, dataset: String, send_ts_out: bool) -> crate::Result { - Self::connect_with_addr(Self::determine_gateway(&dataset), key, dataset, send_ts_out).await + pub async fn connect( + key: String, + dataset: String, + send_ts_out: bool, + upgrade_policy: VersionUpgradePolicy, + ) -> crate::Result { + Self::connect_with_addr( + Self::determine_gateway(&dataset), + key, + dataset, + send_ts_out, + upgrade_policy, + ) + .await } /// Creates a new client connected to the Live gateway at `addr`. 
This is an advanced method and generally @@ -60,6 +74,7 @@ impl Client { key: String, dataset: String, send_ts_out: bool, + upgrade_policy: VersionUpgradePolicy, ) -> crate::Result { let key = validate_key(key)?; let stream = TcpStream::connect(addr).await?; @@ -74,8 +89,12 @@ impl Client { key, dataset, send_ts_out, + upgrade_policy, connection: writer, - decoder: AsyncRecordDecoder::new(reader), + // Pass a placeholder DBN version and should never fail because DBN_VERSION + // is a valid DBN version. Correct version set in `start()`. + decoder: AsyncRecordDecoder::with_version(reader, dbn::DBN_VERSION, upgrade_policy) + .unwrap(), session_id, }) } @@ -101,6 +120,11 @@ impl Client { self.send_ts_out } + /// Returns the upgrade policy for decoding DBN from previous versions. + pub fn upgrade_policy(&self) -> VersionUpgradePolicy { + self.upgrade_policy + } + fn determine_gateway(dataset: &str) -> String { const DEFAULT_PORT: u16 = 13_000; @@ -244,9 +268,12 @@ impl Client { pub async fn start(&mut self) -> crate::Result { info!("[{}] Starting session", self.dataset); self.connection.write_all(b"start_session\n").await?; - Ok(AsyncMetadataDecoder::new(self.decoder.get_mut()) + let mut metadata = AsyncMetadataDecoder::new(self.decoder.get_mut()) .decode() - .await?) + .await?; + self.decoder.set_version(metadata.version)?; + metadata.upgrade(self.upgrade_policy); + Ok(metadata) } /// Fetches the next record. This method should only be called after the session has @@ -294,6 +321,7 @@ pub struct ClientBuilder { key: AK, dataset: D, send_ts_out: bool, + upgrade_policy: VersionUpgradePolicy, } impl Default for ClientBuilder { @@ -302,6 +330,7 @@ impl Default for ClientBuilder { key: Unset, dataset: Unset, send_ts_out: false, + upgrade_policy: VersionUpgradePolicy::AsIs, } } } @@ -313,6 +342,13 @@ impl ClientBuilder { self.send_ts_out = send_ts_out; self } + + /// Sets `upgrade_policy`, which controls how to decode data from prior DBN + /// versions. 
The current default is to decode them as-is. + pub fn upgrade_policy(mut self, upgrade_policy: VersionUpgradePolicy) -> Self { + self.upgrade_policy = upgrade_policy; + self + } } impl ClientBuilder { @@ -332,6 +368,7 @@ impl ClientBuilder { key: crate::validate_key(key.to_string())?, dataset: self.dataset, send_ts_out: self.send_ts_out, + upgrade_policy: self.upgrade_policy, }) } @@ -354,6 +391,7 @@ impl ClientBuilder { key: self.key, dataset: dataset.to_string(), send_ts_out: self.send_ts_out, + upgrade_policy: self.upgrade_policy, } } } @@ -365,67 +403,79 @@ impl ClientBuilder { /// This function returns an error when its unable /// to connect and authenticate with the Live gateway. pub async fn build(self) -> crate::Result { - Client::connect(self.key, self.dataset, self.send_ts_out).await + Client::connect( + self.key, + self.dataset, + self.send_ts_out, + self.upgrade_policy, + ) + .await } } /// Manages the mapping between the instrument IDs included in each record and /// a text symbology. #[derive(Debug, Clone, Default)] +#[deprecated( + since = "0.5.0", + note = "dbn::PitSymbolMap provides identical functionality and also works with historical data" +)] pub struct SymbolMap { - symbol_map: HashMap, + inner: PitSymbolMap, } +#[allow(deprecated)] impl SymbolMap { /// Creates a new `SymbolMap` instance. pub fn new() -> Self { Self::default() } - /// Handles updating the mappings (if required) for a generic record. + /// Handles updating the mappings (if required) from a generic record. /// /// # Errors /// This function returns an error when `record` contains a [`SymbolMappingMsg`] but /// it contains invalid UTF-8. pub fn on_record(&mut self, record: RecordRef) -> crate::Result<()> { - if let Some(symbol_mapping) = record.get::() { - self.on_symbol_mapping(symbol_mapping) - } else { - Ok(()) - } + Ok(self.inner.on_record(record)?) } - /// Handles updating the mappings for a . + /// Handles updating the mappings from a [`SymbolMappingMsg`]. 
/// /// # Errors /// This function returns an error when `symbol_mapping` contains invalid UTF-8. pub fn on_symbol_mapping(&mut self, symbol_mapping: &SymbolMappingMsg) -> crate::Result<()> { - let stype_out_symbol = symbol_mapping.stype_out_symbol()?; - info!( - "Updated symbol mapping for {} to {}", - symbol_mapping.hd.instrument_id, stype_out_symbol - ); - self.symbol_map - .insert(symbol_mapping.hd.instrument_id, stype_out_symbol.to_owned()); - Ok(()) + Ok(self.inner.on_symbol_mapping(symbol_mapping)?) + } + + /// Handles updating the mappings from a [`SymbolMappingMsgV1`]. + /// + /// # Errors + /// This function returns an error when `symbol_mapping` contains invalid UTF-8. + pub fn on_symbol_mapping_v1( + &mut self, + symbol_mapping: &SymbolMappingMsgV1, + ) -> crate::Result<()> { + Ok(self.inner.on_symbol_mapping(symbol_mapping)?) } /// Returns a reference to the mapping for the given instrument ID. pub fn get(&self, instrument_id: u32) -> Option<&String> { - self.symbol_map.get(&instrument_id) + self.inner.get(instrument_id) } /// Returns a reference to the inner map. pub fn inner(&self) -> &HashMap { - &self.symbol_map + self.inner.inner() } /// Returns a mutable reference to the inner map. 
pub fn inner_mut(&mut self) -> &mut HashMap { - &mut self.symbol_map + self.inner.inner_mut() } } +#[allow(deprecated)] impl std::ops::Index for SymbolMap { type Output = String; @@ -436,6 +486,7 @@ impl std::ops::Index for SymbolMap { } #[cfg(test)] +#[allow(deprecated)] mod tests { use std::{ffi::c_char, fmt, time::Duration}; @@ -648,6 +699,7 @@ mod tests { "32-character-with-lots-of-filler".to_owned(), dataset.to_string(), send_ts_out, + VersionUpgradePolicy::AsIs, ) .await .unwrap(); @@ -746,7 +798,9 @@ mod tests { fixture.send_record(SymbolMappingMsg::new( 1, 2, + SType::RawSymbol, "", + SType::RawSymbol, "AAPL", UNDEF_TIMESTAMP, UNDEF_TIMESTAMP, @@ -754,7 +808,9 @@ mod tests { fixture.send_record(SymbolMappingMsg::new( 2, 2, + SType::RawSymbol, "", + SType::RawSymbol, "TSLA", UNDEF_TIMESTAMP, UNDEF_TIMESTAMP, @@ -762,7 +818,9 @@ mod tests { fixture.send_record(SymbolMappingMsg::new( 3, 2, + SType::RawSymbol, "", + SType::RawSymbol, "MSFT", UNDEF_TIMESTAMP, UNDEF_TIMESTAMP, @@ -781,7 +839,9 @@ mod tests { fixture.send_record(SymbolMappingMsg::new( 10, 2, + SType::RawSymbol, "", + SType::RawSymbol, "AAPL", UNDEF_TIMESTAMP, UNDEF_TIMESTAMP, @@ -798,7 +858,9 @@ mod tests { fixture.send_record(SymbolMappingMsg::new( 9, 2, + SType::RawSymbol, "", + SType::RawSymbol, "MSFT", UNDEF_TIMESTAMP, UNDEF_TIMESTAMP, @@ -830,6 +892,7 @@ mod tests { "32-character-with-lots-of-filler".to_owned(), DATASET.to_string(), false, + VersionUpgradePolicy::AsIs, ) .await; if let Err(e) = &res { @@ -850,7 +913,6 @@ mod tests { } #[tokio::test] - #[ignore = "waiting for new DBN release"] async fn test_cancellation_safety() { let (mut fixture, mut client) = setup(Dataset::GlbxMdp3, true).await; fixture.start(); @@ -866,7 +928,7 @@ mod tests { let mut int_4 = tokio::time::interval(Duration::from_millis(1)); let mut int_5 = tokio::time::interval(Duration::from_millis(1)); let mut int_6 = tokio::time::interval(Duration::from_millis(1)); - for _ in 0..1_000 { + for _ in 0..5_000 { select! 
{ _ = int_1.tick() => { fixture.send_record(Mbp10Msg::default());