VER: Release 0.16.0
threecgreen authored Nov 12, 2024
2 parents fb9a261 + aab255d commit e9bdb35
Showing 4 changed files with 122 additions and 5 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,19 @@
# Changelog

-## 0.15.0 - TBD
+## 0.16.0 - TBD
+
+#### Enhancements
+- Upgraded DBN version to 0.23.1:
+  - Added floating-point getters for price fields
+  - Added new IntelligentCross venues `ASPN`, `ASMT`, and `ASPI`
+- Upgraded `thiserror` version to 2.0
+
+#### Deprecations
+- Deprecated `Packaging` enum and `packaging` field on `SubmitJobParams` and `BatchJob`.
+  These will be removed in a future version. All files from a batch job can be downloaded
+  with the `batch().download()` method on the historical client
+
+## 0.15.0 - 2024-10-22

#### Enhancements
- Upgraded DBN version to 0.23.0:
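For context on the deprecation above: the changelog points to `batch().download()` as the replacement for `Packaging`. A minimal sketch of that flow follows; the `DownloadParams` builder fields (`job_id`, `output_dir`), the return type, and the placeholder job ID are assumptions about the historical batch API, not part of this diff.

use std::path::PathBuf;

use databento::{historical::batch::DownloadParams, HistoricalClient};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = HistoricalClient::builder().key_from_env()?.build()?;
    // Download every file from the job to a local directory; no archive
    // packaging is needed. The job ID here is a hypothetical placeholder;
    // a real one comes from a previously submitted batch job.
    let files = client
        .batch()
        .download(
            &DownloadParams::builder()
                .output_dir(PathBuf::from("my_data"))
                .job_id("GLBX-20241112-ABCDEFGHIJ".to_owned())
                .build(),
        )
        .await?;
    println!("Downloaded {} files", files.len());
    Ok(())
}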
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "databento"
authors = ["Databento <[email protected]>"]
version = "0.15.0"
version = "0.16.0"
edition = "2021"
repository = "https://github.com/databento/databento-rs"
description = "Official Databento client library"
@@ -23,7 +23,7 @@ historical = ["dep:futures", "dep:reqwest", "dep:serde", "dep:tokio-util", "dep:
live = ["dep:hex", "dep:sha2", "tokio/net"]

[dependencies]
dbn = { version = "0.23.0", features = ["async", "serde"] }
dbn = { version = "0.23.1", features = ["async", "serde"] }
# Async stream trait
futures = { version = "0.3", optional = true }
# Used for Live authentication
@@ -43,6 +43,7 @@ typed-builder = "0.20"

[dev-dependencies]
anyhow = "1.0.91"
+async-compression = { version = "0.4.13", features = ["tokio", "zstd"] }
clap = { version = "4.5.20", features = ["derive"] }
tempfile = "3.13.0"
tokio = { version = "1.41", features = ["full"] }
91 changes: 91 additions & 0 deletions examples/split_symbols.rs
@@ -0,0 +1,91 @@
//! An example program that splits a DBN file into several DBN files
//! by parent symbol (from the `asset` field in the definitions schema).
//!
//! Usage: split_symbols FILE_PATH OUTPUT_PATTERN
use std::collections::HashMap;

use anyhow::Context;
use async_compression::tokio::write::ZstdEncoder;
use databento::{
    dbn::{
        decode::AsyncDbnDecoder, encode::AsyncDbnEncoder, InstrumentDefMsg, Metadata, Schema,
        SymbolIndex,
    },
    historical::timeseries::GetRangeParams,
    HistoricalClient,
};
use tokio::fs::File;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    if std::env::args().len() != 3 {
        anyhow::bail!(
            "Invalid number of arguments, expected: split_symbols FILE_PATH OUTPUT_PATTERN"
        );
    }
    let file_path = std::env::args().nth(1).unwrap();
    let output_pattern = std::env::args().nth(2).unwrap();
    if !output_pattern.contains("{parent}") {
        anyhow::bail!("OUTPUT_PATTERN should contain {{parent}}");
    }
    let mut decoder = AsyncDbnDecoder::from_zstd_file(file_path).await?;

    let metadata = decoder.metadata().clone();
    let symbol_map = metadata.symbol_map()?;
    let symbols_to_parent = fetch_symbols_to_parent(&metadata).await?;
    // One Zstd-compressed DBN encoder per parent symbol, created lazily
    let mut encoders = HashMap::<String, AsyncDbnEncoder<ZstdEncoder<File>>>::new();
    while let Some(rec) = decoder.decode_record_ref().await? {
        let Some(symbol) = symbol_map.get_for_rec(&rec) else {
            eprintln!("Missing mapping for {rec:?}");
            continue;
        };
        let Some(parent) = symbols_to_parent.get(symbol) else {
            eprintln!("Couldn't find parent mapping for {symbol} with {rec:?}");
            continue;
        };
        if let Some(encoder) = encoders.get_mut(parent) {
            encoder.encode_record_ref(rec).await?;
        } else {
            let mut encoder = AsyncDbnEncoder::with_zstd(
                File::create_new(output_pattern.replace("{parent}", parent))
                    .await
                    .with_context(|| format!("creating file for {parent}"))?,
                &metadata,
            )
            .await?;
            encoder.encode_record_ref(rec).await?;
            encoders.insert(parent.clone(), encoder);
        };
    }
    // Flush and finalize each compressed stream before exiting
    for (parent, encoder) in encoders {
        if let Err(e) = encoder.shutdown().await {
            eprintln!("Failed to shutdown encoder for {parent}: {e:?}");
        }
    }

    Ok(())
}

async fn fetch_symbols_to_parent(metadata: &Metadata) -> anyhow::Result<HashMap<String, String>> {
    let mut client = HistoricalClient::builder().key_from_env()?.build()?;
    let end = metadata.end().ok_or_else(|| {
        anyhow::format_err!("Missing end in metadata. This script is intended for historical data")
    })?;
    let mut res = HashMap::new();
    // 2000 is the maximum number of symbols per request
    for chunk in metadata.symbols.chunks(2000) {
        let mut decoder = client
            .timeseries()
            .get_range(
                &GetRangeParams::builder()
                    .dataset(metadata.dataset.clone())
                    .schema(Schema::Definition)
                    .date_time_range((metadata.start(), end))
                    .symbols(Vec::from(chunk))
                    .build(),
            )
            .await?;
        // Map each raw symbol to its parent symbol from the `asset` field
        while let Some(def) = decoder.decode_record::<InstrumentDefMsg>().await? {
            res.insert(def.raw_symbol()?.to_owned(), def.asset()?.to_owned());
        }
    }
    Ok(res)
}
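A hypothetical invocation, assuming the example is run through Cargo against a Zstd-compressed input (as `from_zstd_file` requires): `cargo run --example split_symbols -- trades.dbn.zst "{parent}.trades.dbn.zst"`, which writes one compressed DBN file per parent symbol.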
16 changes: 14 additions & 2 deletions src/historical/batch.rs
@@ -1,5 +1,7 @@
//! The historical batch download API.
+#![allow(deprecated)] // Packaging
+
use core::fmt;
use std::{
    collections::HashMap,
@@ -209,6 +211,10 @@ pub enum SplitDuration {

/// How the batch job will be packaged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+#[deprecated(
+    since = "0.16.0",
+    note = "Use the `download()` method to download the whole job"
+)]
pub enum Packaging {
    /// ZIP compressed.
    Zip,
@@ -291,6 +297,10 @@ pub struct SubmitJobParams {
    pub split_size: Option<NonZeroU64>,
    /// The optional archive type to package all batched data files in. Defaults to `None`.
    #[builder(default, setter(strip_option))]
+    #[deprecated(
+        since = "0.16.0",
+        note = "Use the `download()` method to download the whole job"
+    )]
    pub packaging: Option<Packaging>,
    /// The delivery mechanism for the batched data files once processed. Defaults to
    /// [`Download`](Delivery::Download).
@@ -357,6 +367,10 @@ pub struct BatchJob {
    /// The maximum size for an individual file before splitting into multiple files.
    pub split_size: Option<NonZeroU64>,
    /// The packaging method of the batch data.
+    #[deprecated(
+        since = "0.16.0",
+        note = "Use the `download()` method to download the whole job"
+    )]
    pub packaging: Option<Packaging>,
    /// The delivery mechanism of the batch data.
    pub delivery: Delivery,
@@ -476,7 +490,6 @@ impl Packaging {
    pub const fn as_str(&self) -> &'static str {
        match self {
            Packaging::Zip => "zip",
-            #[allow(deprecated)]
            Packaging::Tar => "tar",
        }
    }
@@ -494,7 +507,6 @@ impl FromStr for Packaging {
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "zip" => Ok(Packaging::Zip),
-            #[allow(deprecated)]
            "tar" => Ok(Packaging::Tar),
            _ => Err(crate::Error::bad_arg(
                "s",
