Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reorg threshold bug #5804

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ impl Blockchain for Chain {
// present in the DB.
Box::new(PollingBlockIngestor::new(
logger,
graph::env::ENV_VARS.reorg_threshold,
graph::env::ENV_VARS.reorg_threshold(),
self.chain_client(),
self.chain_store().cheap_clone(),
self.polling_ingestor_interval,
Expand Down
4 changes: 2 additions & 2 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,9 @@ impl Graft {
// The graft point must be at least `reorg_threshold` blocks
// behind the subgraph head so that a reorg can not affect the
// data that we copy for grafting
(Some(ptr), true) if self.block + ENV_VARS.reorg_threshold > ptr.number => Err(GraftBaseInvalid(format!(
(Some(ptr), true) if self.block + ENV_VARS.reorg_threshold() > ptr.number => Err(GraftBaseInvalid(format!(
"failed to graft onto `{}` at block {} since it's only at block {} which is within the reorg threshold of {} blocks",
self.base, self.block, ptr.number, ENV_VARS.reorg_threshold
self.base, self.block, ptr.number, ENV_VARS.reorg_threshold()
))),
// If the base deployment is failed *and* the `graft.block` is not
// less than the `base.block`, the graft shouldn't be permitted.
Expand Down
36 changes: 21 additions & 15 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod store;
use envconfig::Envconfig;
use lazy_static::lazy_static;
use semver::Version;
use std::sync::Mutex;
use std::{collections::HashSet, env::VarError, fmt, str::FromStr, time::Duration};

use self::graphql::*;
Expand All @@ -17,6 +18,7 @@ use crate::{

lazy_static! {
pub static ref ENV_VARS: EnvVars = EnvVars::from_env().unwrap();
pub static ref TEST_WITH_NO_REORG: Mutex<bool> = Mutex::new(false);
}

/// Panics if:
Expand Down Expand Up @@ -181,7 +183,7 @@ pub struct EnvVars {
pub static_filters_threshold: usize,
/// Set by the environment variable `ETHEREUM_REORG_THRESHOLD`. The default
/// value is 250 blocks.
pub reorg_threshold: BlockNumber,
reorg_threshold: BlockNumber,
/// The time to wait between polls when using polling block ingestor.
/// The value is set by `ETHERUM_POLLING_INTERVAL` in millis and the
/// default is 1000.
Expand Down Expand Up @@ -249,16 +251,6 @@ impl EnvVars {
let mapping_handlers = InnerMappingHandlers::init_from_env()?.into();
let store = InnerStore::init_from_env()?.into();

// The default reorganization (reorg) threshold is set to 250.
// For testing purposes, we need to set this threshold to 0 because:
// 1. Many tests involve reverting blocks.
// 2. Blocks cannot be reverted below the reorg threshold.
// Therefore, during tests, we want to set the reorg threshold to 0.
let reorg_threshold =
inner
.reorg_threshold
.unwrap_or_else(|| if cfg!(debug_assertions) { 0 } else { 250 });

Ok(Self {
graphql,
mappings: mapping_handlers,
Expand Down Expand Up @@ -312,13 +304,15 @@ impl EnvVars {
external_http_base_url: inner.external_http_base_url,
external_ws_base_url: inner.external_ws_base_url,
static_filters_threshold: inner.static_filters_threshold,
reorg_threshold,
reorg_threshold: inner.reorg_threshold,
ingestor_polling_interval: Duration::from_millis(inner.ingestor_polling_interval),
subgraph_settings: inner.subgraph_settings,
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
enable_dips_metrics: inner.enable_dips_metrics.0,
history_blocks_override: inner.history_blocks_override,
min_history_blocks: inner.min_history_blocks.unwrap_or(2 * reorg_threshold),
min_history_blocks: inner
.min_history_blocks
.unwrap_or(2 * inner.reorg_threshold),
dips_metrics_object_store_url: inner.dips_metrics_object_store_url,
section_map: inner.section_map,
firehose_grpc_max_decode_size_mb: inner.firehose_grpc_max_decode_size_mb,
Expand Down Expand Up @@ -362,6 +356,18 @@ impl EnvVars {
.filter(|x| !x.is_empty())
.collect()
}
pub fn reorg_threshold(&self) -> i32 {
// The default reorganization (reorg) threshold is set to 250.
// For testing purposes, we need to set this threshold to 0 because:
// 1. Many tests involve reverting blocks.
// 2. Blocks cannot be reverted below the reorg threshold.
// Therefore, during tests, we want to set the reorg threshold to 0.
if *TEST_WITH_NO_REORG.lock().unwrap() {
0
} else {
self.reorg_threshold
}
}
}

impl Default for EnvVars {
Expand Down Expand Up @@ -460,8 +466,8 @@ struct Inner {
#[envconfig(from = "GRAPH_STATIC_FILTERS_THRESHOLD", default = "10000")]
static_filters_threshold: usize,
// JSON-RPC specific.
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD")]
reorg_threshold: Option<BlockNumber>,
#[envconfig(from = "ETHEREUM_REORG_THRESHOLD", default = "250")]
reorg_threshold: BlockNumber,
#[envconfig(from = "ETHEREUM_POLLING_INTERVAL", default = "1000")]
ingestor_polling_interval: u64,
#[envconfig(from = "GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS")]
Expand Down
2 changes: 1 addition & 1 deletion node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ pub async fn networks_as_chains(
Arc::new(adapter_selector),
Arc::new(EthereumRuntimeAdapterBuilder {}),
eth_adapters,
ENV_VARS.reorg_threshold,
ENV_VARS.reorg_threshold(),
polling_interval,
true,
);
Expand Down
6 changes: 3 additions & 3 deletions node/src/manager/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ pub async fn run(

println!("prune {deployment}");
println!(" latest: {latest}");
println!(" final: {}", latest - ENV_VARS.reorg_threshold);
println!(" final: {}", latest - ENV_VARS.reorg_threshold());
println!(" earliest: {}\n", latest - history);

let mut req = PruneRequest::new(
&deployment,
history,
ENV_VARS.reorg_threshold,
ENV_VARS.reorg_threshold(),
status.earliest_block_number,
latest,
)?;
Expand All @@ -217,7 +217,7 @@ pub async fn run(
store.subgraph_store().set_history_blocks(
&deployment,
history,
ENV_VARS.reorg_threshold,
ENV_VARS.reorg_threshold(),
)?;
}

Expand Down
4 changes: 2 additions & 2 deletions node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ pub async fn run(
let deployment_details = deployment_store.deployment_details_for_id(locator)?;
let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);

if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold {
if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold() {
bail!(
"The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}",
block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0),
locator,
deployment_details.earliest_block_number,
deployment_details.earliest_block_number + ENV_VARS.reorg_threshold
deployment_details.earliest_block_number + ENV_VARS.reorg_threshold()
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ impl BlockStore {
};

if let Some(head_block) = store.remove_cursor(&&store.chain)? {
let lower_bound = head_block.saturating_sub(ENV_VARS.reorg_threshold * 2);
let lower_bound = head_block.saturating_sub(ENV_VARS.reorg_threshold() * 2);
info!(&self.logger, "Removed cursor for non-firehose chain, now cleaning shallow blocks"; "network" => &store.chain, "lower_bound" => lower_bound);
store.cleanup_shallow_blocks(lower_bound)?;
}
Expand Down
6 changes: 5 additions & 1 deletion store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,14 @@ pub fn revert_block_ptr(
// Work around a Diesel issue with serializing BigDecimals to numeric
let number = format!("{}::numeric", ptr.number);

// Intention is to revert to a block lower than the reorg threshold, on the other
// hand the earliest we can possibly go is genesys block, so go to genesys even
// if it's within the reorg threshold.
let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0);
let affected_rows = update(
d::table
.filter(d::deployment.eq(id.as_str()))
.filter(d::earliest_block_number.le(ptr.number - ENV_VARS.reorg_threshold)),
.filter(d::earliest_block_number.le(earliest_block)),
)
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ impl DeploymentStore {
let req = PruneRequest::new(
&site.as_ref().into(),
history_blocks,
ENV_VARS.reorg_threshold,
ENV_VARS.reorg_threshold(),
earliest_block,
latest_block,
)?;
Expand Down
3 changes: 2 additions & 1 deletion tests/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ impl Config {
.stdout(stdout)
.stderr(stderr)
.args(args)
.env("GRAPH_STORE_WRITE_BATCH_DURATION", "5");
.env("GRAPH_STORE_WRITE_BATCH_DURATION", "5")
.env("ETHEREUM_REORG_THRESHOLD", "0");

status!(
"graph-node",
Expand Down
2 changes: 1 addition & 1 deletion tests/src/fixture/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn chain(
triggers_adapter,
Arc::new(NoopRuntimeAdapterBuilder {}),
eth_adapters,
ENV_VARS.reorg_threshold,
ENV_VARS.reorg_threshold(),
ENV_VARS.ingestor_polling_interval,
// We assume the tested chain is always ingestible for now
true,
Expand Down
4 changes: 3 additions & 1 deletion tests/tests/runner_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use graph::data::store::scalar::Bytes;
use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth};
use graph::data::value::Word;
use graph::data_source::CausalityRegion;
use graph::env::EnvVars;
use graph::env::{EnvVars, TEST_WITH_NO_REORG};
use graph::ipfs;
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
use graph::object;
Expand Down Expand Up @@ -109,6 +109,8 @@ fn assert_eq_ignore_backtrace(err: &SubgraphError, expected: &SubgraphError) {

#[tokio::test]
async fn data_source_revert() -> anyhow::Result<()> {
*TEST_WITH_NO_REORG.lock().unwrap() = true;

let RunnerTestRecipe { stores, test_info } =
RunnerTestRecipe::new("data_source_revert", "data-source-revert").await;

Expand Down
Loading