Skip to content

Commit

Permalink
fix: removed block_id from starknet_subscribeTransactionStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
vbar committed Jan 21, 2025
1 parent f3edbd7 commit 1eb1176
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 65 deletions.
109 changes: 46 additions & 63 deletions crates/rpc/src/method/subscribe_transaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use axum::async_trait;
use pathfinder_common::receipt::ExecutionStatus;
use pathfinder_common::{BlockId, BlockNumber, TransactionHash};
use pathfinder_common::{BlockNumber, TransactionHash};
use reply::transaction_status as status;
use starknet_gateway_client::GatewayApi;
use starknet_gateway_types::reply;
Expand All @@ -12,7 +12,6 @@ use tokio::time::MissedTickBehavior;

use super::REORG_SUBSCRIPTION_NAME;
use crate::context::RpcContext;
use crate::error::ApplicationError;
use crate::jsonrpc::{RpcError, RpcSubscriptionFlow, SubscriptionMessage};
use crate::Reorg;

Expand All @@ -21,15 +20,13 @@ pub struct SubscribeTransactionStatus;
#[derive(Debug, Clone, Default)]
pub struct Params {
transaction_hash: TransactionHash,
block_id: Option<BlockId>,
}

impl crate::dto::DeserializeForVersion for Params {
fn deserialize(value: crate::dto::Value) -> Result<Self, serde_json::Error> {
value.deserialize_map(|value| {
Ok(Self {
transaction_hash: value.deserialize("transaction_hash").map(TransactionHash)?,
block_id: value.deserialize_optional_serde("block_id")?,
})
})
}
Expand Down Expand Up @@ -143,49 +140,51 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus {
let mut l2_blocks = state.notifications.l2_blocks.subscribe();
let mut reorgs = state.notifications.reorgs.subscribe();
let storage = state.storage.clone();
if let Some(first_block) = params.block_id {
// Check if we have the transaction in our database, and if so, send the
// relevant transaction status updates.
let (first_block, l1_state, tx_with_receipt) =
util::task::spawn_blocking(move |_| -> Result<_, RpcError> {
let mut conn = storage.connection().map_err(RpcError::InternalError)?;
let db = conn.transaction().map_err(RpcError::InternalError)?;
let first_block = db
.block_number(first_block.try_into().map_err(|_| {
RpcError::ApplicationError(ApplicationError::CallOnPending)
})?)
.map_err(RpcError::InternalError)?;
let l1_block_number =
db.latest_l1_state().map_err(RpcError::InternalError)?;
let tx_with_receipt = db
.transaction_with_receipt(tx_hash)
.map_err(RpcError::InternalError)?;
Ok((first_block, l1_block_number, tx_with_receipt))
})
.await
.map_err(|e| RpcError::InternalError(e.into()))??;
let first_block = first_block
.ok_or_else(|| RpcError::ApplicationError(ApplicationError::BlockNotFound))?;
if let Some((_, receipt, _, block_number)) = tx_with_receipt {
// We already have the transaction in the database.
if let Some(parent) = block_number.parent() {
// This transaction was pending in the parent block.
if first_block <= parent {
if sender
.send(parent, FinalityStatus::Received, None)
.await
.is_err()
{
// Subscription closing.
break;
}
}
// Check if we have the transaction in our database, and if so, send the
// relevant transaction status updates.
let (l1_state, tx_with_receipt) =
util::task::spawn_blocking(move |_| -> Result<_, RpcError> {
let mut conn = storage.connection().map_err(RpcError::InternalError)?;
let db = conn.transaction().map_err(RpcError::InternalError)?;
let l1_block_number = db.latest_l1_state().map_err(RpcError::InternalError)?;
let tx_with_receipt = db
.transaction_with_receipt(tx_hash)
.map_err(RpcError::InternalError)?;
Ok((l1_block_number, tx_with_receipt))
})
.await
.map_err(|e| RpcError::InternalError(e.into()))??;
if let Some((_, receipt, _, block_number)) = tx_with_receipt {
// We already have the transaction in the database.
if let Some(parent) = block_number.parent() {
// This transaction was pending in the parent block.
if sender
.send(parent, FinalityStatus::Received, None)
.await
.is_err()
{
// Subscription closing.
break;
}
if first_block <= block_number {
}
if sender
.send(
block_number,
FinalityStatus::AcceptedOnL2,
Some(receipt.execution_status.clone()),
)
.await
.is_err()
{
// Subscription closing.
break;
}
if let Some(l1_state) = l1_state {
if l1_state.block_number >= block_number {
if sender
.send(
block_number,
FinalityStatus::AcceptedOnL2,
l1_state.block_number,
FinalityStatus::AcceptedOnL1,
Some(receipt.execution_status.clone()),
)
.await
Expand All @@ -195,22 +194,6 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus {
break;
}
}
if let Some(l1_state) = l1_state {
if l1_state.block_number >= block_number {
if sender
.send(
l1_state.block_number,
FinalityStatus::AcceptedOnL1,
Some(receipt.execution_status.clone()),
)
.await
.is_err()
{
// Subscription closing.
break;
}
}
}
}
}
let pending = pending_data.borrow_and_update().clone();
Expand Down Expand Up @@ -492,7 +475,7 @@ mod tests {
"execution_status": "SUCCEEDED",
}
},
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V07)).unwrap(),
"subscription_id": subscription_id.serialize(Serializer::new(RpcVersion::V08)).unwrap(),
}
})
]
Expand Down Expand Up @@ -943,7 +926,7 @@ mod tests {
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{"block_id": {"block_number": 0}, "transaction_hash": tx_hash}
{"transaction_hash": tx_hash}
);
receiver_tx
.send(Ok(Message::Text(
Expand Down Expand Up @@ -1097,7 +1080,7 @@ mod tests {
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
let params = serde_json::json!(
{"block_id": {"block_number": 0}, "transaction_hash": tx_hash}
{"transaction_hash": tx_hash}
);
receiver_tx
.send(Ok(Message::Text(
Expand Down
11 changes: 9 additions & 2 deletions doc/rpc/v08/starknet_ws_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,21 @@
{
"name": "starknet_subscribeTransactionStatus",
"summary": "Transaction Status subscription",
"description": "Creates a WebSocket stream which at first fires an event with the current known transaction status, followed by events for every transaction status update",
"description": "Creates a WebSocket stream which will fire events when a transaction status is updated",
"params": [
{
"name": "transaction_hash",
"summary": "The transaction hash to fetch status updates for",
"required": true,
"schema": {
"$ref": "#/components/schemas/FELT"
},
{
"name": "block_id",
"summary": "The block to get notifications from, default is latest, limited to 1024 blocks back",
"required": false,
"schema": {
"$ref": "./api/starknet_api_openrpc.json#/components/schemas/BLOCK_ID"
}
}
],
Expand Down Expand Up @@ -360,4 +367,4 @@
}
}
}
}
}

0 comments on commit 1eb1176

Please sign in to comment.