-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Incremental solvable orders cache update #2923
Changes from all commits
bee782a
0b8c97d
22abb5e
ab2f8cb
e6a4a64
6984418
ac2f36a
efb7389
c185443
3a12bac
5786d82
e7165f6
d630e9e
fe50a81
350191f
a2262cb
298e31a
cea8850
8eb3e55
c883090
53969d5
3dc9243
2d0813b
6d012c5
8551f81
efc0d58
c173a33
d8b12a4
a5a0769
25faa4e
d34cc12
8868079
1d2efee
b5a869e
b2a04f9
b06e2a9
396fc43
157e6c0
f93f2f6
902687b
35cd677
8ca4434
945b7cc
e9d5463
de7324c
9e05929
c1a5cfa
11d9db5
85c9d16
9035848
a0f1d9e
b70a134
5b9d146
62907ed
6e61210
7004db3
c20b156
8eafd2b
a9262dd
065bed0
0e51662
1720b1c
772520b
157c126
779d6b9
1d3b179
65a630a
cd54bc4
3d7e0d4
6dd8299
79a989a
57d46fb
2604f12
c16ec11
2740db0
276f1b5
f1951c9
4b91878
c494446
2cc9125
ab0d5d5
ef721b5
86c3f45
fd4d039
73976cf
86cb105
b6c5538
6baaf2c
6519810
0c8eeee
4cbf63f
a10b4e3
7176646
1afe8ed
dd8b362
2c8a45c
ea84a28
72d8b7a
55eb516
459bf87
fb8dad3
db3a1b5
4376b22
bd95564
b24d9cc
1f94ebe
c724e35
b012d45
9438fa0
ac250e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -6,8 +6,9 @@ use { | |||||||||||||||||||||||||||||
infra::persistence::dto::AuctionId, | ||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||
anyhow::Context, | ||||||||||||||||||||||||||||||
bigdecimal::ToPrimitive, | ||||||||||||||||||||||||||||||
boundary::database::byte_array::ByteArray, | ||||||||||||||||||||||||||||||
chrono::Utc, | ||||||||||||||||||||||||||||||
chrono::{DateTime, Utc}, | ||||||||||||||||||||||||||||||
database::{ | ||||||||||||||||||||||||||||||
order_events::OrderEventLabel, | ||||||||||||||||||||||||||||||
orders::{ | ||||||||||||||||||||||||||||||
|
@@ -22,10 +23,14 @@ use { | |||||||||||||||||||||||||||||
SellTokenSource as DomainSellTokenSource, | ||||||||||||||||||||||||||||||
SigningScheme as DomainSigningScheme, | ||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||
number::conversions::{big_decimal_to_u256, u256_to_big_decimal}, | ||||||||||||||||||||||||||||||
futures::{StreamExt, TryStreamExt}, | ||||||||||||||||||||||||||||||
itertools::Itertools, | ||||||||||||||||||||||||||||||
number::conversions::{big_decimal_to_u256, u256_to_big_decimal, u256_to_big_uint}, | ||||||||||||||||||||||||||||||
primitive_types::{H160, H256}, | ||||||||||||||||||||||||||||||
shared::db_order_conversions::full_order_into_model_order, | ||||||||||||||||||||||||||||||
std::{ | ||||||||||||||||||||||||||||||
collections::{HashMap, HashSet}, | ||||||||||||||||||||||||||||||
ops::DerefMut, | ||||||||||||||||||||||||||||||
sync::Arc, | ||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||
tracing::Instrument, | ||||||||||||||||||||||||||||||
|
@@ -71,12 +76,13 @@ impl Persistence { | |||||||||||||||||||||||||||||
.map_err(DatabaseError) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
pub async fn solvable_orders( | ||||||||||||||||||||||||||||||
/// Finds solvable orders based on the order's min validity period. | ||||||||||||||||||||||||||||||
pub async fn all_solvable_orders( | ||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||
min_valid_to: u32, | ||||||||||||||||||||||||||||||
) -> Result<boundary::SolvableOrders, DatabaseError> { | ||||||||||||||||||||||||||||||
self.postgres | ||||||||||||||||||||||||||||||
.solvable_orders(min_valid_to) | ||||||||||||||||||||||||||||||
.all_solvable_orders(min_valid_to) | ||||||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||||||
.map_err(DatabaseError) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
@@ -395,6 +401,139 @@ impl Persistence { | |||||||||||||||||||||||||||||
Ok(solution) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/// Computes solvable orders based on the latest observed block number, | ||||||||||||||||||||||||||||||
/// order creation timestamp, and minimum validity period. | ||||||||||||||||||||||||||||||
pub async fn solvable_orders_after( | ||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||
current_orders: HashMap<domain::OrderUid, model::order::Order>, | ||||||||||||||||||||||||||||||
after_timestamp: DateTime<Utc>, | ||||||||||||||||||||||||||||||
after_block: u64, | ||||||||||||||||||||||||||||||
min_valid_to: u32, | ||||||||||||||||||||||||||||||
) -> anyhow::Result<boundary::SolvableOrders> { | ||||||||||||||||||||||||||||||
let after_block = i64::try_from(after_block).context("block number value exceeds i64")?; | ||||||||||||||||||||||||||||||
let mut tx = self.postgres.pool.begin().await.context("begin")?; | ||||||||||||||||||||||||||||||
// Set the transaction isolation level to REPEATABLE READ | ||||||||||||||||||||||||||||||
// so all the SELECT queries below are executed in the same database snapshot | ||||||||||||||||||||||||||||||
// taken at the moment before the first query is executed. | ||||||||||||||||||||||||||||||
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") | ||||||||||||||||||||||||||||||
.execute(tx.deref_mut()) | ||||||||||||||||||||||||||||||
.await?; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// Fetch the orders that were updated after the given block and were created or | ||||||||||||||||||||||||||||||
// cancelled after the given timestamp. | ||||||||||||||||||||||||||||||
let next_orders: HashMap<domain::OrderUid, model::order::Order> = { | ||||||||||||||||||||||||||||||
let _timer = Metrics::get() | ||||||||||||||||||||||||||||||
.database_queries | ||||||||||||||||||||||||||||||
.with_label_values(&["open_orders_after"]) | ||||||||||||||||||||||||||||||
.start_timer(); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
database::orders::open_orders_after(&mut tx, after_block, after_timestamp) | ||||||||||||||||||||||||||||||
.map(|result| match result { | ||||||||||||||||||||||||||||||
Ok(order) => full_order_into_model_order(order) | ||||||||||||||||||||||||||||||
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)), | ||||||||||||||||||||||||||||||
Err(err) => Err(anyhow::Error::from(err)), | ||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||
.try_collect() | ||||||||||||||||||||||||||||||
.await? | ||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// Fetch quotes for new orders and also update them for the cached ones since | ||||||||||||||||||||||||||||||
// they could also be updated. | ||||||||||||||||||||||||||||||
Comment on lines
+440
to
+441
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is also needed because ethflow orders could theoretically be reorged? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Because of this code: services/crates/autopilot/src/database/onchain_order_events.rs Lines 329 to 342 in e7de6bf
|
||||||||||||||||||||||||||||||
let updated_quotes = { | ||||||||||||||||||||||||||||||
let _timer = Metrics::get() | ||||||||||||||||||||||||||||||
.database_queries | ||||||||||||||||||||||||||||||
.with_label_values(&["read_quotes"]) | ||||||||||||||||||||||||||||||
.start_timer(); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
let all_order_uids = next_orders | ||||||||||||||||||||||||||||||
.keys() | ||||||||||||||||||||||||||||||
.chain(current_orders.keys()) | ||||||||||||||||||||||||||||||
.unique() | ||||||||||||||||||||||||||||||
.map(|uid| ByteArray(uid.0)) | ||||||||||||||||||||||||||||||
.collect::<Vec<_>>(); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
database::orders::read_quotes(&mut tx, &all_order_uids) | ||||||||||||||||||||||||||||||
.await? | ||||||||||||||||||||||||||||||
.into_iter() | ||||||||||||||||||||||||||||||
.filter_map(|quote| { | ||||||||||||||||||||||||||||||
let order_uid = domain::OrderUid(quote.order_uid.0); | ||||||||||||||||||||||||||||||
dto::quote::into_domain(quote) | ||||||||||||||||||||||||||||||
.map_err(|err| { | ||||||||||||||||||||||||||||||
tracing::warn!(?order_uid, ?err, "failed to convert quote from db") | ||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||
.ok() | ||||||||||||||||||||||||||||||
.map(|quote| (order_uid, quote)) | ||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||
.collect() | ||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
let latest_settlement_block = database::orders::latest_settlement_block(&mut tx) | ||||||||||||||||||||||||||||||
.await? | ||||||||||||||||||||||||||||||
.to_u64() | ||||||||||||||||||||||||||||||
.context("latest_settlement_block is not u64")?; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Self::build_solvable_orders( | ||||||||||||||||||||||||||||||
current_orders, | ||||||||||||||||||||||||||||||
next_orders, | ||||||||||||||||||||||||||||||
updated_quotes, | ||||||||||||||||||||||||||||||
latest_settlement_block, | ||||||||||||||||||||||||||||||
min_valid_to, | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
fn build_solvable_orders( | ||||||||||||||||||||||||||||||
mut current_orders: HashMap<domain::OrderUid, model::order::Order>, | ||||||||||||||||||||||||||||||
next_orders: HashMap<domain::OrderUid, model::order::Order>, | ||||||||||||||||||||||||||||||
mut next_quotes: HashMap<domain::OrderUid, domain::Quote>, | ||||||||||||||||||||||||||||||
latest_settlement_block: u64, | ||||||||||||||||||||||||||||||
min_valid_to: u32, | ||||||||||||||||||||||||||||||
) -> anyhow::Result<boundary::SolvableOrders> { | ||||||||||||||||||||||||||||||
// Blindly insert all new orders into the cache. | ||||||||||||||||||||||||||||||
for (uid, order) in next_orders { | ||||||||||||||||||||||||||||||
current_orders.insert(uid, order); | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// Filter out all the invalid orders. | ||||||||||||||||||||||||||||||
current_orders.retain(|_uid, order| { | ||||||||||||||||||||||||||||||
let expired = order.data.valid_to < min_valid_to | ||||||||||||||||||||||||||||||
|| order | ||||||||||||||||||||||||||||||
.metadata | ||||||||||||||||||||||||||||||
.ethflow_data | ||||||||||||||||||||||||||||||
.as_ref() | ||||||||||||||||||||||||||||||
.is_some_and(|data| data.user_valid_to < i64::from(min_valid_to)); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
let invalidated = order.metadata.invalidated; | ||||||||||||||||||||||||||||||
let onchain_error = order | ||||||||||||||||||||||||||||||
.metadata | ||||||||||||||||||||||||||||||
.onchain_order_data | ||||||||||||||||||||||||||||||
.as_ref() | ||||||||||||||||||||||||||||||
.is_some_and(|data| data.placement_error.is_some()); | ||||||||||||||||||||||||||||||
let fulfilled = { | ||||||||||||||||||||||||||||||
match order.data.kind { | ||||||||||||||||||||||||||||||
model::order::OrderKind::Sell => { | ||||||||||||||||||||||||||||||
order.metadata.executed_sell_amount | ||||||||||||||||||||||||||||||
>= u256_to_big_uint(&order.data.sell_amount) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
model::order::OrderKind::Buy => { | ||||||||||||||||||||||||||||||
order.metadata.executed_buy_amount | ||||||||||||||||||||||||||||||
>= u256_to_big_uint(&order.data.buy_amount) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
!expired && !invalidated && !onchain_error && !fulfilled | ||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// Keep only relevant quotes. | ||||||||||||||||||||||||||||||
next_quotes.retain(|uid, _quote| current_orders.contains_key(uid)); | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
Ok(boundary::SolvableOrders { | ||||||||||||||||||||||||||||||
orders: current_orders, | ||||||||||||||||||||||||||||||
quotes: next_quotes, | ||||||||||||||||||||||||||||||
latest_settlement_block, | ||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/// Returns the oldest settlement event for which the accociated auction is | ||||||||||||||||||||||||||||||
/// not yet populated in the database. | ||||||||||||||||||||||||||||||
pub async fn get_settlement_without_auction( | ||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this
await
be parallelized with thelatest_settlement_block()
one? 🤔There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, since there is an opened transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand. Are you saying that in order to get the quote, we need to first read the latest settlement block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an opened tx which doesn't allow executing SQL queries in parallel:
services/crates/autopilot/src/infra/persistence/mod.rs
Lines 400 to 406 in a10b4e3
We should fetch all the data using a single DB snapshot even for quotes since there is a logic of updating them based on the onchain data:
services/crates/autopilot/src/database/onchain_order_events.rs
Lines 329 to 342 in e7de6bf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, shouldn't we use the same
tx
for fetching the quote?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, right. Looks like I reverted the wrong change during one of the reworks. Thanks!