Skip to content

Commit

Permalink
[rpc] Fatal getSignaturesForAddress() when Bigtable errors (#3700)
Browse files Browse the repository at this point in the history
* Unindent code in `get_signatures_for_address`

* Add a custom JSON-RPC error to throw when long-term storage (ie. Bigtable) can't be reached

* When the `before`/`until` signatures can't be found, throw `SignatureNotFound` instead of `RowNotFound`

* Fatal `getSignaturesForAddress` calls when Bigtable must be queried but can't be reached
  • Loading branch information
steveluscher authored Nov 25, 2024
1 parent 657108c commit 52f132c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 97 deletions.
8 changes: 8 additions & 0 deletions rpc-client-api/src/custom_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub const JSON_RPC_SERVER_ERROR_UNSUPPORTED_TRANSACTION_VERSION: i64 = -32015;
pub const JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED: i64 = -32016;
pub const JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE: i64 = -32017;
pub const JSON_RPC_SERVER_ERROR_SLOT_NOT_EPOCH_BOUNDARY: i64 = -32018;
pub const JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_UNREACHABLE: i64 = -32019;

#[derive(Error, Debug)]
pub enum RpcCustomError {
Expand Down Expand Up @@ -75,6 +76,8 @@ pub enum RpcCustomError {
},
#[error("SlotNotEpochBoundary")]
SlotNotEpochBoundary { slot: Slot },
#[error("LongTermStorageUnreachable")]
LongTermStorageUnreachable,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -244,6 +247,11 @@ impl From<RpcCustomError> for Error {
),
data: None,
},
RpcCustomError::LongTermStorageUnreachable => Self {
code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_UNREACHABLE),
message: "Failed to query long-term storage; please try again".to_string(),
data: None,
},
}
}
}
190 changes: 95 additions & 95 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1689,118 +1689,118 @@ impl JsonRpcRequestProcessor {
let commitment = config.commitment.unwrap_or_default();
check_is_at_least_confirmed(commitment)?;

if self.config.enable_rpc_transaction_history {
let highest_super_majority_root = self
.block_commitment_cache
.read()
.unwrap()
.highest_super_majority_root();
let highest_slot = if commitment.is_confirmed() {
let confirmed_bank = self.get_bank_with_config(config)?;
confirmed_bank.slot()
} else {
let min_context_slot = config.min_context_slot.unwrap_or_default();
if highest_super_majority_root < min_context_slot {
return Err(RpcCustomError::MinContextSlotNotReached {
context_slot: highest_super_majority_root,
}
.into());
if !self.config.enable_rpc_transaction_history {
return Err(RpcCustomError::TransactionHistoryNotAvailable.into());
}

let highest_super_majority_root = self
.block_commitment_cache
.read()
.unwrap()
.highest_super_majority_root();
let highest_slot = if commitment.is_confirmed() {
let confirmed_bank = self.get_bank_with_config(config)?;
confirmed_bank.slot()
} else {
let min_context_slot = config.min_context_slot.unwrap_or_default();
if highest_super_majority_root < min_context_slot {
return Err(RpcCustomError::MinContextSlotNotReached {
context_slot: highest_super_majority_root,
}
highest_super_majority_root
};
.into());
}
highest_super_majority_root
};

let SignatureInfosForAddress {
infos: mut results,
found_before,
} = self
.blockstore
.get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit)
.map_err(|err| Error::invalid_params(format!("{err}")))?;
let SignatureInfosForAddress {
infos: mut results,
found_before,
} = self
.blockstore
.get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit)
.map_err(|err| Error::invalid_params(format!("{err}")))?;

let map_results = |results: Vec<ConfirmedTransactionStatusWithSignature>| {
results
.into_iter()
.map(|x| {
let mut item: RpcConfirmedTransactionStatusWithSignature = x.into();
if item.slot <= highest_super_majority_root {
item.confirmation_status =
Some(TransactionConfirmationStatus::Finalized);
} else {
item.confirmation_status =
Some(TransactionConfirmationStatus::Confirmed);
if item.block_time.is_none() {
let r_bank_forks = self.bank_forks.read().unwrap();
item.block_time = r_bank_forks
.get(item.slot)
.map(|bank| bank.clock().unix_timestamp);
}
let map_results = |results: Vec<ConfirmedTransactionStatusWithSignature>| {
results
.into_iter()
.map(|x| {
let mut item: RpcConfirmedTransactionStatusWithSignature = x.into();
if item.slot <= highest_super_majority_root {
item.confirmation_status = Some(TransactionConfirmationStatus::Finalized);
} else {
item.confirmation_status = Some(TransactionConfirmationStatus::Confirmed);
if item.block_time.is_none() {
let r_bank_forks = self.bank_forks.read().unwrap();
item.block_time = r_bank_forks
.get(item.slot)
.map(|bank| bank.clock().unix_timestamp);
}
item
})
.collect()
};

if results.len() < limit {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let mut bigtable_before = before;
if !results.is_empty() {
limit -= results.len();
bigtable_before = results.last().map(|x| x.signature);
}
item
})
.collect()
};

// If the oldest address-signature found in Blockstore has not yet been
// uploaded to long-term storage, modify the storage query to return all latest
// signatures to prevent erroring on RowNotFound. This can race with upload.
if found_before && bigtable_before.is_some() {
match bigtable_ledger_storage
.get_signature_status(&bigtable_before.unwrap())
.await
{
Err(StorageError::SignatureNotFound) => {
bigtable_before = None;
}
Err(err) => {
warn!("{:?}", err);
return Ok(map_results(results));
}
Ok(_) => {}
if results.len() < limit {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let mut bigtable_before = before;
if !results.is_empty() {
limit -= results.len();
bigtable_before = results.last().map(|x| x.signature);
}

// If the oldest address-signature found in Blockstore has not yet been
// uploaded to long-term storage, modify the storage query to return all latest
// signatures to prevent erroring on RowNotFound. This can race with upload.
if found_before && bigtable_before.is_some() {
match bigtable_ledger_storage
.get_signature_status(&bigtable_before.unwrap())
.await
{
Err(StorageError::SignatureNotFound) => {
bigtable_before = None;
}
Err(err) => {
warn!("Failed to query Bigtable: {:?}", err);
return Err(RpcCustomError::LongTermStorageUnreachable.into());
}
Ok(_) => {}
}
}

let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address(
&address,
bigtable_before.as_ref(),
until.as_ref(),
limit,
)
.await;
match bigtable_results {
Ok(bigtable_results) => {
let results_set: HashSet<_> =
results.iter().map(|result| result.signature).collect();
for (bigtable_result, _) in bigtable_results {
// In the upload race condition, latest address-signatures in
// long-term storage may include original `before` signature...
if before != Some(bigtable_result.signature)
let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address(
&address,
bigtable_before.as_ref(),
until.as_ref(),
limit,
)
.await;
match bigtable_results {
Ok(bigtable_results) => {
let results_set: HashSet<_> =
results.iter().map(|result| result.signature).collect();
for (bigtable_result, _) in bigtable_results {
// In the upload race condition, latest address-signatures in
// long-term storage may include original `before` signature...
if before != Some(bigtable_result.signature)
// ...or earlier Blockstore signatures
&& !results_set.contains(&bigtable_result.signature)
{
results.push(bigtable_result);
}
{
results.push(bigtable_result);
}
}
Err(err) => {
warn!("{:?}", err);
}
}
Err(StorageError::SignatureNotFound) => {}
Err(err) => {
warn!("Failed to query Bigtable: {:?}", err);
return Err(RpcCustomError::LongTermStorageUnreachable.into());
}
}
}

Ok(map_results(results))
} else {
Err(RpcCustomError::TransactionHistoryNotAvailable.into())
}

Ok(map_results(results))
}

pub async fn get_first_available_block(&self) -> Slot {
Expand Down
12 changes: 10 additions & 2 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,11 @@ impl LedgerStorage {
Some(before_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", before_signature.to_string())
.await?;
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::SignatureNotFound,
_ => err.into(),
})?;

(slot, index)
}
Expand All @@ -808,7 +812,11 @@ impl LedgerStorage {
Some(until_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", until_signature.to_string())
.await?;
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::SignatureNotFound,
_ => err.into(),
})?;

(slot, index)
}
Expand Down

0 comments on commit 52f132c

Please sign in to comment.