Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/coinbase tickers #2

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
2 changes: 1 addition & 1 deletion barter-data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ arbitrary number of exchange `MarketStream`s using input `Subscription`s. Simply
| **Bitmex** | `Bitmex` | Perpetual | PublicTrades |
| **BybitSpot** | `BybitSpot::default()` | Spot | PublicTrades |
| **BybitPerpetualsUsd** | `BybitPerpetualsUsd::default()` | Perpetual | PublicTrades |
| **Coinbase** | `Coinbase` | Spot | PublicTrades |
| **Coinbase** | `Coinbase` | Spot | PublicTrades <br> OrderBooksL1 |
| **GateioSpot** | `GateioSpot::default()` | Spot | PublicTrades |
| **GateioFuturesUsd** | `GateioFuturesUsd::default()` | Future | PublicTrades |
| **GateioFuturesBtc** | `GateioFuturesBtc::default()` | Future | PublicTrades |
Expand Down
166 changes: 166 additions & 0 deletions barter-data/src/exchange/coinbase/book/l1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use crate::{
event::{MarketEvent, MarketIter},
exchange::{coinbase::channel::CoinbaseChannel, subscription::ExchangeSub, ExchangeId},
subscription::book::{Level, OrderBookL1},
Identifier,
};
use barter_integration::model::{Exchange, SubscriptionId};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// [`Coinbase`](super::super::Coinbase) real-time OrderBook Level1 (top of book) message.
///
/// ### Raw Payload Examples
/// #### Coinbase OrderBookL1
/// See docs: <https://docs.cdp.coinbase.com/exchange/docs/websocket-channels/#level2-channel>
/// ```json
///{
/// "type": "ticker",
/// "sequence": 37475248783,
/// "product_id": "ETH-USD",
/// "price": "1285.22",
/// "open_24h": "1310.79",
/// "volume_24h": "245532.79269678",
/// "low_24h": "1280.52",
/// "high_24h": "1313.8",
/// "volume_30d": "9788783.60117027",
/// "best_bid": "1285.04",
/// "best_bid_size": "0.46688654",
/// "best_ask": "1285.27",
/// "best_ask_size": "1.56637040",
/// "side": "buy",
/// "time": "2022-10-19T23:28:22.061769Z",
/// "trade_id": 370843401,
/// "last_size": "11.4396987"
///}
/// ```
#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
pub struct CoinbaseOrderBookL1 {
#[serde(rename(deserialize = "time"))]
pub time: DateTime<Utc>,
#[serde(rename(deserialize = "type"))]
pub kind: String,
pub sequence: u64,
#[serde(
rename(deserialize = "product_id"),
deserialize_with = "de_ob_l1_subscription_id"
)]
pub subscription_id: SubscriptionId,
#[serde(
rename(deserialize = "best_bid"),
deserialize_with = "barter_integration::de::de_str"
)]
pub best_bid_price: f64,
#[serde(
rename(deserialize = "best_bid_size"),
deserialize_with = "barter_integration::de::de_str"
)]
pub best_bid_amount: f64,
#[serde(
rename(deserialize = "best_ask"),
deserialize_with = "barter_integration::de::de_str"
)]
pub best_ask_price: f64,
#[serde(
rename(deserialize = "best_ask_size"),
deserialize_with = "barter_integration::de::de_str"
)]
pub best_ask_amount: f64,
}

impl Identifier<Option<SubscriptionId>> for CoinbaseOrderBookL1 {
fn id(&self) -> Option<SubscriptionId> {
Some(self.subscription_id.clone())
}
}

impl<InstrumentId> From<(ExchangeId, InstrumentId, CoinbaseOrderBookL1)>
for MarketIter<InstrumentId, OrderBookL1>
{
fn from(
(exchange_id, instrument, book): (ExchangeId, InstrumentId, CoinbaseOrderBookL1),
) -> Self {
Self(vec![Ok(MarketEvent {
exchange_time: book.time,
received_time: Utc::now(),
exchange: Exchange::from(exchange_id),
instrument,
kind: OrderBookL1 {
last_update_time: book.time,
best_bid: Level::new(book.best_bid_price, book.best_bid_amount),
best_ask: Level::new(book.best_ask_price, book.best_ask_amount),
},
})])
}
}

/// Deserialize a [`CoinbaseOrderBookL1`] "s" (eg/ "BTCUSDT") as the associated [`SubscriptionId`].
///
/// eg/ "ticker|BTC-USD"
pub fn de_ob_l1_subscription_id<'de, D>(deserializer: D) -> Result<SubscriptionId, D::Error>
where
D: serde::de::Deserializer<'de>,
{
<&str as Deserialize>::deserialize(deserializer)
.map(|market| ExchangeSub::from((CoinbaseChannel::ORDER_BOOK_L1, market)).id())
}

