diff --git a/rpc-client-api/src/custom_error.rs b/rpc-client-api/src/custom_error.rs index c441c94341cdba..57381f2676fea3 100644 --- a/rpc-client-api/src/custom_error.rs +++ b/rpc-client-api/src/custom_error.rs @@ -26,6 +26,7 @@ pub const JSON_RPC_SERVER_ERROR_UNSUPPORTED_TRANSACTION_VERSION: i64 = -32015; pub const JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED: i64 = -32016; pub const JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE: i64 = -32017; pub const JSON_RPC_SERVER_ERROR_SLOT_NOT_EPOCH_BOUNDARY: i64 = -32018; +pub const JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_UNREACHABLE: i64 = -32019; #[derive(Error, Debug)] pub enum RpcCustomError { @@ -75,6 +76,8 @@ pub enum RpcCustomError { }, #[error("SlotNotEpochBoundary")] SlotNotEpochBoundary { slot: Slot }, + #[error("LongTermStorageUnreachable")] + LongTermStorageUnreachable, } #[derive(Debug, Serialize, Deserialize)] @@ -244,6 +247,11 @@ impl From for Error { ), data: None, }, + RpcCustomError::LongTermStorageUnreachable => Self { + code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_UNREACHABLE), + message: "Failed to query long-term storage; please try again".to_string(), + data: None, + }, } } } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index dbdb7e1208f6a5..29cce7b1a74680 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -1689,118 +1689,118 @@ impl JsonRpcRequestProcessor { let commitment = config.commitment.unwrap_or_default(); check_is_at_least_confirmed(commitment)?; - if self.config.enable_rpc_transaction_history { - let highest_super_majority_root = self - .block_commitment_cache - .read() - .unwrap() - .highest_super_majority_root(); - let highest_slot = if commitment.is_confirmed() { - let confirmed_bank = self.get_bank_with_config(config)?; - confirmed_bank.slot() - } else { - let min_context_slot = config.min_context_slot.unwrap_or_default(); - if highest_super_majority_root < min_context_slot { - return Err(RpcCustomError::MinContextSlotNotReached { - context_slot: highest_super_majority_root, - } - .into()); + if !self.config.enable_rpc_transaction_history { + return Err(RpcCustomError::TransactionHistoryNotAvailable.into()); + } + + let highest_super_majority_root = self + .block_commitment_cache + .read() + .unwrap() + .highest_super_majority_root(); + let highest_slot = if commitment.is_confirmed() { + let confirmed_bank = self.get_bank_with_config(config)?; + confirmed_bank.slot() + } else { + let min_context_slot = config.min_context_slot.unwrap_or_default(); + if highest_super_majority_root < min_context_slot { + return Err(RpcCustomError::MinContextSlotNotReached { + context_slot: highest_super_majority_root, } - highest_super_majority_root - }; + .into()); + } + highest_super_majority_root + }; - let SignatureInfosForAddress { - infos: mut results, - found_before, - } = self - .blockstore - .get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit) - .map_err(|err| Error::invalid_params(format!("{err}")))?; + let SignatureInfosForAddress { + infos: mut results, + found_before, + } = self + .blockstore + .get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit) + .map_err(|err| Error::invalid_params(format!("{err}")))?; - let map_results = |results: Vec| { - results - .into_iter() - .map(|x| { - let mut item: RpcConfirmedTransactionStatusWithSignature = x.into(); - if item.slot <= highest_super_majority_root { - item.confirmation_status = - Some(TransactionConfirmationStatus::Finalized); - } else { - item.confirmation_status = - Some(TransactionConfirmationStatus::Confirmed); - if item.block_time.is_none() { - let r_bank_forks = self.bank_forks.read().unwrap(); - item.block_time = r_bank_forks - .get(item.slot) - .map(|bank| bank.clock().unix_timestamp); - } + let map_results = |results: Vec| { + results + .into_iter() + .map(|x| { + let mut item: RpcConfirmedTransactionStatusWithSignature = x.into(); + if item.slot <= highest_super_majority_root { + item.confirmation_status = Some(TransactionConfirmationStatus::Finalized); + } else { + item.confirmation_status = Some(TransactionConfirmationStatus::Confirmed); + if item.block_time.is_none() { + let r_bank_forks = self.bank_forks.read().unwrap(); + item.block_time = r_bank_forks + .get(item.slot) + .map(|bank| bank.clock().unix_timestamp); } - item - }) - .collect() - }; - - if results.len() < limit { - if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { - let mut bigtable_before = before; - if !results.is_empty() { - limit -= results.len(); - bigtable_before = results.last().map(|x| x.signature); } + item + }) + .collect() + }; - // If the oldest address-signature found in Blockstore has not yet been - // uploaded to long-term storage, modify the storage query to return all latest - // signatures to prevent erroring on RowNotFound. This can race with upload. - if found_before && bigtable_before.is_some() { - match bigtable_ledger_storage - .get_signature_status(&bigtable_before.unwrap()) - .await - { - Err(StorageError::SignatureNotFound) => { - bigtable_before = None; - } - Err(err) => { - warn!("{:?}", err); - return Ok(map_results(results)); - } - Ok(_) => {} + if results.len() < limit { + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + let mut bigtable_before = before; + if !results.is_empty() { + limit -= results.len(); + bigtable_before = results.last().map(|x| x.signature); + } + + // If the oldest address-signature found in Blockstore has not yet been + // uploaded to long-term storage, modify the storage query to return all latest + // signatures to prevent erroring on RowNotFound. This can race with upload. + if found_before && bigtable_before.is_some() { + match bigtable_ledger_storage + .get_signature_status(&bigtable_before.unwrap()) + .await + { + Err(StorageError::SignatureNotFound) => { + bigtable_before = None; + } + Err(err) => { + warn!("Failed to query Bigtable: {:?}", err); + return Err(RpcCustomError::LongTermStorageUnreachable.into()); } + Ok(_) => {} } + } - let bigtable_results = bigtable_ledger_storage - .get_confirmed_signatures_for_address( - &address, - bigtable_before.as_ref(), - until.as_ref(), - limit, - ) - .await; - match bigtable_results { - Ok(bigtable_results) => { - let results_set: HashSet<_> = - results.iter().map(|result| result.signature).collect(); - for (bigtable_result, _) in bigtable_results { - // In the upload race condition, latest address-signatures in - // long-term storage may include original `before` signature... - if before != Some(bigtable_result.signature) + let bigtable_results = bigtable_ledger_storage + .get_confirmed_signatures_for_address( + &address, + bigtable_before.as_ref(), + until.as_ref(), + limit, + ) + .await; + match bigtable_results { + Ok(bigtable_results) => { + let results_set: HashSet<_> = + results.iter().map(|result| result.signature).collect(); + for (bigtable_result, _) in bigtable_results { + // In the upload race condition, latest address-signatures in + // long-term storage may include original `before` signature... + if before != Some(bigtable_result.signature) // ...or earlier Blockstore signatures && !results_set.contains(&bigtable_result.signature) - { - results.push(bigtable_result); - } + { + results.push(bigtable_result); } } - Err(err) => { - warn!("{:?}", err); - } + } + Err(StorageError::SignatureNotFound) => {} + Err(err) => { + warn!("Failed to query Bigtable: {:?}", err); + return Err(RpcCustomError::LongTermStorageUnreachable.into()); } } } - - Ok(map_results(results)) - } else { - Err(RpcCustomError::TransactionHistoryNotAvailable.into()) } + + Ok(map_results(results)) } pub async fn get_first_available_block(&self) -> Slot { diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 3af928a626d834..a72b6b2321cb20 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -796,7 +796,11 @@ impl LedgerStorage { Some(before_signature) => { let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", before_signature.to_string()) - .await?; + .await + .map_err(|err| match err { + bigtable::Error::RowNotFound => Error::SignatureNotFound, + _ => err.into(), + })?; (slot, index) } @@ -808,7 +812,11 @@ impl LedgerStorage { Some(until_signature) => { let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", until_signature.to_string()) - .await?; + .await + .map_err(|err| match err { + bigtable::Error::RowNotFound => Error::SignatureNotFound, + _ => err.into(), + })?; (slot, index) }