Skip to content

Commit

Permalink
Add range queries to nasty client (#2317)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer authored Nov 21, 2024
1 parent 13ffc59 commit 9ca9918
Showing 1 changed file with 121 additions and 2 deletions.
123 changes: 121 additions & 2 deletions sequencer/src/bin/nasty-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ use futures::{
stream::{Peekable, StreamExt},
};
use hotshot_query_service::{
availability::{BlockQueryData, LeafQueryData, PayloadQueryData, VidCommonQueryData},
availability::{self, BlockQueryData, LeafQueryData, PayloadQueryData, VidCommonQueryData},
metrics::PrometheusMetrics,
node::TimeWindowQueryData,
types::HeightIndexed,
};
use hotshot_types::traits::{
block_contents::BlockHeader,
metrics::{Counter, Gauge, Histogram, Metrics as _},
};
use hotshot_types::traits::metrics::{Counter, Gauge, Histogram, Metrics as _};
use jf_merkle_tree::{
ForgetableMerkleTreeScheme, MerkleCommitment, MerkleTreeScheme, UniversalMerkleTreeScheme,
};
Expand Down Expand Up @@ -163,6 +167,14 @@ struct ActionDistribution {
#[clap(long, env = "ESPRESSO_NASTY_CLIENT_WEIGHT_QUERY", default_value = "20")]
weight_query: u8,

/// The weight of query range actions in the random distribution.
#[clap(
long,
env = "ESPRESSO_NASTY_CLIENT_WEIGHT_QUERY_RANGE",
default_value = "10"
)]
weight_query_range: u8,

