From 0934863cfa60352b58f8d837a5a4cd507c1cb14d Mon Sep 17 00:00:00 2001 From: MrishoLukamba Date: Mon, 6 May 2024 03:53:22 +0300 Subject: [PATCH] added subscribe revert tx & changed some data structures --- Cargo.toml | 2 + av-layer/Cargo.toml | 3 +- av-layer/src/handlers.rs | 107 ++++++++++++++++++---------- av-layer/src/main.rs | 23 ++++-- av-layer/src/traits.rs | 12 ++-- integration-test/Cargo.toml | 2 + integration-test/src/main.rs | 37 +++++++--- primitives/Cargo.toml | 20 +++--- primitives/src/lib.rs | 132 ++++++++++++++++++++++++----------- 9 files changed, 235 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6642a7f..d77a48d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ flume = "0.11.0" sp-rpc = "29.0.0" subxt = "0.35.0" tokio = "1.36.0" +hex = "0.4.3" +hex-literal = "0.4.1" tower-http = { version = "0.4.0", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } jsonrpsee = { version = "0.17", features = ["server", "client-core", "http-client", "ws-client", "macros"] } diff --git a/av-layer/Cargo.toml b/av-layer/Cargo.toml index 0d1e722..a18452e 100644 --- a/av-layer/Cargo.toml +++ b/av-layer/Cargo.toml @@ -25,4 +25,5 @@ serde_json = { workspace = true} subxt = { workspace = true} tracing = { workspace = true} sp-tracing = { workspace = true} -tracing-subscriber = { workspace = true} \ No newline at end of file +tracing-subscriber = { workspace = true} +hex = { workspace = true} diff --git a/av-layer/src/handlers.rs b/av-layer/src/handlers.rs index 5cf9283..75fe583 100644 --- a/av-layer/src/handlers.rs +++ b/av-layer/src/handlers.rs @@ -1,12 +1,12 @@ use crate::traits::*; +use anyhow::ensure; use jsonrpsee::core::{async_trait, SubscriptionResult}; use jsonrpsee::core::{Error::Custom, RpcResult}; use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage}; -use parity_scale_codec::alloc::sync::Once; use parity_scale_codec::{Decode, Encode}; use primitives::{ - BlockchainNetwork, ConfirmationStatus, TxConfirmationObject, TxObject, TxSimulationObject, - VaneCallData, VaneMultiAddress, + BlockchainNetwork, ConfirmationStatus, MultiId, TxConfirmationObject, TxObject, + TxSimulationObject, VaneCallData, VaneMultiAddress, }; use serde_json::Value as JsonValue; use sp_core::ecdsa::{Public as ecdsaPublic, Signature as ECDSASignature}; @@ -20,26 +20,7 @@ use std::{ }; use subxt::utils::{AccountId32, MultiAddress, MultiSignature}; use tokio::sync::Mutex; -use tracing_subscriber; - -// Tracing setup -static INIT: Once = Once::new(); -pub fn init_tracing() -> anyhow::Result<()> { - // Add test tracing (from sp_tracing::init_for_tests()) but filtering for xcm logs only - let vane_subscriber = tracing_subscriber::fmt() - .compact() - .with_file(true) - .with_line_number(true) - .with_target(true) - .finish(); - - tracing::dispatcher::set_global_default(vane_subscriber.into()) - .expect("Failed to initialise tracer"); - Ok(()) -} -/// Types for easier code navigation -pub type MultiId = VaneMultiAddress; /// A mock database storing each address to the transactions each having a key /// `address` ===> `multi_id`=====> `Vec` pub struct MockDB { @@ -56,7 +37,7 @@ pub struct MockDB { // Store ready to be simulated tx `TxSimulationObject` (queue) pub simulation: VecDeque>, // Record reverted transactions per sender - pub reverted_transactions: BTreeMap, Vec>, + pub reverted_transactions: BTreeMap, Vec>, // ============================================================================ // METRICS @@ -87,8 +68,8 @@ impl TransactionHandler { inner_db_multi_ids.push(multi_id.clone()); db.transactions.insert(multi_id, data.encode()); - db.multi_ids.insert(address, inner_db_multi_ids); - tracing::info!("recorded tx to the memory db") + db.multi_ids.insert(address.clone(), inner_db_multi_ids); + tracing::info!("recorded tx to the memory db for {:?}", address) } pub async fn set_confirmation_transaction_data( @@ -140,6 +121,20 @@ impl TransactionHandler { } } + pub async fn get_reverted_txs(&self) -> Vec { + let db = self.db.lock().await; + let reverted_txs: Vec = db + .reverted_transactions + .values() + .into_iter() + .map(|tx| { + let decoded_tx: TxConfirmationObject = Decode::decode(&mut &tx[..]).expect("hh"); + decoded_tx + }) + .collect(); + return reverted_txs; + } + pub async fn propagate_tx(&self, tx_simulate: TxSimulationObject) { let mut db = self.db.lock().await; db.simulation.push_front(tx_simulate.encode()); @@ -162,6 +157,15 @@ impl TransactionHandler { } } + pub async fn record_reverted_tx( + &self, + sender: VaneMultiAddress, + reverted_tx: TxConfirmationObject, + ) { + let mut db = self.db.lock().await; + db.reverted_transactions + .insert(sender, reverted_tx.encode()); + } // METRICS pub async fn record_subscriber(&self, id: JsonValue) { @@ -209,7 +213,7 @@ impl TransactionServer for TransactionHandler { todo!() } - async fn subscribe_tx_confirmation( + async fn receiver_subscribe_tx_confirmation( &self, pending: PendingSubscriptionSink, address: VaneMultiAddress, @@ -241,7 +245,7 @@ impl TransactionServer for TransactionHandler { } // Subscribe for sender to listen to confirmed tx from the receiver - async fn subscribe_tx_confirmation_sender( + async fn sender_subscribe_tx_confirmation( &self, pending: PendingSubscriptionSink, address: VaneMultiAddress, @@ -288,7 +292,9 @@ impl TransactionServer for TransactionHandler { .ok_or(Custom("Transaction Not Found".to_string()))?; // record the confirmation - let msg = tx.clone().call; + // message to sign for address verification + let msg = tx.clone().get_tx_id(); + let sig = Sr25519Signature::from_slice(&signature) .ok_or(Custom("Failed to convert signature sr25519".to_string()))?; @@ -297,13 +303,17 @@ impl TransactionServer for TransactionHandler { .try_into() .expect("Failed to covert address to bytes"); let public_account = sr25519Public::from_h256(H256::from(account_bytes)); + if sig.verify(&msg.encode()[..], &public_account) { let mut tx_confirmation_object: TxConfirmationObject = tx.into(); + + // update the confirmed address + tx_confirmation_object.set_confirmed_receiver(address); + // update the confirmation status tx_confirmation_object.update_confirmation_status( primitives::ConfirmationStatus::WaitingForSender, ); - tx_confirmation_object.set_receiver_sig(signature); // store the tx confirmation object self.set_confirmation_transaction_data(multi_id.into(), tx_confirmation_object) .await @@ -328,12 +338,16 @@ impl TransactionServer for TransactionHandler { .get_confirmation_transaction_data(multi_id.clone().into()) .await .ok_or(Custom("Confirmation data unavailable".to_string()))?; + // check if the if the receiver has confirmed if tx.get_confirmation_status() != ConfirmationStatus::WaitingForSender { return Err(Custom("Wait for receiver to confirm".to_string())); } // verify the signature and the address - let msg = tx.clone().call; + + // message to sign for address verification + let msg = tx.clone().get_tx_id(); + let sig = Sr25519Signature::from_slice(&signature) .expect("Failed to convert signature sr25519"); @@ -342,13 +356,28 @@ impl TransactionServer for TransactionHandler { .try_into() .expect("Failed to covert address to bytes"); let public_account = sr25519Public::from_h256(H256::from(account_bytes)); + if sig.verify(&msg.encode()[..], &public_account) { - tx.update_confirmation_status(ConfirmationStatus::Ready); - let tx_simulation_object: TxSimulationObject = tx.into(); - // store to the ready to be simulated tx storage - self.propagate_tx(tx_simulation_object).await; + // confirm the resulting multi_id + let multi_id = tx.calculate_confirmed_multi_id(address.clone()); + + if multi_id == tx.get_multi_id() { + tx.set_confirmed_sender(address); + + tx.update_confirmation_status(ConfirmationStatus::Ready); + let tx_simulation_object: TxSimulationObject = tx.into(); + // store to the ready to be simulated tx storage + self.propagate_tx(tx_simulation_object).await; + tracing::info!("sender confirmed"); + } else { + // record to reverted tx, as this tx is automatically reverted due to mismatch in address confirmation + tx.update_confirmation_status(ConfirmationStatus::RejectedMismatchAddress); + + tracing::info!("tx reverted due to mismatched confirmed address"); + self.record_reverted_tx(address, tx).await; + Err(Custom("Address Mismatched , Tx reverted".to_string()))?; + } }; - tracing::info!("sender confirmed"); Ok(()) } _ => Err(Custom("Blockchain network not supported".to_string())), @@ -364,6 +393,14 @@ impl TransactionServer for TransactionHandler { todo!() } + async fn subscribe_revert_tx(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { + let sink = pending.accept().await?; + let reverted_txs: Vec = self.get_reverted_txs().await; + let json_reverted_txs = SubscriptionMessage::from_json(&reverted_txs)?; + sink.send(json_reverted_txs).await?; + Ok(()) + } + async fn receive_confirmed_tx(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { let sink = pending.accept().await?; // fetch the confirmed and ready to be simulated txn diff --git a/av-layer/src/main.rs b/av-layer/src/main.rs index 311c23d..aa37eb1 100644 --- a/av-layer/src/main.rs +++ b/av-layer/src/main.rs @@ -2,7 +2,6 @@ use anyhow::Ok; use clap::Parser; use jsonrpsee::server::ServerBuilder; use std::collections::BTreeMap; -use std::collections::HashMap; use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; @@ -12,11 +11,11 @@ mod traits; use handlers::MockDB; use handlers::TransactionHandler; +use parity_scale_codec::alloc::sync::Once; use tokio::sync::Mutex; +use tracing_subscriber; use traits::TransactionServer; -use crate::handlers::init_tracing; - /// Address Verification layer cli server arguments #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -34,9 +33,26 @@ pub struct ServerProfile { port: u16, } +// Tracing setup +static INIT: Once = Once::new(); +pub fn init_tracing() -> anyhow::Result<()> { + // Add test tracing (from sp_tracing::init_for_tests()) but filtering for xcm logs only + let vane_subscriber = tracing_subscriber::fmt() + .compact() + .with_file(true) + .with_line_number(true) + .with_target(true) + .finish(); + + tracing::dispatcher::set_global_default(vane_subscriber.into()) + .expect("Failed to initialise tracer"); + Ok(()) +} + #[tokio::main] async fn main() -> anyhow::Result<()> { init_tracing()?; + tracing::info!("initiliasing av layer 🔥⚡️"); //let args = AvLayerServerCli::parse(); // Initialise the database let mock_db_transactions = BTreeMap::new(); @@ -54,7 +70,6 @@ async fn main() -> anyhow::Result<()> { subscribed: Vec::new(), })), }; - println!("Starting server"); // Initialize the server run_rpc_server(rpc_handler, "127.0.0.1:8000".to_owned()).await?; diff --git a/av-layer/src/traits.rs b/av-layer/src/traits.rs index e15850c..3a4a688 100644 --- a/av-layer/src/traits.rs +++ b/av-layer/src/traits.rs @@ -26,15 +26,15 @@ pub trait Transaction { /// Subscription to start listening to any upcoming confirmation request /// returns `Vec` in encoded format - #[subscription(name = "subscribeTxConfirmation", item=Vec>)] - async fn subscribe_tx_confirmation( + #[subscription(name = "receiverSubscribeTxConfirmation", unsubscribe= "receiverUnsubscribeTxConfirmation", item=Vec>)] + async fn receiver_subscribe_tx_confirmation( &self, address: VaneMultiAddress, ) -> SubscriptionResult; /// Subscriptiom for sender to listen to incoming confirmed transactions from the receiver - #[subscription(name = "subscribeTxConfirmationSender", item=Vec)] - async fn subscribe_tx_confirmation_sender( + #[subscription(name = "senderSubscribeTxConfirmation",unsubscribe= "senderUnsubscribeTxConfirmation", item=Vec)] + async fn sender_subscribe_tx_confirmation( &self, address: VaneMultiAddress, ) -> SubscriptionResult; @@ -69,6 +69,10 @@ pub trait Transaction { network: BlockchainNetwork, ) -> RpcResult<()>; + /// Subscribe to reverted transactions + #[subscription(name = "subscribeRevertTx",unsubscribe= "unsubscribeRevertTx", item=Vec)] + async fn subscribe_revert_tx(&self) -> SubscriptionResult; + /// Handling confirmation of transaction from receiver and sender /// A websocket connection diff --git a/integration-test/Cargo.toml b/integration-test/Cargo.toml index c4de352..cc6a760 100644 --- a/integration-test/Cargo.toml +++ b/integration-test/Cargo.toml @@ -23,3 +23,5 @@ subxt-signer = { workspace = true} tracing = { workspace = true} sp-tracing = { workspace = true} tracing-subscriber = { workspace = true} +clap = { workspace = true, features = ["derive"]} +hex = { workspace = true} diff --git a/integration-test/src/main.rs b/integration-test/src/main.rs index 503e174..aae81b6 100644 --- a/integration-test/src/main.rs +++ b/integration-test/src/main.rs @@ -28,6 +28,9 @@ use subxt::tx::TxPayload; use subxt::utils::{AccountId32, MultiAddress}; use subxt::{Metadata, OnlineClient, PolkadotConfig}; use subxt_signer::sr25519::{dev, Keypair}; +use jsonrpsee::rpc_params; +use clap::Parser; + #[subxt::subxt(runtime_metadata_path = "polkadot.scale")] pub mod polkadot {} @@ -35,13 +38,30 @@ pub mod polkadot {} // Solana testing // Ethereum testing +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct VaneTestCli { + /// Name of the server + #[arg(short, long)] + name: String +} + #[tokio::main] async fn main() -> anyhow::Result<()> { - // generate accounts - // let alicePair = sr25519Pair::from_string("//Alice", None).expect("Failed to generate key pair"); - // let bobPair = sr25519Pair::from_string("//Bob", None).expect("Failed to generate key pair"); - let alice = dev::alice().public_key(); - //let bob:VaneMultiAddress = VaneMultiAddress::Address32(dev::bob().public_key().into()); + + let args = VaneTestCli::parse(); + + if args.name == "polkadot".to_string() { + let alice = dev::alice(); + let bob = dev::bob(); + let vane_account = (); + + let polkadot_test = PolkadotTest::connect().await; + + + } + + // construct a transfer tx //let transfer_call = polkadot::tx().balances().transfer_keep_alive(bob, 10_000); // send to vane av-layer @@ -66,7 +86,7 @@ pub struct PolkadotTest { impl PolkadotTest { pub async fn connect() -> PolkadotTest { let client = WsClientBuilder::default() - .build("127.0.0.1:8000") + .build("ws://127.0.0.1:8000") .await .expect("Failed to initilise Ws"); @@ -87,11 +107,12 @@ impl PolkadotTest { let vane_call_data = VaneCallData::new(BlockchainNetwork::Polkadot, amount); // use the client to submit the transaction to av layer + //let params = rpc_params!(sender_multi,receiver_multi,vane_call_data); if self.client.is_connected() { self.client .request( - "submitTransaction", - vec![sender_multi,receiver_multi], + "submit_transaction", + (vane_call_data, sender_multi, receiver_multi), ) .await?; } diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index 6bc0be3..ab1ce42 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -6,14 +6,16 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -subxt = {workspace = true} -chrono = {workspace = true} -parity-scale-codec = { workspace = true } -anyhow = {workspace = true } -serde = { worskspace = true } -frame-support = { workspace = true } -sp-runtime = {workspace = true} +subxt = { workspace = true} +chrono = { workspace = true} +parity-scale-codec = { workspace = true} +anyhow = { workspace = true} +serde = { workspace = true} +frame-support = { workspace = true} +sp-runtime = { workspace = true} sp-core = { workspace = true} -scale-info = { workspace = true } +scale-info = { workspace = true} +hex = { workspace = true} derivative = "2.2.0" -tinyrand = "0.5.0" \ No newline at end of file +tinyrand = "0.5.0" +hex-literal = { workspace = true} diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index c64043a..d1078eb 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -1,16 +1,21 @@ pub use common::*; +use derivative::Derivative; use frame_support::StorageHasher; use frame_support::Twox64Concat; +use hex::ToHex; +use hex_literal::hex; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; -use derivative::Derivative; -use tinyrand::{Rand, StdRand, RandRange}; +use tinyrand::{Rand, RandRange, StdRand}; pub mod common { use std::borrow::Cow; use sp_core::{blake2_128, blake2_256}; - use subxt::{tx::{DynamicPayload, Payload}, utils::{AccountId32,MultiAddress,MultiSignature}}; + use subxt::{ + tx::{DynamicPayload, Payload}, + utils::{AccountId32, MultiAddress, MultiSignature}, + }; use super::*; /// The transaction object which all operations will be applied upon @@ -31,6 +36,9 @@ pub mod common { pub lifetime_status: LifetimeStatus, } + /// Types for easier code navigation + pub type MultiId = VaneMultiAddress; + impl TxObject { pub fn new( call: VaneCallData, @@ -38,11 +46,13 @@ pub mod common { receiver_address: MultiAddress, network: BlockchainNetwork, ) -> Self { - let tx_id = call.get_tx_id(); - let tx_id = String::from_utf8(tx_id).expect("Failed to convert tx id from bytes"); + let tx_id_vec = call.get_tx_id(); + let tx_id_hash = Twox64Concat::hash(&tx_id_vec[..]); + let tx_id = hex::encode(tx_id_hash); - let multi_id = (sender_address.clone(), receiver_address.clone(),b"VANE").using_encoded(blake2_256); - let multi_id: MultiAddress = MultiAddress::Address32(multi_id); + let multi_id = (sender_address.clone(), receiver_address.clone(), b"VANE") + .using_encoded(blake2_256); + let multi_id: VaneMultiAddress = VaneMultiAddress::Address32(multi_id); Self { tx_id, call, @@ -58,6 +68,10 @@ pub mod common { pub fn get_multi_id(&self) -> VaneMultiAddress { self.multi_id.clone().into() } + + pub fn get_tx_id(self) -> String { + self.tx_id + } } /// VaneCallData represents enumeration on different network transaction function types ( Call ) @@ -66,35 +80,32 @@ pub mod common { #[derive(Debug, Encode, Decode, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum VaneCallData { SubstrateCallData { - amount: u128 + amount: u128, }, - SolanaCallData{ - amount:u128, - extra_receivers:Vec> + SolanaCallData { + amount: u128, + extra_receivers: Vec>, + }, + EthereumCallData { + amount: u128, }, - EthereumCallData{ - amount: u128 - } } impl VaneCallData { - pub fn new(network: BlockchainNetwork, amount: u128) -> Self { match network { - BlockchainNetwork::Polkadot => { - VaneCallData::SubstrateCallData { amount } - }, - _ => todo!() + BlockchainNetwork::Polkadot => VaneCallData::SubstrateCallData { amount }, + _ => todo!(), } } pub fn get_tx_id(&self) -> Vec { - match self{ - VaneCallData::SubstrateCallData{amount} => { + match self { + VaneCallData::SubstrateCallData { amount } => { let mut rand = StdRand::default(); - Twox64Concat::hash(&format!("{}{}",amount, rand.next_u128()).encode()[..]) - }, - _ => todo!() + Twox64Concat::hash(&format!("{}{}", amount, rand.next_u128()).encode()[..]) + } + _ => todo!(), } } } @@ -127,7 +138,8 @@ pub mod common { WaitingForSender, Ready, Accepted, - Rejected, + RejectedMismatchAddress, + RejectedSenderRevert } impl From for TxConfirmationObject { @@ -135,12 +147,11 @@ pub mod common { Self { tx_id: value.tx_id, call: value.call, - receiver_sig: None, - sender_sig: None, confirmation_status: ConfirmationStatus::WaitingForReceiver, network: value.network, - sender_address: value.sender_address.into(), - receiver_address: value.receiver_address.into(), + confirmed_sender_address: None, + confirmed_receiver_address: None, + multi_id: value.multi_id, } } } @@ -160,20 +171,24 @@ pub mod common { network: BlockchainNetwork, } + impl TxSimulationObject { + pub fn get_tx_id(self) -> String { + self.tx_id + } + } + /// Struct to be sent in the network for confirmation from sender and receiver #[derive(Debug, Encode, Decode, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct TxConfirmationObject { - sender_address: VaneMultiAddress, - receiver_address: VaneMultiAddress, + confirmed_sender_address: Option>, + confirmed_receiver_address: Option>, // Tx hash representation and acting as a link among 3 objects (TxObject, TxSimulation, TxConfirmation) tx_id: String, pub call: VaneCallData, // State of the Tx to be confirmed confirmation_status: ConfirmationStatus, - // Receiver confirmation signature - receiver_sig: Option>, - // Sender confirmation signature - sender_sig: Option>, + // multi_id + multi_id: VaneMultiAddress, // Blockchain network to submit the Tx to network: BlockchainNetwork, } @@ -185,8 +200,12 @@ pub mod common { call: value.call, confirmation_status: value.confirmation_status, network: value.network, - sender_address: value.sender_address, - receiver_address: value.receiver_address, + sender_address: value + .confirmed_sender_address + .expect("Failed to unwrap confirmed sender"), + receiver_address: value + .confirmed_receiver_address + .expect("Failed to unwrap confirmed receiver"), } } } @@ -196,17 +215,45 @@ pub mod common { self.confirmation_status = status } - pub fn set_receiver_sig(&mut self, receiver_sig: Vec) { - self.receiver_sig = Some(receiver_sig) + pub fn set_confirmed_receiver( + &mut self, + confirmed_receiver: VaneMultiAddress, + ) { + self.confirmed_receiver_address = Some(confirmed_receiver) } - pub fn set_sender_sig(&mut self, sender_sig: Vec) { - self.sender_sig = Some(sender_sig) + pub fn set_confirmed_sender( + &mut self, + confirmed_sender: VaneMultiAddress, + ) { + self.confirmed_sender_address = Some(confirmed_sender) } pub fn get_confirmation_status(&self) -> ConfirmationStatus { self.confirmation_status.clone() } + + pub fn calculate_confirmed_multi_id( + &self, + sender: VaneMultiAddress, + ) -> MultiId { + let receiver = self + .confirmed_receiver_address + .clone() + .expect("Failed to unwrap confirmed receiver"); + + let multi_id = (sender, receiver, b"VANE").using_encoded(blake2_256); + let multi_id: VaneMultiAddress = VaneMultiAddress::Address32(multi_id); + multi_id + } + + pub fn get_multi_id(&self) -> MultiId { + self.multi_id.clone() + } + + pub fn get_tx_id(self) -> String { + self.tx_id + } } #[derive(Debug, Encode, Decode, Clone, Serialize, Deserialize)] @@ -241,7 +288,8 @@ pub mod common { serde::Deserialize, Debug, Hash, - PartialOrd, Ord + PartialOrd, + Ord, )] #[cfg_attr(feature = "std", derive(Hash))] pub enum VaneMultiAddress {