Converter #151

Open

wants to merge 5 commits into base: master
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -4,8 +4,9 @@ members = [
"hftbacktest",
"hftbacktest-derive",
"py-hftbacktest",
"collector"
, "connector"]
"collector",
"connector",
"contrib/converter"]

[profile.dev]
opt-level = 0
13 changes: 13 additions & 0 deletions contrib/converter/Cargo.toml
@@ -0,0 +1,13 @@
[package]
name = "converter"
version = "0.1.0"
edition = "2021"

[dependencies]
zip = { version = "2.1.3" }
flate2 = { version = "1.0.34" }
hftbacktest = { path = "../../hftbacktest" }
hftbacktest-derive = { path = "../../hftbacktest-derive" }
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
clap = { version = "4.5.17", features = ["derive"] }
177 changes: 177 additions & 0 deletions contrib/converter/src/bybit.rs
@@ -0,0 +1,177 @@
use crate::converter::ConverterBase;

use hftbacktest::types::Event;
use hftbacktest::types::{
BUY_EVENT, DEPTH_CLEAR_EVENT, DEPTH_EVENT, DEPTH_SNAPSHOT_EVENT, SELL_EVENT, TRADE_EVENT,
};
use serde_json::Value;

use serde::de::Error;
use serde::{Deserialize, Deserializer};

// Everything below is based on the Bybit v5 WebSocket API, which is covered in detail
// at https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook.

// These structs are almost a carbon copy of the ones used in the Bybit connector. In an
// ideal world they would be moved into the main Rust lib and then reused here.
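// For reference, the raw messages this module parses look roughly like the following
// (shapes taken from the Bybit v5 public WebSocket docs; fields not used here are omitted):
//
//   orderbook:   {"topic":"orderbook.50.BTCUSDT","type":"snapshot","cts":1672304484976,
//                 "data":{"b":[["16493.50","0.006"]],"a":[["16611.00","0.029"]]}}
//   publicTrade: {"topic":"publicTrade.BTCUSDT","type":"snapshot",
//                 "data":[{"T":1672304486865,"S":"Buy","v":"0.001","p":"16578.50"}]}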

fn from_str_to_f64<'de, D: Deserializer<'de>>(deserializer: D) -> Result<f64, D::Error> {
Ok(match Value::deserialize(deserializer)? {
Value::String(s) => s.parse().map_err(Error::custom)?,
Value::Number(num) => num
.as_f64()
.ok_or_else(|| Error::custom("Invalid number"))?,
_ => return Err(Error::custom("wrong type")),
})
}

#[derive(Deserialize, Debug)]
pub struct PublicStream {
pub topic: String,
#[serde(rename = "type")]
pub event_type: String,
pub data: Value,
pub cts: Option<i64>,
}

#[derive(Deserialize, Debug)]
pub struct OrderBook {
#[serde(rename = "b")]
pub bids: Vec<(String, String)>,
#[serde(rename = "a")]
pub asks: Vec<(String, String)>,
}

#[derive(Deserialize, Debug)]
pub struct Trade {
#[serde(rename = "T")]
pub ts: i64,
#[serde(rename = "S")]
pub side: String,
#[serde(rename = "v")]
#[serde(deserialize_with = "from_str_to_f64")]
pub trade_size: f64,
#[serde(rename = "p")]
#[serde(deserialize_with = "from_str_to_f64")]
pub trade_price: f64,
}

pub fn bybit_process(
base: &mut ConverterBase,
local_ts: i64,
payload: &str,
) -> Result<Vec<Event>, Box<dyn std::error::Error>> {
let mut result: Vec<Event> = Vec::new();

let stream = serde_json::from_str::<PublicStream>(payload)?;

if stream.topic.starts_with("publicTrade") {
let trades: Vec<Trade> = serde_json::from_value(stream.data)?;
for trade in trades {
// Convert the exchange timestamp to nanoseconds and correct it if a negative feed
// latency is detected.
let mut exch_ts = base.convert_ts(trade.ts);
let latency = local_ts - exch_ts;
exch_ts = local_ts - base.latency(latency);

let event_type = match &*trade.side {
"Sell" => TRADE_EVENT | SELL_EVENT,
"Buy" => TRADE_EVENT | BUY_EVENT,
_ => TRADE_EVENT | SELL_EVENT, // Unknown side: assume a market-maker (sell) trade.
};

result.push(Event {
ev: event_type,
exch_ts,
local_ts,
order_id: 0,
px: trade.trade_price,
qty: trade.trade_size,
ival: 0,
fval: 0.0,
})
}
} else if stream.topic.starts_with("orderbook") {
// Convert the exchange timestamp to nanoseconds and correct it if a negative feed
// latency is detected.
let mut exch_ts = base.convert_ts(stream.cts.ok_or("Missing CTS on order book event")?);
let latency = local_ts - exch_ts;
exch_ts = local_ts - base.latency(latency);

let order_book: OrderBook = serde_json::from_value(stream.data)?;

if !order_book.asks.is_empty() {
let (last_ask_px_str, _) = order_book.asks.last().unwrap();

let ev = match &*stream.event_type {
"snapshot" => DEPTH_SNAPSHOT_EVENT | SELL_EVENT,
_ => DEPTH_EVENT | SELL_EVENT,
};

// Clear the books if this is a snapshot..
if stream.event_type == "snapshot" {
result.push(Event {
ev: DEPTH_CLEAR_EVENT | SELL_EVENT,
exch_ts,
local_ts,
order_id: 0,
px: last_ask_px_str.parse::<f64>()?,
qty: 0.0,
ival: 0,
fval: 0.0,
})
}

// Insert entries..
for (ask_px, ask_qty) in order_book.asks {
result.push(Event {
ev,
exch_ts,
local_ts,
order_id: 0,
px: ask_px.parse::<f64>()?,
qty: ask_qty.parse::<f64>()?,
ival: 0,
fval: 0.0,
})
}
}

if !order_book.bids.is_empty() {
let (last_bid_px_str, _) = order_book.bids.last().unwrap();

let ev = match &*stream.event_type {
"snapshot" => DEPTH_SNAPSHOT_EVENT | BUY_EVENT,
_ => DEPTH_EVENT | BUY_EVENT,
};

// Clear the books if this is a snapshot..
if stream.event_type == "snapshot" {
result.push(Event {
ev: DEPTH_CLEAR_EVENT | BUY_EVENT,
exch_ts,
local_ts,
order_id: 0,
px: last_bid_px_str.parse::<f64>()?,
qty: 0.0,
ival: 0,
fval: 0.0,
})
}

// Insert entries..
for (bid_px, bid_qty) in order_book.bids {
result.push(Event {
ev,
exch_ts,
local_ts,
order_id: 0,
px: bid_px.parse::<f64>()?,
qty: bid_qty.parse::<f64>()?,
ival: 0,
fval: 0.0,
})
}
}
}

Ok(result)
}
98 changes: 98 additions & 0 deletions contrib/converter/src/converter.rs
@@ -0,0 +1,98 @@
use flate2::read::GzDecoder;
use hftbacktest::types::Event;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter};

use hftbacktest::backtest::data::write_npy_data;

use crate::bybit::bybit_process;

pub struct ConverterBase {
base_latency: i64,
min_latency: i64,
time_mul: i64,
}

impl ConverterBase {
pub fn new(base_latency: i64) -> Self {
Self {
base_latency,
min_latency: i64::MAX,
time_mul: 1_000_000, // convert millisecond exchange timestamps to nanoseconds.
}
}

pub fn convert_ts(&self, ts: i64) -> i64 {
ts * self.time_mul
}

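/// Returns the feed latency to apply to an event. Positive latencies are passed through
/// unchanged while the smallest positive latency seen so far is tracked; a negative latency
/// (the exchange timestamp lands after the local receive time, e.g. clock skew) is replaced
/// by that tracked minimum plus the configured base latency.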
pub fn latency(&mut self, latency: i64) -> i64 {
if latency > 0 && latency < self.min_latency {
self.min_latency = latency;
}

if latency < 0 {
return self.min_latency + self.base_latency;
}

latency
}
}

#[allow(non_camel_case_types)]
pub enum Converter {
bybit(ConverterBase),
}

pub trait IConverter {
fn process(
&mut self,
local_ts: i64,
payload: &str,
) -> Result<Vec<Event>, Box<dyn std::error::Error>>;
}

impl IConverter for Converter {
fn process(
&mut self,
local_ts: i64,
payload: &str,
) -> Result<Vec<Event>, Box<dyn std::error::Error>> {
match self {
Converter::bybit(base) => bybit_process(base, local_ts, payload),
}
}
}

impl Converter {
pub fn new(exchange: &str, base_latency: i64) -> Self {
match exchange {
"bybit" => Converter::bybit(ConverterBase::new(base_latency)),
_ => panic!("Unknown exchange"),
}
}

pub fn process_file(
&mut self,
input: BufReader<GzDecoder<File>>,
output: &mut BufWriter<File>,
) -> Result<usize, Box<dyn std::error::Error>> {
let mut counter: usize = 0;
for line in input.lines() {
let line = line?;

// Split the line into timestamp and JSON part
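// (each line is assumed to be "<local_ts> <json payload>": a local receive timestamp,
// which must already be in nanoseconds since it is compared directly against the
// converted exchange timestamps, a single space, and then the raw exchange message)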
if let Some((timestamp_str, json_str)) = line.split_once(' ') {
// Parse the timestamp
let timestamp: i64 = timestamp_str.parse()?;
let events = self.process(timestamp, json_str)?;
counter += events.len();
write_npy_data(output, events.as_slice())?;
} else {
eprintln!("Error: line format incorrect: {}", line);
}
}

Ok(counter)
}
}
80 changes: 80 additions & 0 deletions contrib/converter/src/main.rs
@@ -0,0 +1,80 @@
use flate2::read::GzDecoder;
use hftbacktest::backtest::data::write_npy_header;
use hftbacktest::types::Event;
use std::fs::{remove_file, File};
use std::io::{copy, BufReader, BufWriter, Seek, SeekFrom, Write};
use zip::write::SimpleFileOptions;
use zip::ZipWriter;

use clap::Parser;

mod bybit;
mod converter;

use converter::Converter;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(long)]
exchange: String,

#[arg(long)]
input: String,

#[arg(long, default_value = "test.npz")]
output: String,

#[arg(long, default_value_t = 5_000_000)]
base_latency: i64,

#[arg(long, default_value = "/tmp/")]
temp_dir: String,
}
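// A minimal sketch of an invocation, assuming a gzipped "<local_ts> <json>" capture as the
// input (the file names below are illustrative):
//
//   converter --exchange bybit --input bybit_btcusdt.gz --output bybit_btcusdt.npz \
//       --base-latency 5000000 --temp-dir /tmp/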

fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let mut temp_file = args.temp_dir;
temp_file.push_str("temp.npy");
let file = File::create(&temp_file)?;
let mut writer = BufWriter::new(file);

// This ordering may not be optimal for speed!
let input = File::open(args.input.clone())?;
let decoder = GzDecoder::new(input);
let reader = BufReader::new(decoder);

let mut converter = Converter::new(&*args.exchange, args.base_latency);

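// Write a placeholder header with a record count of 0; after all events have been written,
// seek back to the start and rewrite the header with the final count.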
write_npy_header::<BufWriter<File>, Event>(&mut writer, 0)?;

// Actually do the work..
println!("Converting {} to {}", args.input, &temp_file);
let counter = converter.process_file(reader, &mut writer)?;
println!("Created {} events", counter);

writer.seek(SeekFrom::Start(0))?;
write_npy_header::<BufWriter<File>, Event>(&mut writer, counter)?;
writer.flush()?;

let output = File::create(&args.output)?;
let zip_writer = BufWriter::new(output);
let mut zip = ZipWriter::new(zip_writer);

let options = SimpleFileOptions::default()
.compression_method(zip::CompressionMethod::DEFLATE)
.compression_level(Some(9));

zip.start_file("data.npy", options)?;

println!("Compressing {} to {}", &temp_file, &args.output);
let mut temp_read = BufReader::new(File::open(&temp_file)?);
copy(&mut temp_read, &mut zip)?;
zip.finish()?;

println!("Removing {}", &temp_file);
remove_file(&temp_file)?;

Ok(())
}
5 changes: 4 additions & 1 deletion hftbacktest/src/backtest/data/mod.rs
@@ -10,7 +10,10 @@ use std::{
slice::SliceIndex,
};

pub use npy::{read_npy_file, read_npz_file, write_npy, Field, NpyDTyped, NpyHeader};
pub use npy::{
read_npy_file, read_npz_file, write_npy, write_npy_data, write_npy_header, Field, NpyDTyped,
NpyHeader,
};
pub use reader::{Cache, DataPreprocess, DataSource, FeedLatencyAdjustment, Reader, ReaderBuilder};

use crate::utils::{AlignedArray, CACHE_LINE_SIZE};