/// The weight of "open stream" actions in the random distribution.
#[clap(
long,
Expand Down Expand Up @@ -224,6 +236,7 @@ impl ActionDistribution {
fn weight(&self, action: ActionDiscriminants) -> u8 {
match action {
ActionDiscriminants::Query => self.weight_query,
ActionDiscriminants::QueryRange => self.weight_query_range,
ActionDiscriminants::OpenStream => self.weight_open_stream,
ActionDiscriminants::CloseStream => self.weight_close_stream,
ActionDiscriminants::PollStream => self.weight_poll_stream,
Expand All @@ -239,6 +252,7 @@ impl ActionDistribution {
struct Metrics {
open_streams: HashMap<Resource, Box<dyn Gauge>>,
query_actions: HashMap<Resource, Box<dyn Counter>>,
query_range_actions: HashMap<Resource, Box<dyn Counter>>,
open_stream_actions: HashMap<Resource, Box<dyn Counter>>,
close_stream_actions: HashMap<Resource, Box<dyn Counter>>,
poll_stream_actions: HashMap<Resource, Box<dyn Counter>>,
Expand Down Expand Up @@ -274,6 +288,18 @@ impl Metrics {
)
})
.collect(),
query_range_actions: Resource::VARIANTS
.iter()
.map(|resource| {
(
*resource,
registry.create_counter(
format!("{}_query_range_actions", resource.singular()),
None,
),
)
})
.collect(),
open_stream_actions: Resource::VARIANTS
.iter()
.map(|resource| {
Expand Down Expand Up @@ -339,6 +365,10 @@ trait Queryable: DeserializeOwned + Debug + Eq {
/// This may be none if the resource does not support fetching by payload hash.
const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str>;

/// Does this object use the large object limit for range queries?
const IS_LARGE_OBJECT: bool;

fn height(&self) -> usize;
fn hash(&self) -> String;
fn payload_hash(&self) -> String;
}
Expand All @@ -347,6 +377,11 @@ impl Queryable for BlockQueryData<SeqTypes> {
const RESOURCE: Resource = Resource::Blocks;
const HASH_URL_SEGMENT: &'static str = "hash";
const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("payload-hash");
const IS_LARGE_OBJECT: bool = true;

fn height(&self) -> usize {
HeightIndexed::height(self) as usize
}

fn hash(&self) -> String {
self.hash().to_string()
Expand All @@ -361,6 +396,11 @@ impl Queryable for LeafQueryData<SeqTypes> {
const RESOURCE: Resource = Resource::Leaves;
const HASH_URL_SEGMENT: &'static str = "hash";
const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = None;
const IS_LARGE_OBJECT: bool = false;

fn height(&self) -> usize {
HeightIndexed::height(self) as usize
}

fn hash(&self) -> String {
self.hash().to_string()
Expand All @@ -375,6 +415,11 @@ impl Queryable for Header {
const RESOURCE: Resource = Resource::Headers;
const HASH_URL_SEGMENT: &'static str = "hash";
const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("payload-hash");
const IS_LARGE_OBJECT: bool = true;

fn height(&self) -> usize {
self.block_number() as usize
}

fn hash(&self) -> String {
self.commit().to_string()
Expand All @@ -389,6 +434,11 @@ impl Queryable for PayloadQueryData<SeqTypes> {
const RESOURCE: Resource = Resource::Payloads;
const HASH_URL_SEGMENT: &'static str = "block-hash";
const PAYLOAD_HASH_URL_SEGMENT: Option<&'static str> = Some("hash");
const IS_LARGE_OBJECT: bool = true;

fn height(&self) -> usize {
HeightIndexed::height(self) as usize
}

fn hash(&self) -> String {
self.block_hash().to_string()
Expand Down Expand Up @@ -591,6 +641,55 @@ impl<T: Queryable> ResourceManager<T> {
Ok(())
}

async fn query_range(&mut self, from: u64, len: u16) -> anyhow::Result<()> {
let from = self.adjust_index(from).await? as usize;
let limits = self
.get::<availability::Limits>("availability/limits")
.await?;
let limit = if T::IS_LARGE_OBJECT {
limits.large_object_range_limit
} else {
limits.small_object_range_limit
};

// Adjust `len`, 10% of the time query above the limit for this type (so the query fails);
// the rest of the time query a valid range.
let max_len = limit * 11 / 10;
let to = self
.adjust_index(from as u64 + (len as u64) % (max_len as u64))
.await? as usize;
match self
.get::<Vec<T>>(format!("availability/{}/{from}/{to}", Self::singular()))
.await
{
Ok(range) => {
ensure!(to - from <= limit, "range endpoint succeeded and returned {} results for request over limit; limit: {limit} from: {from} to: {to}", range.len());
ensure!(range.len() == to - from, "range endpoint returned wrong number of results; from: {from} to: {to} results: {}", range.len());
for (i, obj) in range.iter().enumerate() {
ensure!(
obj.height() == from + i,
"object in range has wrong height; from: {from} to: {to} i: {i} height: {}",
obj.height()
);
}
}
Err(_) if to - from > limit => {
tracing::info!(
limit,
from,
to,
"range query exceeding limit failed as expected"
);
}
Err(err) => {
return Err(err).context("error in range query");
}
}

self.metrics.query_range_actions[&T::RESOURCE].add(1);
Ok(())
}

async fn open_stream(&mut self, from: u64) -> anyhow::Result<()> {
if self.open_streams.len() >= self.cfg.max_open_streams {
tracing::info!(
Expand Down Expand Up @@ -1104,6 +1203,11 @@ enum Action {
resource: Resource,
at: u64,
},
QueryRange {
resource: Resource,
from: u64,
len: u16,
},
OpenStream {
resource: Resource,
from: u64,
Expand Down Expand Up @@ -1147,6 +1251,11 @@ impl Action {
resource: Resource::random(rng),
at: rng.next_u64(),
},
ActionDiscriminants::QueryRange => Self::QueryRange {
resource: Resource::random(rng),
from: rng.next_u64(),
len: (rng.next_u32() % u16::MAX as u32) as u16,
},
ActionDiscriminants::OpenStream => Self::OpenStream {
resource: Resource::random(rng),
from: rng.next_u64(),
Expand Down Expand Up @@ -1209,6 +1318,16 @@ impl Client {
Resource::Headers => self.headers.query(at).await,
Resource::Payloads => self.payloads.query(at).await,
},
Action::QueryRange {
resource,
from,
len,
} => match resource {
Resource::Blocks => self.blocks.query_range(from, len).await,
Resource::Leaves => self.leaves.query_range(from, len).await,
Resource::Headers => self.headers.query_range(from, len).await,
Resource::Payloads => self.payloads.query_range(from, len).await,
},
Action::OpenStream { resource, from } => match resource {
Resource::Blocks => self.blocks.open_stream(from).await,
Resource::Leaves => self.leaves.open_stream(from).await,
Expand Down

0 comments on commit 9ca9918

Please sign in to comment.