Skip to content

Commit

Permalink
fixup! feat: Rewrite order matching strategy
Browse files Browse the repository at this point in the history
Do not block on database interactions

TODO: match market order
  • Loading branch information
holzeis committed Jun 4, 2024
1 parent 1cfc725 commit 602eca1
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 67 deletions.
81 changes: 41 additions & 40 deletions coordinator/src/orderbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,19 @@ impl OrderbookSide {
entry.push(order);
}

/// removes the order by the given id from the orderbook.
pub fn remove_order(&mut self, order_id: Uuid) {
for vec in self.orders.values_mut() {
vec.retain(|order| order.id != order_id);
/// removes the order by the given id from the orderbook. Returns true if the order was found,
/// returns false otherwise.
fn remove_order(&mut self, order_id: Uuid) -> bool {
for (_price, orders) in self.orders.iter_mut() {
if let Some(pos) = orders.iter().position(|order| order.id == order_id) {
orders.remove(pos);
if orders.is_empty() {
self.orders.retain(|_price, orders| !orders.is_empty());
}
return true;
}
}
self.orders.retain(|_, vec| !vec.is_empty());
false
}

/// returns the a sorted vec orders of the orderbook side with the best price at first.
Expand Down Expand Up @@ -204,24 +211,25 @@ impl Orderbook {
}

/// Adds a limit order to the orderbook.
async fn add_limit_order(&mut self, new_order: NewLimitOrder) -> Result<Message> {
let order = spawn_blocking({
let mut conn = self.pool.clone().get()?;
fn add_limit_order(&mut self, new_order: NewLimitOrder) -> Message {
spawn_blocking({
let pool = self.pool.clone();
move || {
let mut conn = pool.clone().get()?;
let order = orders::insert_limit_order(&mut conn, new_order, OrderReason::Manual)
.map_err(|e| anyhow!(e))
.context("Failed to insert new order into DB")?;

anyhow::Ok(order)
}
})
.await??;
});

let order: Order = new_order.into();
match order.direction {
commons::Direction::Short => self.shorts.add_order(order),
commons::Direction::Long => self.longs.add_order(order),
}
Ok(Message::NewOrder(order))
Message::NewOrder(order)
}

/// Matches a market order against the orderbook. Will fail if the market order can't be fully
Expand All @@ -235,7 +243,7 @@ impl Orderbook {
// TODO(holzeis): We might want to delay that action in a regular task allowing us to
// process orderbook updates much faster in memory and postpone the expensive database
// transactions.
let order = spawn_blocking({
spawn_blocking({
let mut conn = self.pool.clone().get()?;
move || {
let order = orders::insert_market_order(&mut conn, new_order, order_reason)
Expand All @@ -244,8 +252,9 @@ impl Orderbook {

anyhow::Ok(order)
}
})
.await??;
});

let order: Order = new_order.into();

// find a match for the market order.
let matched_orders = match order.direction.opposite() {
Expand Down Expand Up @@ -274,7 +283,7 @@ impl Orderbook {
"Couldn't match order due to insufficient liquidity in the orderbook."
);

spawn_blocking(fail_order).await??;
spawn_blocking(fail_order);
bail!("No match found.");
}

Expand Down Expand Up @@ -359,8 +368,8 @@ impl Orderbook {

/// Updates a limit order in the orderbook. If the order id couldn't be found the update will be
/// ignored.
async fn update_limit_order(&mut self, order: Order) -> Result<Message> {
self.remove_order(order.id).await?;
fn update_limit_order(&mut self, order: Order) -> Message {
self.remove_order(order.id);
self.add_limit_order(NewLimitOrder {
id: order.id,
contract_symbol: order.contract_symbol,
Expand All @@ -371,10 +380,9 @@ impl Orderbook {
leverage: Decimal::try_from(order.leverage).expect("to fit"),
expiry: order.expiry,
stable: false,
})
.await?;
});

Ok(Message::Update(order))
Message::Update(order)
}

/// Sets the order and the corresponding match to failed by the given order id
Expand Down Expand Up @@ -424,13 +432,11 @@ impl Orderbook {

/// Removes a limit order from the orderbook. Ignores removing market orders, since they aren't
/// stored into the orderbook.
async fn remove_order(&mut self, order_id: Uuid) -> Result<Option<Message>> {
// TODO(holzeis): We might want to delay that action in a regular task allowing us to
// process orderbook updates much faster in memory and postpone the expensive database
// transactions.
let order = spawn_blocking({
let mut conn = self.pool.clone().get()?;
fn remove_order(&mut self, order_id: Uuid) -> Option<Message> {
spawn_blocking({
let pool = self.pool.clone();
move || {
let mut conn = pool.clone().get()?;
let matches = matches::get_matches_by_order_id(&mut conn, order_id)?;

let order = if !matches.is_empty() {
Expand All @@ -444,22 +450,17 @@ impl Orderbook {

anyhow::Ok(order)
}
})
.await??;
});

let message = match (order.order_type, order.direction) {
(OrderType::Limit, commons::Direction::Short) => {
self.shorts.remove_order(order_id);
Some(DeleteOrder(order_id))
}
(OrderType::Limit, commons::Direction::Long) => {
self.longs.remove_order(order_id);
Some(DeleteOrder(order_id))
}
(OrderType::Market, _) => None,
};
if self.shorts.remove_order(order_id) {
return Some(DeleteOrder(order_id));
}

if self.longs.remove_order(order_id) {
return Some(DeleteOrder(order_id));
}

Ok(message)
None
}
}

Expand Down
33 changes: 7 additions & 26 deletions coordinator/src/orderbook/trading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,38 +118,19 @@ pub fn spawn_orderbook(
new_order: NewOrder::Limit(new_order),
..
} => {
let trader_pubkey = new_order.trader_id;
let order_id = new_order.id;
match orderbook.add_limit_order(new_order).await {
Ok(message) => Some(message),
Err(e) => {
tracing::error!(%trader_pubkey, %order_id, "Failed to process new limit order. Error: {e:#}");
continue;
}
}
}
OrderbookMessage::DeleteOrder(order_id) => {
match orderbook.remove_order(order_id).await {
Ok(message) => message,
Err(e) => {
tracing::error!(%order_id, "Failed to process remove order. Error: {e:#}");
continue;
}
}
let message = orderbook.add_limit_order(new_order);
Some(message)
}
OrderbookMessage::DeleteOrder(order_id) => orderbook.remove_order(order_id),
OrderbookMessage::Update(
order @ Order {
order_type: commons::OrderType::Limit,
..
},
) => match orderbook.update_limit_order(order).await {
Ok(message) => Some(message),
Err(e) => {
tracing::error!(trader_pubkey=%order.trader_id, order_id=%order.id,
"Failed to process update order. Error: {e:#}");
continue;
}
},
) => {
let message = orderbook.update_limit_order(order);
Some(message)
}
OrderbookMessage::Update(Order {
order_type: commons::OrderType::Market,
..
Expand Down
41 changes: 41 additions & 0 deletions crates/xxi-node/src/commons/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Result;
use bitcoin::hashes::sha256;
use bitcoin::secp256k1::PublicKey;
use bitcoin::Amount;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use secp256k1::ecdsa::Signature;
use secp256k1::Message;
Expand Down Expand Up @@ -114,6 +115,26 @@ pub struct NewLimitOrder {
pub stable: bool,
}

impl From<NewLimitOrder> for Order {
fn from(value: NewLimitOrder) -> Self {
Self {
id: value.id,
price: value.price,
leverage: value.leverage.to_f32().expect("to fit"),
contract_symbol: value.contract_symbol,
trader_id: value.trader_id,
direction: value.direction,
quantity: value.quantity,
order_type: OrderType::Limit,
timestamp: OffsetDateTime::now_utc(),
expiry: value.expiry,
order_state: OrderState::Open,
order_reason: OrderReason::Manual,
stable: false,
}
}
}

impl NewLimitOrder {
pub fn message(&self) -> Message {
let mut vec: Vec<u8> = vec![];
Expand Down Expand Up @@ -144,6 +165,26 @@ impl NewLimitOrder {
}
}

impl From<NewMarketOrder> for Order {
fn from(value: NewMarketOrder) -> Self {
Self {
id: value.id,
price: Decimal::ZERO, // market orders don't have a price.
leverage: value.leverage.to_f32().expect("to fit"),
contract_symbol: value.contract_symbol,
trader_id: value.trader_id,
direction: value.direction,
quantity: value.quantity,
order_type: OrderType::Market,
timestamp: OffsetDateTime::now_utc(),
expiry: value.expiry,
order_state: OrderState::Open,
order_reason: OrderReason::Manual,
stable: false,
}
}
}

impl NewMarketOrder {
pub fn message(&self) -> Message {
let mut vec: Vec<u8> = vec![];
Expand Down
2 changes: 1 addition & 1 deletion crates/xxi-node/src/commons/trade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct TradeParams {
/// The filling information from the orderbook
///
/// This is used by the coordinator to be able to make sure both trading parties are acting.
/// The `quantity` has to match the cummed up quantities of the matches in `filled_with`.
/// The `quantity` has to match the summed up quantities of the matches in `filled_with`.
pub filled_with: FilledWith,
}

Expand Down

0 comments on commit 602eca1

Please sign in to comment.