diff --git a/Cargo.toml b/Cargo.toml
index 1d20018..41ddf87 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,8 +4,9 @@ members = [
     "hftbacktest",
     "hftbacktest-derive",
     "py-hftbacktest",
-    "collector"
-, "connector"]
+    "collector",
+    "connector",
+    "contrib/converter"]
 
 [profile.dev]
 opt-level = 0
diff --git a/contrib/converter/Cargo.toml b/contrib/converter/Cargo.toml
new file mode 100644
index 0000000..15342a9
--- /dev/null
+++ b/contrib/converter/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "converter"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+zip = { version = "2.1.3" }
+flate2 = { version = "1.0.34" }
+hftbacktest = { path = "../../hftbacktest" }
+hftbacktest-derive = { path = "../../hftbacktest-derive" }
+serde = { version = "1.0.210", features = ["derive"] }
+serde_json = "1.0.128"
+clap = { version = "4.5.17", features = ["derive"] }
diff --git a/contrib/converter/src/bybit.rs b/contrib/converter/src/bybit.rs
new file mode 100644
index 0000000..8ef3f37
--- /dev/null
+++ b/contrib/converter/src/bybit.rs
@@ -0,0 +1,177 @@
+use crate::converter::ConverterBase;
+
+use hftbacktest::types::Event;
+use hftbacktest::types::{
+    BUY_EVENT, DEPTH_CLEAR_EVENT, DEPTH_EVENT, DEPTH_SNAPSHOT_EVENT, SELL_EVENT, TRADE_EVENT,
+};
+use serde_json::Value;
+
+use serde::de::Error;
+use serde::{Deserialize, Deserializer};
+
+// Everything we do below is based on the Bybit API, which is covered in detail
+// at https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook.
+
+// This is almost a carbon copy of the structs used in the Bybit connector. In an
+// ideal world they would be moved into the main Rust lib and then reused here.
+
+fn from_str_to_f64<'de, D: Deserializer<'de>>(deserializer: D) -> Result<f64, D::Error> {
+    Ok(match Value::deserialize(deserializer)? {
+        Value::String(s) => s.parse().map_err(Error::custom)?,
+        Value::Number(num) => num
+            .as_f64()
+            .ok_or_else(|| Error::custom("Invalid number"))?,
+        _ => return Err(Error::custom("wrong type")),
+    })
+}
+
+#[derive(Deserialize, Debug)]
+pub struct PublicStream {
+    pub topic: String,
+    #[serde(rename = "type")]
+    pub event_type: String,
+    pub data: Value,
+    pub cts: Option<i64>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct OrderBook {
+    #[serde(rename = "b")]
+    pub bids: Vec<(String, String)>,
+    #[serde(rename = "a")]
+    pub asks: Vec<(String, String)>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Trade {
+    #[serde(rename = "T")]
+    pub ts: i64,
+    #[serde(rename = "S")]
+    pub side: String,
+    #[serde(rename = "v")]
+    #[serde(deserialize_with = "from_str_to_f64")]
+    pub trade_size: f64,
+    #[serde(rename = "p")]
+    #[serde(deserialize_with = "from_str_to_f64")]
+    pub trade_price: f64,
+}
+
+pub fn bybit_process(
+    base: &mut ConverterBase,
+    local_ts: i64,
+    payload: &str,
+) -> Result<Vec<Event>, Box<dyn std::error::Error>> {
+    let mut result: Vec<Event> = Vec::new();
+
+    let stream = serde_json::from_str::<PublicStream>(payload)?;
+
+    if stream.topic.starts_with("publicTrade") {
+        let trades: Vec<Trade> = serde_json::from_value(stream.data)?;
+        for trade in trades {
+            // Adjust the exchange timestamp if necessary and detect negative latency.
+            let mut exch_ts = base.convert_ts(trade.ts);
+            let latency = local_ts - exch_ts;
+            exch_ts = local_ts - base.latency(latency);
+
+            let event_type = match &*trade.side {
+                "Sell" => TRADE_EVENT | SELL_EVENT,
+                "Buy" => TRADE_EVENT | BUY_EVENT,
+                _ => TRADE_EVENT | SELL_EVENT, // Unknown side; assume an mm trade and record it as a sell.
+ }; + + result.push(Event { + ev: event_type, + exch_ts, + local_ts, + order_id: 0, + px: trade.trade_price, + qty: trade.trade_size, + ival: 0, + fval: 0.0, + }) + } + } else if stream.topic.starts_with("orderbook") { + // adjust if necessary and detect negative latency.. + let mut exch_ts = base.convert_ts(stream.cts.ok_or("Missing CTS on order book event")?); + let latency = local_ts - exch_ts; + exch_ts = local_ts + base.latency(latency); + + let order_book: OrderBook = serde_json::from_value(stream.data)?; + + if order_book.asks.len() > 0 { + let (last_ask_px_str, _) = order_book.asks.last().unwrap(); + + let ev = match &*stream.event_type { + "snapshot" => DEPTH_SNAPSHOT_EVENT | SELL_EVENT, + _ => DEPTH_EVENT | SELL_EVENT, + }; + + // Clear the books if this is a snapshot.. + if stream.event_type == "snapshot" { + result.push(Event { + ev: DEPTH_CLEAR_EVENT | SELL_EVENT, + exch_ts, + local_ts, + order_id: 0, + px: last_ask_px_str.parse::()?, + qty: 0.0, + ival: 0, + fval: 0.0, + }) + } + + // Insert entries.. + for (ask_px, ask_qty) in order_book.asks { + result.push(Event { + ev, + exch_ts, + local_ts, + order_id: 0, + px: ask_px.parse::()?, + qty: ask_qty.parse::()?, + ival: 0, + fval: 0.0, + }) + } + } + + if order_book.bids.len() > 0 { + let (last_bid_px_str, _) = order_book.bids.last().unwrap(); + + let ev = match &*stream.event_type { + "snapshot" => DEPTH_SNAPSHOT_EVENT | BUY_EVENT, + _ => DEPTH_EVENT | BUY_EVENT, + }; + + // Clear the books if this is a snapshot.. + if stream.event_type == "snapshot" { + result.push(Event { + ev: DEPTH_CLEAR_EVENT | BUY_EVENT, + exch_ts, + local_ts, + order_id: 0, + px: last_bid_px_str.parse::()?, + qty: 0.0, + ival: 0, + fval: 0.0, + }) + } + + // Insert entries.. + for (bid_px, bid_qty) in order_book.bids { + result.push(Event { + ev, + exch_ts, + local_ts, + order_id: 0, + px: bid_px.parse::()?, + qty: bid_qty.parse::()?, + ival: 0, + fval: 0.0, + }) + } + } + } + + Ok(result) +} diff --git a/contrib/converter/src/converter.rs b/contrib/converter/src/converter.rs new file mode 100644 index 0000000..0b5988d --- /dev/null +++ b/contrib/converter/src/converter.rs @@ -0,0 +1,98 @@ +use flate2::read::GzDecoder; +use hftbacktest::types::Event; +use std::fs::File; +use std::io::{BufRead, BufReader, BufWriter}; + +use hftbacktest::backtest::data::write_npy_data; + +use crate::bybit::bybit_process; + +pub struct ConverterBase { + base_latency: i64, + min_latency: i64, + time_mul: i64, +} + +impl ConverterBase { + pub fn new(base_latency: i64) -> Self { + Self { + base_latency, + min_latency: i64::MAX, + time_mul: 1_000_000, // convert to nanos. 
+ } + } + + pub fn convert_ts(&self, ts: i64) -> i64 { + ts * self.time_mul + } + + pub fn latency(&mut self, latency: i64) -> i64 { + if latency > 0 && latency < self.min_latency { + self.min_latency = latency; + } + + if latency < 0 { + return self.min_latency + self.base_latency; + } + + latency + } +} + +#[allow(non_camel_case_types)] +pub enum Converter { + bybit(ConverterBase), +} + +pub trait IConverter { + fn process( + &mut self, + local_ts: i64, + payload: &str, + ) -> Result, Box>; +} + +impl IConverter for Converter { + fn process( + &mut self, + local_ts: i64, + payload: &str, + ) -> Result, Box> { + match self { + Converter::bybit(base) => bybit_process(base, local_ts, payload), + } + } +} + +impl Converter { + pub fn new(exchange: &str, base_latency: i64) -> Self { + match exchange { + "bybit" => Converter::bybit(ConverterBase::new(base_latency)), + _ => panic!("Unknown exchange"), + } + } + + pub fn process_file( + &mut self, + input: BufReader>, + output: &mut BufWriter, + ) -> Result> { + let mut counter: usize = 0; + for line in input.lines() { + let line = line?; + + // Split the line into timestamp and JSON part + if let Some((timestamp_str, json_str)) = line.split_once(' ') { + // Parse the timestamp + let timestamp: i64 = timestamp_str.parse()?; + let events = self.process(timestamp, json_str)?; + counter += events.len(); + write_npy_data(output, events.as_slice())?; + } else { + eprintln!("Error: line format incorrect: {}", line); + } + } + + Ok(counter) + } +} diff --git a/contrib/converter/src/main.rs b/contrib/converter/src/main.rs new file mode 100644 index 0000000..da70e5f --- /dev/null +++ b/contrib/converter/src/main.rs @@ -0,0 +1,80 @@ +use flate2::read::GzDecoder; +use hftbacktest::backtest::data::write_npy_header; +use hftbacktest::types::Event; +use std::fs::{remove_file, File}; +use std::io::{copy, BufReader, BufWriter, Seek, SeekFrom, Write}; +use zip::write::SimpleFileOptions; +use zip::ZipWriter; + +use clap::Parser; + +mod bybit; +mod converter; + +use converter::Converter; + +#[derive(Parser, Debug, Clone)] +#[command(version, about, long_about = None)] +struct Args { + #[arg(long)] + exchange: String, + + #[arg(long)] + input: String, + + #[arg(long, default_value = "test.npz")] + output: String, + + #[arg(long, default_value_t = 5_000_000)] + base_latency: i64, + + #[arg(long, default_value = "/tmp/")] + temp_dir: String, +} + +fn main() -> Result<(), Box> { + let args = Args::parse(); + + let mut temp_file = args.temp_dir; + temp_file.push_str("temp.npy"); + let file = File::create(&temp_file)?; + let mut writer = BufWriter::new(file); + + // This ordering may not be optimal for speed! + let input = File::open(args.input.clone())?; + let decoder = GzDecoder::new(input); + let reader = BufReader::new(decoder); + + let mut converter = Converter::new(&*args.exchange, args.base_latency); + + write_npy_header::, Event>(&mut writer, 0)?; + + // Actually do the work.. 
+    println!("Converting {} to {}", args.input, &temp_file);
+    let counter = converter.process_file(reader, &mut writer)?;
+    println!("Created {} events", counter);
+
+    writer.seek(SeekFrom::Start(0))?;
+    write_npy_header::<BufWriter<File>, Event>(&mut writer, counter)?;
+    writer.flush()?;
+
+    let output = File::create(&args.output)?;
+    let zip_writer = BufWriter::new(output);
+    let mut zip = ZipWriter::new(zip_writer);
+
+    let options = SimpleFileOptions::default()
+        .compression_method(zip::CompressionMethod::DEFLATE)
+        .compression_level(Some(9));
+
+    zip.start_file("data.npy", options)?;
+
+    println!("Compressing {} to {}", &temp_file, &args.output);
+    let mut temp_read = BufReader::new(File::open(&temp_file)?);
+    copy(&mut temp_read, &mut zip)?;
+    zip.finish()?;
+
+    println!("Removing {}", &temp_file);
+    remove_file(&temp_file)?;
+
+    Ok(())
+}
diff --git a/hftbacktest/src/backtest/data/mod.rs b/hftbacktest/src/backtest/data/mod.rs
index 311cd1e..ca1b7a2 100644
--- a/hftbacktest/src/backtest/data/mod.rs
+++ b/hftbacktest/src/backtest/data/mod.rs
@@ -10,7 +10,10 @@ use std::{
     slice::SliceIndex,
 };
 
-pub use npy::{read_npy_file, read_npz_file, write_npy, Field, NpyDTyped, NpyHeader};
+pub use npy::{
+    read_npy_file, read_npz_file, write_npy, write_npy_data, write_npy_header, Field, NpyDTyped,
+    NpyHeader,
+};
 pub use reader::{Cache, DataPreprocess, DataSource, FeedLatencyAdjustment, Reader, ReaderBuilder};
 
 use crate::utils::{AlignedArray, CACHE_LINE_SIZE};
diff --git a/hftbacktest/src/backtest/data/npy/mod.rs b/hftbacktest/src/backtest/data/npy/mod.rs
index 41d35c1..6e0882a 100644
--- a/hftbacktest/src/backtest/data/npy/mod.rs
+++ b/hftbacktest/src/backtest/data/npy/mod.rs
@@ -248,6 +248,28 @@ pub fn read_npz_file<D: NpyDTyped + Clone>(filepath: &str, name: &str) -> std::i
     read_npy(&mut file, size)
 }
 
+pub fn write_npy_header<W: Write, T: NpyDTyped>(write: &mut W, len: usize) -> std::io::Result<()> {
+    let descr = T::descr();
+    let header = NpyHeader {
+        descr,
+        fortran_order: false,
+        shape: vec![len],
+    };
+
+    write.write_all(b"\x93NUMPY\x01\x00")?;
+    let header_str = header.to_string_padding();
+    let len = header_str.len() as u16;
+    write.write_all(&len.to_le_bytes())?;
+    write.write_all(header_str.as_bytes())?;
+
+    Ok(())
+}
+
+pub fn write_npy_data<W: Write, T: NpyDTyped>(write: &mut W, data: &[T]) -> std::io::Result<()> {
+    write.write_all(vec_as_bytes(data))?;
+    Ok(())
+}
+
 pub fn write_npy<W: Write, T: NpyDTyped>(write: &mut W, data: &[T]) -> std::io::Result<()> {
     let descr = T::descr();
     let header = NpyHeader {
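The converter expects gzipped text input in which each line is `<local_ts> <raw JSON payload>`, where the leading timestamp is the local receipt time in nanoseconds (Bybit's millisecond timestamps are multiplied by 1,000,000 before the two are compared). A quick way to sanity-check the generated archive is to load it back through hftbacktest's existing npz reader. The snippet below is a minimal sketch rather than part of the patch: it assumes the converter ran with the default `--output test.npz`, and that `read_npz_file` resolves the member name `data` to the `data.npy` entry written by `main.rs`.

```rust
use hftbacktest::backtest::data::read_npz_file;
use hftbacktest::types::Event;

fn main() -> std::io::Result<()> {
    // "data" resolves to the "data.npy" member created by the converter.
    let events = read_npz_file::<Event>("test.npz", "data")?;
    println!("loaded {} events", events.len());

    // Spot-check the invariant established by the latency adjustment: an
    // event must be observed locally at or after its exchange timestamp.
    for i in 0..events.len().min(5) {
        let ev = &events[i];
        assert!(ev.local_ts >= ev.exch_ts);
        println!(
            "exch_ts={} local_ts={} px={} qty={}",
            ev.exch_ts, ev.local_ts, ev.px, ev.qty
        );
    }

    Ok(())
}
```

Checking `local_ts >= exch_ts` exercises `ConverterBase::latency`, which is the main invariant the converter is meant to guarantee for the backtest feed.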