From 13ffc5938d1d80fd797eb76e1bca6e5de8361fc9 Mon Sep 17 00:00:00 2001
From: Jeb Bearer
Date: Wed, 20 Nov 2024 16:35:36 -0800
Subject: [PATCH 1/2] Recreate indexes on merkle root columns (#2318)

---
 .../api/migrations/V301__merkle_root_column_indexes.sql | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 sequencer/api/migrations/V301__merkle_root_column_indexes.sql

diff --git a/sequencer/api/migrations/V301__merkle_root_column_indexes.sql b/sequencer/api/migrations/V301__merkle_root_column_indexes.sql
new file mode 100644
index 000000000..2b904876d
--- /dev/null
+++ b/sequencer/api/migrations/V301__merkle_root_column_indexes.sql
@@ -0,0 +1,6 @@
+-- Migration V16 created these columns and indexed them. Then, migration V36 altered the expression
+-- used to populate the generated columns. However, it did so by dropping and re-adding the columns
+-- with a different expression, which also caused the indexes on the columns to be dropped. They
+-- were erroneously not added back. This migration corrects that error by recreating the indexes.
+CREATE INDEX header_block_merkle_tree_root_idx ON header (block_merkle_tree_root);
+CREATE INDEX header_fee_merkle_tree_root_idx ON header (fee_merkle_tree_root);

From 9ca9918a0076da59566f055d47f7bcb9054b37e9 Mon Sep 17 00:00:00 2001
From: Jeb Bearer
Date: Thu, 21 Nov 2024 07:59:11 -0800
Subject: [PATCH 2/2] Add range queries to nasty client (#2317)

---
 sequencer/src/bin/nasty-client.rs | 123 +++++++++++++++++++++++++++++-
 1 file changed, 121 insertions(+), 2 deletions(-)

diff --git a/sequencer/src/bin/nasty-client.rs b/sequencer/src/bin/nasty-client.rs
index 7a5829d67..7fff64412 100644
--- a/sequencer/src/bin/nasty-client.rs
+++ b/sequencer/src/bin/nasty-client.rs
@@ -25,11 +25,15 @@ use futures::{
     stream::{Peekable, StreamExt},
 };
 use hotshot_query_service::{
-    availability::{BlockQueryData, LeafQueryData, PayloadQueryData, VidCommonQueryData},
+    availability::{self, BlockQueryData, LeafQueryData, PayloadQueryData, VidCommonQueryData},
     metrics::PrometheusMetrics,
     node::TimeWindowQueryData,
+    types::HeightIndexed,
+};
+use hotshot_types::traits::{
+    block_contents::BlockHeader,
+    metrics::{Counter, Gauge, Histogram, Metrics as _},
 };
-use hotshot_types::traits::metrics::{Counter, Gauge, Histogram, Metrics as _};
 use jf_merkle_tree::{
     ForgetableMerkleTreeScheme, MerkleCommitment, MerkleTreeScheme, UniversalMerkleTreeScheme,
 };
@@ -163,6 +167,14 @@ struct ActionDistribution {
     #[clap(long, env = "ESPRESSO_NASTY_CLIENT_WEIGHT_QUERY", default_value = "20")]
     weight_query: u8,
 
+    /// The weight of query range actions in the random distribution.
+    #[clap(
+        long,
+        env = "ESPRESSO_NASTY_CLIENT_WEIGHT_QUERY_RANGE",
+        default_value = "10"
+    )]
+    weight_query_range: u8,
+
     /// The weight of "open stream" actions in the random distribution.
     #[clap(
         long,
@@ -224,6 +236,7 @@ impl ActionDistribution {
     fn weight(&self, action: ActionDiscriminants) -> u8 {
         match action {
             ActionDiscriminants::Query => self.weight_query,
+            ActionDiscriminants::QueryRange => self.weight_query_range,
             ActionDiscriminants::OpenStream => self.weight_open_stream,
             ActionDiscriminants::CloseStream => self.weight_close_stream,
             ActionDiscriminants::PollStream => self.weight_poll_stream,
@@ -239,6 +252,7 @@ impl ActionDistribution {
 struct Metrics {
     open_streams: HashMap<Resource, Box<dyn Gauge>>,
     query_actions: HashMap<Resource, Box<dyn Counter>>,
+    query_range_actions: HashMap<Resource, Box<dyn Counter>>,
     open_stream_actions: HashMap<Resource, Box<dyn Counter>>,
     close_stream_actions: HashMap<Resource, Box<dyn Counter>>,
     poll_stream_actions: HashMap<Resource, Box<dyn Counter>>,
@@ -274,6 +288,18 @@ impl Metrics {
                     )
                 })
                 .collect(),
+            query_range_actions: Resource::VARIANTS
+                .iter()
+                .map(|resource| {
+                    (
+                        *resource,
+                        registry.create_counter(
+                            format!("{}_query_range_actions", resource.singular()),
+                            None,
+                        ),
+                    )
+                })
+                .collect(),
             open_stream_actions: Resource::VARIANTS
                 .iter()
                 .map(|resource| {
@@ -339,6 +365,10 @@ trait Queryable: DeserializeOwned + Debug + Eq {
     /// This may be none if the resource does not support fetching by payload hash.
     const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str>;
 
+    /// Does this object use the large object limit for range queries?
+    const IS_LARGE_OBJECT: bool;
+
+    fn height(&self) -> usize;
     fn hash(&self) -> String;
     fn payload_hash(&self) -> String;
 }
@@ -347,6 +377,11 @@ impl Queryable for BlockQueryData<SeqTypes> {
     const RESOURCE: Resource = Resource::Blocks;
     const HASH_URL_SEGMENT: &'static str = "hash";
     const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("payload-hash");
+    const IS_LARGE_OBJECT: bool = true;
+
+    fn height(&self) -> usize {
+        HeightIndexed::height(self) as usize
+    }
 
     fn hash(&self) -> String {
         self.hash().to_string()
@@ -361,6 +396,11 @@ impl Queryable for LeafQueryData<SeqTypes> {
     const RESOURCE: Resource = Resource::Leaves;
     const HASH_URL_SEGMENT: &'static str = "hash";
     const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = None;
+    const IS_LARGE_OBJECT: bool = false;
+
+    fn height(&self) -> usize {
+        HeightIndexed::height(self) as usize
+    }
 
     fn hash(&self) -> String {
         self.hash().to_string()
@@ -375,6 +415,11 @@ impl Queryable for Header {
     const RESOURCE: Resource = Resource::Headers;
     const HASH_URL_SEGMENT: &'static str = "hash";
     const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("payload-hash");
+    const IS_LARGE_OBJECT: bool = true;
+
+    fn height(&self) -> usize {
+        self.block_number() as usize
+    }
 
     fn hash(&self) -> String {
         self.commit().to_string()
@@ -389,6 +434,11 @@ impl Queryable for PayloadQueryData<SeqTypes> {
     const RESOURCE: Resource = Resource::Payloads;
     const HASH_URL_SEGMENT: &'static str = "block-hash";
     const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("hash");
+    const IS_LARGE_OBJECT: bool = true;
+
+    fn height(&self) -> usize {
+        HeightIndexed::height(self) as usize
+    }
 
     fn hash(&self) -> String {
         self.block_hash().to_string()
@@ -591,6 +641,55 @@ impl<T: Queryable> ResourceManager<T> {
         Ok(())
     }
 
+    async fn query_range(&mut self, from: u64, len: u16) -> anyhow::Result<()> {
+        let from = self.adjust_index(from).await? as usize;
+        let limits = self
+            .get::<availability::Limits>("availability/limits")
+            .await?;
+        let limit = if T::IS_LARGE_OBJECT {
+            limits.large_object_range_limit
+        } else {
+            limits.small_object_range_limit
+        };
+
+        // Adjust `len`: 10% of the time, query above the limit for this type (so the query fails);
+        // the rest of the time, query a valid range.
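+        // `max_len` is 10% above the server's advertised limit, so `len % max_len` exceeds
+        // `limit` for roughly 1 in 11 random draws, exercising the over-limit error path below.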
+        let max_len = limit * 11 / 10;
+        let to = self
+            .adjust_index(from as u64 + (len as u64) % (max_len as u64))
+            .await? as usize;
+        match self
+            .get::<Vec<T>>(format!("availability/{}/{from}/{to}", Self::singular()))
+            .await
+        {
+            Ok(range) => {
+                ensure!(to - from <= limit, "range endpoint succeeded and returned {} results for request over limit; limit: {limit} from: {from} to: {to}", range.len());
+                ensure!(range.len() == to - from, "range endpoint returned wrong number of results; from: {from} to: {to} results: {}", range.len());
+                for (i, obj) in range.iter().enumerate() {
+                    ensure!(
+                        obj.height() == from + i,
+                        "object in range has wrong height; from: {from} to: {to} i: {i} height: {}",
+                        obj.height()
+                    );
+                }
+            }
+            Err(_) if to - from > limit => {
+                tracing::info!(
+                    limit,
+                    from,
+                    to,
+                    "range query exceeding limit failed as expected"
+                );
+            }
+            Err(err) => {
+                return Err(err).context("error in range query");
+            }
+        }
+
+        self.metrics.query_range_actions[&T::RESOURCE].add(1);
+        Ok(())
+    }
+
     async fn open_stream(&mut self, from: u64) -> anyhow::Result<()> {
         if self.open_streams.len() >= self.cfg.max_open_streams {
             tracing::info!(
@@ -1104,6 +1203,11 @@ enum Action {
         resource: Resource,
         at: u64,
     },
+    QueryRange {
+        resource: Resource,
+        from: u64,
+        len: u16,
+    },
     OpenStream {
         resource: Resource,
         from: u64,
@@ -1147,6 +1251,11 @@ impl Action {
                 resource: Resource::random(rng),
                 at: rng.next_u64(),
             },
+            ActionDiscriminants::QueryRange => Self::QueryRange {
+                resource: Resource::random(rng),
+                from: rng.next_u64(),
+                len: (rng.next_u32() % u16::MAX as u32) as u16,
+            },
             ActionDiscriminants::OpenStream => Self::OpenStream {
                 resource: Resource::random(rng),
                 from: rng.next_u64(),
@@ -1209,6 +1318,16 @@ impl Client {
                 Resource::Headers => self.headers.query(at).await,
                 Resource::Payloads => self.payloads.query(at).await,
             },
+            Action::QueryRange {
+                resource,
+                from,
+                len,
+            } => match resource {
+                Resource::Blocks => self.blocks.query_range(from, len).await,
+                Resource::Leaves => self.leaves.query_range(from, len).await,
+                Resource::Headers => self.headers.query_range(from, len).await,
+                Resource::Payloads => self.payloads.query_range(from, len).await,
+            },
             Action::OpenStream { resource, from } => match resource {
                 Resource::Blocks => self.blocks.open_stream(from).await,
                 Resource::Leaves => self.leaves.open_stream(from).await,