Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VER: Release 0.5.0 #11

Merged
merged 10 commits into from
Nov 23, 2023
52 changes: 49 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,61 @@
# Changelog

## 0.5.0 - 2023-11-23

This release adds support for DBN v2.

DBN v2 delivers improvements to the `Metadata` header symbology, new `stype_in` and `stype_out`
fields for `SymbolMappingMsg`, and extends the symbol field length for `SymbolMappingMsg` and
`InstrumentDefMsg`. The entire change notes are available [here](https://github.com/databento/dbn/releases/tag/v0.14.0).
Users who wish to convert DBN v1 files to v2 can use the `dbn-cli` tool available in the [databento-dbn](https://github.com/databento/dbn/) crate.
On a future date, the Databento live and historical APIs will stop serving DBN v1.

This release is fully compatible with both DBN v1 and v2, and so should be seamless for most users.

#### Enhancements
- Made `LiveClient::next_record`, `dbn::decode::AsyncDbnDecoder::decode_record` and
`decode_record_ref`, and `dbn::decode::AsyncRecordDecoder::decode` and `decode_ref`
cancel safe. This makes them safe to use within a
[`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html) statement
- Improved error reporting for `HistoricalClient` when receiving an error from
Databento's API
- Improved error messages around API keys
- Improved performance of CSV and JSON encoding
- Added support for emitting warnings from historical API response headers, such as for
future deprecations
- Added `symbol_map` method to the `Resolution` struct returned by `symbology::resolve`
that returns a `TsSymbolMap`
- Added `PartialEq` and `Eq` implementations for parameter builder classes
- Added `upgrade_policy` setter to the `LiveClient` builder and a getter to the
`LiveClient`
- Added `upgrade_policy` optional setter to the `timeseries::GetRangeParams` builder

#### Breaking changes
- Upgraded `dbn` to 0.14.2. There are several breaking changes in this release as we
begin migrating to DBN encoding version 2 (DBNv2) in order to support the ICE
exchange:
- Renamed `dbn::InstrumentDefMsg` to `dbn::compat::InstrumentDefMsgV1` and added a
new `dbn::InstrumentDefMsg` with a longer `raw_symbol` field
- Renamed `dbn::SymbolMappingMsg` to `dbn::compat::SymbolMappingMsgV1` and added a
new `dbn::SymbolMappingMsg` with longer symbol fields and new `stype_in` and
`stype_out` fields
- Added `symbol_cstr_len` field to `dbn::Metadata`
- Made `Error` non-exhaustive, meaning it no longer be exhaustively matched against, and
new variants can be added in the future without a breaking change
- Added an `upgrade_policy` parameter to `LiveClient::connect` and `connect_with_addr`.
The builder provides a more stable API since new parameters are usually introduced as
optional

#### Deprecations
- Deprecated `live::SymbolMap` in favor of `databento::dbn::PitSymbolMap`

## 0.4.2 - 2023-10-23

#### Enhancemets
- Upgraded `dbn` to 0.13.0 for improvements to symbology helpers
- Upgraded `tokio` to 1.33
- Upgraded `typed-builder` to 0.17

##### Deprecations
- Deprecated `live::SymbolMap` in favor of `databento::dbn::PitSymbolMap`

#### Bug fixes
- Fixed panic in `LiveClient` when gateway returned an auth response without the
`success` key
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ live = ["dep:hex", "dep:sha2", "tokio/net"]

[dependencies]
# binary encoding
dbn = { version = "0.13.0", features = ["async", "serde"] }
dbn = { version = "0.14.2", features = ["async", "serde"] }
# Async stream trait
futures = { version = "0.3", optional = true }
# Hex encoding used for Live authentication
Expand Down
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# databento-rs

[![build](https://github.com/databento/databento-rs/actions/workflows/build.yaml/badge.svg)](https://github.com/databento/dbn/actions/workflows/build.yaml)
[![Documentation](https://img.shields.io/docsrs/databento)](https://docs.rs/databento/latest/databento/)
[![license](https://img.shields.io/github/license/databento/databento-rs?color=blue)](./LICENSE)
[![Current Crates.io Version](https://img.shields.io/crates/v/databento.svg)](https://crates.io/crates/databento)
[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://join.slack.com/t/databento-hq/shared_invite/zt-24oqyrub9-MellISM2cdpQ7s_7wcXosw)
Expand All @@ -27,8 +28,8 @@ Here is a simple program that fetches the next ES mini futures trade:
use std::error::Error;

use databento::{
dbn::{Dataset, SType, Schema, TradeMsg},
live::{Subscription, SymbolMap},
dbn::{Dataset, PitSymbolMap, SType, Schema, TradeMsg},
live::Subscription,
LiveClient,
};

Expand All @@ -51,13 +52,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
.unwrap();
client.start().await?;

let mut symbol_map = SymbolMap::new();
let mut symbol_map = PitSymbolMap::new();
// Get the next trade
loop {
let rec = client.next_record().await?.unwrap();
if let Some(trade) = rec.get::<TradeMsg>() {
let symbol = &symbol_map[trade.hd.instrument_id];
println!("Received trade for {symbol}: {trade:?}",);
let symbol = &symbol_map[trade];
println!("Received trade for {symbol}: {trade:?}");
break;
}
symbol_map.on_record(rec)?;
Expand All @@ -78,7 +79,7 @@ use databento::{
historical::timeseries::GetRangeParams,
HistoricalClient, Symbols,
};
use time::macros::datetime;
use time::macros::{date, datetime};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand All @@ -97,8 +98,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
.build(),
)
.await?;
let symbol_map = decoder
.metadata()
.symbol_map_for_date(date!(2022 - 06 - 10))?;
while let Some(trade) = decoder.decode_record::<TradeMsg>().await? {
println!("{trade:?}");
let symbol = &symbol_map[trade];
println!("Received trade for {symbol}: {trade:?}");
}
Ok(())
}
Expand Down
39 changes: 38 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use thiserror::Error;

/// An error that can occur while working with Databento's API.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// An invalid argument was passed to a function.
#[error("bad argument {param_name}: {desc}")]
#[error("bad argument `{param_name}`: {desc}")]
BadArgument {
/// The name of the parameter to which the bad argument was passed.
param_name: String,
Expand All @@ -19,6 +20,10 @@ pub enum Error {
#[cfg(feature = "historical")]
#[error("HTTP error: {0:?}")]
Http(#[from] reqwest::Error),
/// An error from the Databento API.
#[cfg(feature = "historical")]
#[error("API error: {0}")]
Api(ApiError),
/// An error internal to the client.
#[error("internal error: {0}")]
Internal(String),
Expand All @@ -32,6 +37,20 @@ pub enum Error {
/// An alias for a `Result` with [`databento::Error`](crate::Error) as the error type.
pub type Result<T> = std::result::Result<T, Error>;

/// An error from the Databento API.
#[cfg(feature = "historical")]
#[derive(Debug)]
pub struct ApiError {
/// The request ID.
pub request_id: Option<String>,
/// The HTTP status code of the response.
pub status_code: reqwest::StatusCode,
/// The message from the Databento API.
pub message: String,
/// The link to documentation related to the error.
pub docs_url: Option<String>,
}

impl Error {
pub(crate) fn bad_arg(param_name: impl ToString, desc: impl ToString) -> Self {
Self::BadArgument {
Expand All @@ -54,3 +73,21 @@ impl From<dbn::Error> for Error {
}
}
}

#[cfg(feature = "historical")]
impl std::fmt::Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let doc = if let Some(ref docs_url) = self.docs_url {
format!(" See {docs_url} for documentation.")
} else {
String::new()
};
let status = self.status_code;
let msg = &self.message;
if let Some(ref request_id) = self.request_id {
write!(f, "{request_id} failed with {status} {msg}{doc}")
} else {
write!(f, "{status} {msg}{doc}")
}
}
}
37 changes: 17 additions & 20 deletions src/historical/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use time::OffsetDateTime;
use tokio::io::BufWriter;
use typed_builder::TypedBuilder;

use crate::{Error, Symbols};
use crate::{historical::check_http_error, Error, Symbols};

use super::DateTimeRange;
use super::{handle_response, DateTimeRange};

/// A client for the batch group of Historical API endpoints.
pub struct BatchClient<'a> {
Expand Down Expand Up @@ -63,7 +63,8 @@ impl BatchClient<'_> {
form.push(("limit", limit.to_string()));
}
let builder = self.post("submit_job")?.form(&form);
Ok(builder.send().await?.error_for_status()?.json().await?)
let resp = builder.send().await?;
handle_response(resp).await
}

/// Lists previous batch jobs with filtering by `params`.
Expand All @@ -87,7 +88,8 @@ impl BatchClient<'_> {
if let Some(ref since) = params.since {
builder = builder.query(&[("since", &since.unix_timestamp_nanos().to_string())]);
}
Ok(builder.send().await?.error_for_status()?.json().await?)
let resp = builder.send().await?;
handle_response(resp).await
}

/// Lists all files associated with the batch job with ID `job_id`.
Expand All @@ -96,14 +98,12 @@ impl BatchClient<'_> {
/// This function returns an error when it fails to communicate with the Databento API
/// or the API indicates there's an issue with the request.
pub async fn list_files(&mut self, job_id: &str) -> crate::Result<Vec<BatchFileDesc>> {
Ok(self
let resp = self
.get("list_files")?
.query(&[("job_id", job_id)])
.send()
.await?
.error_for_status()?
.json()
.await?)
.await?;
handle_response(resp).await
}

/// Downloads the file specified in `params` or all files associated with the job ID.
Expand Down Expand Up @@ -163,13 +163,8 @@ impl BatchClient<'_> {
async fn download_file(&mut self, url: &str, path: impl AsRef<Path>) -> crate::Result<()> {
let url = reqwest::Url::parse(url)
.map_err(|e| Error::internal(format!("Unable to parse URL: {e:?}")))?;
let mut stream = self
.inner
.get_with_path(url.path())?
.send()
.await?
.error_for_status()?
.bytes_stream();
let resp = self.inner.get_with_path(url.path())?.send().await?;
let mut stream = check_http_error(resp).await?.bytes_stream();
info!("Saving {url} to {}", path.as_ref().display());
let mut output = BufWriter::new(
tokio::fs::OpenOptions::new()
Expand All @@ -185,7 +180,7 @@ impl BatchClient<'_> {
Ok(())
}

const PATH_PREFIX: &str = "batch";
const PATH_PREFIX: &'static str = "batch";

fn get(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
self.inner.get(&format!("{}.{slug}", Self::PATH_PREFIX))
Expand Down Expand Up @@ -246,7 +241,7 @@ pub enum JobState {

/// The parameters for [`BatchClient::submit_job()`]. Use [`SubmitJobParams::builder()`] to
/// get a builder type with all the preset defaults.
#[derive(Debug, Clone, TypedBuilder)]
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct SubmitJobParams {
/// The dataset code.
#[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
Expand Down Expand Up @@ -389,7 +384,7 @@ pub struct BatchJob {

/// The parameters for [`BatchClient::list_jobs()`]. Use [`ListJobsParams::builder()`] to
/// get a builder type with all the preset defaults.
#[derive(Debug, Clone, Default, TypedBuilder)]
#[derive(Debug, Clone, Default, TypedBuilder, PartialEq, Eq)]
pub struct ListJobsParams {
/// The optional filter for job states.
#[builder(default, setter(strip_option))]
Expand All @@ -415,11 +410,13 @@ pub struct BatchFileDesc {

/// The parameters for [`BatchClient::download()`]. Use [`DownloadParams::builder()`] to
/// get a builder type with all the preset defaults.
#[derive(Debug, Clone, TypedBuilder)]
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct DownloadParams {
/// The directory to download the file(s) to.
#[builder(setter(transform = |dt: impl Into<PathBuf>| dt.into()))]
pub output_dir: PathBuf,
/// The batch job identifier.
#[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
pub job_id: String,
/// `None` means all files associated with the job will be downloaded.
#[builder(default, setter(strip_option))]
Expand Down
72 changes: 71 additions & 1 deletion src/historical/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use log::warn;
use reqwest::{header::ACCEPT, IntoUrl, RequestBuilder, Url};
use serde::Deserialize;

use crate::Error;
use crate::{error::ApiError, Error};

use super::{
batch::BatchClient, metadata::MetadataClient, symbology::SymbologyClient,
Expand All @@ -25,7 +27,22 @@ pub struct Client {
client: reqwest::Client,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub(crate) enum ApiErrorResponse {
Simple { detail: String },
Business { detail: BusinessErrorDetails },
}

#[derive(Debug, Deserialize)]
pub(crate) struct BusinessErrorDetails {
message: String,
docs: String,
}

const USER_AGENT: &str = concat!("Databento/", env!("CARGO_PKG_VERSION"), " Rust");
const WARNING_HEADER: &str = "X-Warning";
const REQUEST_ID_HEADER: &str = "request-id";

impl Client {
/// Returns a type-safe builder for setting the required parameters
Expand Down Expand Up @@ -133,6 +150,59 @@ impl Client {
}
}

pub(crate) async fn check_http_error(
response: reqwest::Response,
) -> crate::Result<reqwest::Response> {
if response.status().is_success() {
Ok(response)
} else {
let request_id = response
.headers()
.get(REQUEST_ID_HEADER)
.and_then(|header| header.to_str().ok().map(ToOwned::to_owned));
let status_code = response.status();
Err(Error::Api(
match response.json::<ApiErrorResponse>().await? {
ApiErrorResponse::Simple { detail: message } => ApiError {
request_id,
status_code,
message,
docs_url: None,
},
ApiErrorResponse::Business { detail } => ApiError {
request_id,
status_code,
message: detail.message,
docs_url: Some(detail.docs),
},
},
))
}
}

pub(crate) async fn handle_response<R: serde::de::DeserializeOwned>(
response: reqwest::Response,
) -> crate::Result<R> {
check_warnings(&response);
let response = check_http_error(response).await?;
Ok(response.json::<R>().await?)
}

fn check_warnings(response: &reqwest::Response) {
if let Some(header) = response.headers().get(WARNING_HEADER) {
match serde_json::from_slice::<Vec<String>>(header.as_bytes()) {
Ok(warnings) => {
for warning in warnings {
warn!("{warning}");
}
}
Err(err) => {
warn!("Failed to parse server warnings from HTTP header: {err:?}");
}
};
};
}

#[doc(hidden)]
pub struct Unset;

Expand Down
Loading