#[cfg(test)]
mod tests {
use super::*;

mod de {
use super::*;

#[test]
fn test_coinbase_order_book_l1() {
struct TestCase {
input: &'static str,
expected: CoinbaseOrderBookL1,
}

let time = Utc::now();

let tests = vec![TestCase {
// TC0: valid Spot CoinbaseOrderBookL1
input: r#"
{
"type": "ticker",
"sequence": 37475248783,
"product_id": "ETH-USD",
"price": "1285.22",
"open_24h": "1310.79",
"volume_24h": "245532.79269678",
"low_24h": "1280.52",
"high_24h": "1313.8",
"volume_30d": "9788783.60117027",
"best_bid": "1285.04",
"best_bid_size": "0.46688654",
"best_ask": "1285.27",
"best_ask_size": "1.56637040",
"side": "buy",
"time": "2022-10-19T23:28:22.061769Z",
"trade_id": 370843401,
"last_size": "11.4396987"
}
"#,
expected: CoinbaseOrderBookL1 {
kind: "ticker".into(),
sequence: 37475248783,
subscription_id: SubscriptionId::from("ticker|ETH-USD"),
time,
best_bid_price: 1285.04,
best_bid_amount: 0.46688654,
best_ask_price: 1285.27,
best_ask_amount: 1.56637040,
},
}];

for (index, test) in tests.into_iter().enumerate() {
let actual = serde_json::from_str::<CoinbaseOrderBookL1>(test.input).unwrap();
let actual = CoinbaseOrderBookL1 { time, ..actual };
assert_eq!(actual, test.expected, "TC{} failed", index);
}
}
}
}
1 change: 1 addition & 0 deletions barter-data/src/exchange/coinbase/book/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod l1;
12 changes: 11 additions & 1 deletion barter-data/src/exchange/coinbase/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Coinbase;
use crate::{
subscription::{trade::PublicTrades, Subscription},
subscription::{book::OrderBooksL1, trade::PublicTrades, Subscription},
Identifier,
};
use serde::Serialize;
Expand All @@ -17,6 +17,10 @@ impl CoinbaseChannel {
///
/// See docs: <https://docs.cloud.coinbase.com/exchange/docs/websocket-channels#match>
pub const TRADES: Self = Self("matches");
/// [`Coinbase`] real-time L1 orderbook channel.
///
/// See docs: <https://docs.cloud.coinbase.com/exchange/docs/websocket-channels#level2-channel>
pub const ORDER_BOOK_L1: Self = Self("ticker");
}

impl<Instrument> Identifier<CoinbaseChannel> for Subscription<Coinbase, Instrument, PublicTrades> {
Expand All @@ -25,6 +29,12 @@ impl<Instrument> Identifier<CoinbaseChannel> for Subscription<Coinbase, Instrume
}
}

impl<Instrument> Identifier<CoinbaseChannel> for Subscription<Coinbase, Instrument, OrderBooksL1> {
fn id(&self) -> CoinbaseChannel {
CoinbaseChannel::ORDER_BOOK_L1
}
}

impl AsRef<str> for CoinbaseChannel {
fn as_ref(&self) -> &str {
self.0
Expand Down
19 changes: 16 additions & 3 deletions barter-data/src/exchange/coinbase/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use self::{
channel::CoinbaseChannel, market::CoinbaseMarket, subscription::CoinbaseSubResponse,
trade::CoinbaseTrade,
book::l1::CoinbaseOrderBookL1, channel::CoinbaseChannel, market::CoinbaseMarket,
subscription::CoinbaseSubResponse, trade::CoinbaseTrade,
};
use crate::{
exchange::{Connector, ExchangeId, ExchangeSub, StreamSelector},
instrument::InstrumentData,
subscriber::{validator::WebSocketSubValidator, WebSocketSubscriber},
subscription::trade::PublicTrades,
subscription::{book::OrderBooksL1, trade::PublicTrades},
transformer::stateless::StatelessTransformer,
ExchangeWsStream,
};

use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
use barter_macro::{DeExchange, SerExchange};
use serde_json::json;
use url::Url;

/// OrderBook types for [`Coinbase`]
pub mod book;

/// Defines the type that translates a Barter [`Subscription`](crate::subscription::Subscription)
/// into an exchange [`Connector`] specific channel used for generating [`Connector::requests`].
pub mod channel;
Expand Down Expand Up @@ -79,3 +83,12 @@ where
type Stream =
ExchangeWsStream<StatelessTransformer<Self, Instrument::Id, PublicTrades, CoinbaseTrade>>;
}

impl<Instrument> StreamSelector<Instrument, OrderBooksL1> for Coinbase
where
Instrument: InstrumentData,
{
type Stream = ExchangeWsStream<
StatelessTransformer<Self, Instrument::Id, OrderBooksL1, CoinbaseOrderBookL1>,
>;
}