diff --git a/Cargo.lock b/Cargo.lock index a412225a6..d97127b43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,20 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-runtime-components" +version = "0.1.0" +dependencies = [ + "cgp-core", + "futures-channel", + "futures-core", + "futures-util", + "ibc-relayer-components", + "ibc-relayer-components-extra", + "ibc-test-components", + "rand", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -530,7 +544,7 @@ dependencies = [ [[package]] name = "cgp-component-macro" version = "0.1.0" -source = "git+https://github.com/informalsystems/cgp.git?branch=main#359d5bbbe6368e1dbe7b2dcd0f0489ac1e55ca75" +source = "git+https://github.com/informalsystems/cgp.git?branch=soares/fix-impl-gat-bounds#5a9330c1340cf0097ed7996aa852f422a6cfeccd" dependencies = [ "itertools 0.11.0", "proc-macro2", @@ -736,7 +750,6 @@ dependencies = [ "ibc-relayer-components", "ibc-relayer-components-extra", "ibc-relayer-runtime", - "ibc-relayer-subscription", "ibc-relayer-types", "ibc-test-components", "itertools 0.11.0", @@ -797,7 +810,6 @@ dependencies = [ "ibc-relayer-components", "ibc-relayer-components-extra", "ibc-relayer-runtime", - "ibc-relayer-subscription", "ibc-relayer-types", "ibc-test-components", "itertools 0.11.0", @@ -1367,9 +1379,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -1382,9 +1394,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -1392,15 +1404,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -1409,9 +1421,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-lite" @@ -1430,9 +1442,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -1441,21 +1453,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -1904,6 +1916,7 @@ dependencies = [ name = "ibc-relayer-cosmos" version = "0.1.0" dependencies = [ + "async-runtime-components", "async-trait", "cgp-core", "cosmos-client-components", @@ -1916,7 +1929,6 @@ dependencies = [ "ibc-relayer-components", "ibc-relayer-components-extra", "ibc-relayer-runtime", - "ibc-relayer-subscription", "ibc-relayer-types", "ibc-telemetry", "ibc-test-components", @@ -1977,16 +1989,15 @@ dependencies = [ name = "ibc-relayer-runtime" version = "0.1.0" dependencies = [ + "async-runtime-components", "async-trait", "cgp-core", "futures", "ibc-relayer-components", "ibc-relayer-components-extra", - "ibc-relayer-subscription", "ibc-test-components", "tokio", "tokio-runtime-components", - "tokio-stream", "tracing", ] @@ -2021,18 +2032,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "ibc-relayer-subscription" -version = "0.1.0" -dependencies = [ - "async-trait", - "cgp-core", - "futures-core", - "futures-util", - "ibc-relayer-components", - "ibc-relayer-components-extra", -] - [[package]] name = "ibc-relayer-types" version = "0.26.2" @@ -2943,7 +2942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.37", @@ -4164,6 +4163,7 @@ dependencies = [ name = "tokio-runtime-components" version = "0.1.0" dependencies = [ + "async-runtime-components", "cgp-core", "futures", "ibc-relayer-components", @@ -4171,6 +4171,7 @@ dependencies = [ "ibc-test-components", "rand", "tokio", + "tokio-stream", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7bec166f6..a55e9c56c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ members = [ "crates/relayer-cosmos-mock", "crates/relayer-runtime", "crates/relayer-solomachine", - "crates/relayer-subscription", "crates/relayer-mock", "crates/test-components", "crates/test-suite", @@ -53,17 +52,21 @@ tokio = { version = "1.34" } tracing = { version = "0.1.40" } tonic = { version = "0.10" } toml = { version = "0.8.8" } -futures = { version = "0.3" } +futures = { version = "0.3.29", default-features = false } +futures-core = { version = "0.3.29", default-features = false } +futures-util = { version = "0.3.29", default-features = false } +futures-channel = { version = "0.3.29", default-features = false } eyre = { version = "0.6.8" } itertools = { version = "0.11" } http = { version = "0.2.10" } flex-error = { version = "0.4.4", default-features = false } + ibc-relayer-components = { version = "0.1.0", path = "./crates/relayer-components" } ibc-relayer-components-extra = { version = "0.1.0", path = "./crates/relayer-components-extra" } ibc-relayer-runtime = { version = "0.1.0", path = "./crates/relayer-runtime" } -ibc-relayer-subscription = { version = "0.1.0", path = "./crates/relayer-subscription" } cosmos-client-components = { version = "0.1.0", path = "./crates/cosmos-client-components" } cosmos-test-components = { version = "0.1.0", path = "./crates/cosmos-test-components" } +async-runtime-components = { version = "0.1.0", path = "./crates/async-runtime-components" } tokio-runtime-components = { version = "0.1.0", path = "./crates/tokio-runtime-components" } ibc-relayer-cosmos = { version = "0.1.0", path = "./crates/relayer-cosmos" } ibc-relayer-solomachine = { version = "0.1.0", path = "./crates/relayer-solomachine" } @@ -82,7 +85,7 @@ basecoin-store = { git = "https://github.com/informalsystems/basecoin-rs.gi cgp-core = { git = "https://github.com/informalsystems/cgp.git", branch = "main" } cgp-component = { git = "https://github.com/informalsystems/cgp.git", branch = "main" } -cgp-component-macro = { git = "https://github.com/informalsystems/cgp.git", branch = "main" } +cgp-component-macro = { git = "https://github.com/informalsystems/cgp.git", branch = "soares/fix-impl-gat-bounds" } cgp-error = { git = "https://github.com/informalsystems/cgp.git", branch = "main" } cgp-error-eyre = { git = "https://github.com/informalsystems/cgp.git", branch = "main" } cgp-async = { git = "https://github.com/informalsystems/cgp.git", branch = "main" } diff --git a/crates/relayer-subscription/Cargo.toml b/crates/async-runtime-components/Cargo.toml similarity index 52% rename from crates/relayer-subscription/Cargo.toml rename to crates/async-runtime-components/Cargo.toml index 387c6a5b9..7b28bd146 100644 --- a/crates/relayer-subscription/Cargo.toml +++ b/crates/async-runtime-components/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "ibc-relayer-subscription" +name = "async-runtime-components" version = { workspace = true } edition = { workspace = true } license = { workspace = true } @@ -9,19 +9,17 @@ authors = { workspace = true } rust-version = { workspace = true } readme = "README.md" description = """ - Implementation of an IBC Relayer in Rust, as a library + Modular runtime components implemented using Tokio """ -[package.metadata.docs.rs] -all-features = true - -[features] - [dependencies] ibc-relayer-components = { workspace = true } ibc-relayer-components-extra = { workspace = true } -cgp-core = { workspace = true } -async-trait = { workspace = true } +ibc-test-components = { workspace = true } + +cgp-core = { workspace = true } +futures-core = { workspace = true } +futures-util = { workspace = true } +futures-channel = { workspace = true } -futures-core = { version = "0.3", default-features = false, features = ["alloc"] } -futures-util = { version = "0.3", default-features = false, features = ["alloc"] } \ No newline at end of file +rand = { version = "0.8.5" } \ No newline at end of file diff --git a/crates/async-runtime-components/src/channel/impls.rs b/crates/async-runtime-components/src/channel/impls.rs new file mode 100644 index 000000000..3e7429734 --- /dev/null +++ b/crates/async-runtime-components/src/channel/impls.rs @@ -0,0 +1,156 @@ +use alloc::boxed::Box; +use alloc::sync::Arc; +use cgp_core::prelude::*; +use cgp_core::CanRaiseError; +use futures_channel::mpsc; +use futures_util::lock::Mutex; +use futures_util::stream::StreamExt; +use ibc_relayer_components_extra::runtime::traits::channel::ReceiverStreamer; +use ibc_relayer_components_extra::runtime::traits::channel::SenderCloner; +use ibc_relayer_components_extra::runtime::traits::channel::{ + ChannelCreator, ChannelUser, ProvideChannelType, +}; + +use crate::channel::traits::{HasUnboundedChannelType, UnboundedChannelTypeProvider}; +use crate::channel::types::ChannelClosedError; +use crate::stream::traits::boxed::HasBoxedStreamType; + +pub struct ProvideUnboundedChannelType; + +impl ProvideChannelType for ProvideUnboundedChannelType +where + Runtime: Async, +{ + type Sender = Arc>> + where + T: Async; + + type Receiver = mpsc::UnboundedReceiver + where + T: Async; +} + +impl UnboundedChannelTypeProvider for ProvideUnboundedChannelType +where + Runtime: Async, +{ + fn from_unbounded_sender(sender: Arc>>) -> Self::Sender + where + T: Async, + { + sender + } + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async, + { + receiver + } + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async, + { + receiver + } + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &Arc>> + where + T: Async, + { + sender + } + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async, + { + receiver + } +} + +impl ChannelCreator for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType, +{ + fn new_channel() -> (Runtime::Sender, Runtime::Receiver) + where + T: Async, + { + let (sender, receiver) = mpsc::unbounded(); + + ( + Runtime::from_unbounded_sender(Arc::new(Mutex::new(sender))), + Runtime::from_unbounded_receiver(receiver), + ) + } +} + +#[async_trait] +impl ChannelUser for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType + CanRaiseError, +{ + async fn send(sender: &Runtime::Sender, value: T) -> Result<(), Runtime::Error> + where + T: Async, + { + Runtime::to_unbounded_sender_ref(sender) + .lock() + .await + .unbounded_send(value) + .map_err(|_| Runtime::raise_error(ChannelClosedError)) + } + + async fn receive(receiver: &mut Runtime::Receiver) -> Result + where + T: Async, + { + Runtime::to_unbounded_receiver_ref(receiver) + .next() + .await + .ok_or(Runtime::raise_error(ChannelClosedError)) + } + + fn try_receive(receiver: &mut Runtime::Receiver) -> Result, Runtime::Error> + where + T: Async, + { + let res = Runtime::to_unbounded_receiver_ref(receiver).try_next(); + + // The result semantics of the futures version of receiver is slightly different + match res { + Ok(Some(res)) => Ok(Some(res)), + // Ok(None) means that the channel is closed + Ok(None) => Err(Runtime::raise_error(ChannelClosedError)), + // Error means that there is no meesage currently available + Err(_) => Ok(None), + } + } +} + +impl ReceiverStreamer for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType + HasBoxedStreamType, +{ + fn receiver_to_stream(receiver: Runtime::Receiver) -> Runtime::Stream + where + T: Async, + { + Runtime::from_boxed_stream(Box::pin(Runtime::to_unbounded_receiver(receiver))) + } +} + +impl SenderCloner for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType, +{ + fn clone_sender(sender: &Runtime::Sender) -> Runtime::Sender + where + T: Async, + { + Runtime::from_unbounded_sender(Runtime::to_unbounded_sender_ref(sender).clone()) + } +} diff --git a/crates/async-runtime-components/src/channel/mod.rs b/crates/async-runtime-components/src/channel/mod.rs new file mode 100644 index 000000000..1c792999e --- /dev/null +++ b/crates/async-runtime-components/src/channel/mod.rs @@ -0,0 +1,3 @@ +pub mod impls; +pub mod traits; +pub mod types; diff --git a/crates/async-runtime-components/src/channel/traits.rs b/crates/async-runtime-components/src/channel/traits.rs new file mode 100644 index 000000000..2dfe55339 --- /dev/null +++ b/crates/async-runtime-components/src/channel/traits.rs @@ -0,0 +1,149 @@ +use alloc::sync::Arc; +use cgp_core::prelude::*; +use futures_channel::mpsc; +use futures_util::lock::Mutex; +use ibc_relayer_components_extra::runtime::traits::channel::{ + ChannelTypeComponent, HasChannelTypes, ProvideChannelType, +}; + +pub trait HasUnboundedChannelType: HasChannelTypes { + fn from_unbounded_sender(sender: Arc>>) -> Self::Sender + where + T: Async; + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async; + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async; + + fn to_unbounded_sender_ref( + sender: &Self::Sender, + ) -> &Arc>> + where + T: Async; + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async; +} + +pub trait UnboundedChannelTypeProvider: ProvideChannelType +where + Runtime: Async, +{ + fn from_unbounded_sender(sender: Arc>>) -> Self::Sender + where + T: Async; + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async; + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async; + + fn to_unbounded_sender_ref( + sender: &Self::Sender, + ) -> &Arc>> + where + T: Async; + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async; +} + +impl HasUnboundedChannelType for Runtime +where + Runtime: HasComponents, + Components: UnboundedChannelTypeProvider + ProvideChannelType, +{ + fn from_unbounded_sender(sender: Arc>>) -> Self::Sender + where + T: Async, + { + Components::from_unbounded_sender(sender) + } + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async, + { + Components::from_unbounded_receiver(receiver) + } + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async, + { + Components::to_unbounded_receiver(receiver) + } + + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &Arc>> + where + T: Async, + { + Components::to_unbounded_sender_ref(sender) + } + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async, + { + Components::to_unbounded_receiver_ref(receiver) + } +} + +impl UnboundedChannelTypeProvider for Component +where + Runtime: Async, + Component: DelegateComponent, + Delegate: UnboundedChannelTypeProvider, +{ + fn from_unbounded_sender(sender: Arc>>) -> Self::Sender + where + T: Async, + { + Delegate::from_unbounded_sender(sender) + } + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async, + { + Delegate::from_unbounded_receiver(receiver) + } + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async, + { + Delegate::to_unbounded_receiver(receiver) + } + + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &Arc>> + where + T: Async, + { + Delegate::to_unbounded_sender_ref(sender) + } + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async, + { + Delegate::to_unbounded_receiver_ref(receiver) + } +} diff --git a/crates/async-runtime-components/src/channel/types.rs b/crates/async-runtime-components/src/channel/types.rs new file mode 100644 index 000000000..79075280b --- /dev/null +++ b/crates/async-runtime-components/src/channel/types.rs @@ -0,0 +1 @@ +pub struct ChannelClosedError; diff --git a/crates/async-runtime-components/src/channel_once/impls.rs b/crates/async-runtime-components/src/channel_once/impls.rs new file mode 100644 index 000000000..9e82060d5 --- /dev/null +++ b/crates/async-runtime-components/src/channel_once/impls.rs @@ -0,0 +1,101 @@ +use alloc::boxed::Box; + +use cgp_core::prelude::*; +use cgp_core::CanRaiseError; +use futures_channel::oneshot::{channel, Receiver, Sender}; +use ibc_relayer_components_extra::runtime::traits::channel_once::{ + ChannelOnceCreator, ChannelOnceUser, ProvideChannelOnceType, +}; + +use crate::channel::types::ChannelClosedError; +use crate::channel_once::traits::HasOneShotChannelType; +use crate::channel_once::traits::OneShotChannelTypeProvider; + +pub struct ProvideOneShotChannelType; + +impl ProvideChannelOnceType for ProvideOneShotChannelType +where + Runtime: Async, +{ + type SenderOnce = Sender + where + T: Async; + + type ReceiverOnce = Receiver + where + T: Async; +} + +impl OneShotChannelTypeProvider for ProvideOneShotChannelType +where + Runtime: Async, +{ + fn from_oneshot_sender(sender: Sender) -> Self::SenderOnce + where + T: Async, + { + sender + } + + fn from_oneshot_receiver(receiver: Receiver) -> Self::ReceiverOnce + where + T: Async, + { + receiver + } + + fn to_oneshot_sender(sender: Self::SenderOnce) -> Sender + where + T: Async, + { + sender + } + + fn to_oneshot_receiver(receiver: Self::ReceiverOnce) -> Receiver + where + T: Async, + { + receiver + } +} + +impl ChannelOnceCreator for ProvideOneShotChannelType +where + Runtime: HasOneShotChannelType, +{ + fn new_channel_once() -> (Runtime::SenderOnce, Runtime::ReceiverOnce) + where + T: Async, + { + let (sender, receiver) = channel(); + + ( + Runtime::from_oneshot_sender(sender), + Runtime::from_oneshot_receiver(receiver), + ) + } +} + +#[async_trait] +impl ChannelOnceUser for ProvideOneShotChannelType +where + Runtime: HasOneShotChannelType + CanRaiseError, +{ + fn send_once(sender: Runtime::SenderOnce, value: T) -> Result<(), Runtime::Error> + where + T: Async, + { + Runtime::to_oneshot_sender(sender) + .send(value) + .map_err(|_| Runtime::raise_error(ChannelClosedError)) + } + + async fn receive_once(receiver: Runtime::ReceiverOnce) -> Result + where + T: Async, + { + Runtime::to_oneshot_receiver(receiver) + .await + .map_err(|_| Runtime::raise_error(ChannelClosedError)) + } +} diff --git a/crates/async-runtime-components/src/channel_once/mod.rs b/crates/async-runtime-components/src/channel_once/mod.rs new file mode 100644 index 000000000..d44cc4a18 --- /dev/null +++ b/crates/async-runtime-components/src/channel_once/mod.rs @@ -0,0 +1,2 @@ +pub mod impls; +pub mod traits; diff --git a/crates/async-runtime-components/src/channel_once/traits.rs b/crates/async-runtime-components/src/channel_once/traits.rs new file mode 100644 index 000000000..3541daabe --- /dev/null +++ b/crates/async-runtime-components/src/channel_once/traits.rs @@ -0,0 +1,109 @@ +use cgp_core::prelude::*; +use futures_channel::oneshot::{Receiver, Sender}; +use ibc_relayer_components_extra::runtime::traits::channel_once::{ + ChannelOnceTypeComponent, HasChannelOnceTypes, ProvideChannelOnceType, +}; + +pub trait HasOneShotChannelType: HasChannelOnceTypes { + fn from_oneshot_sender(sender: Sender) -> Self::SenderOnce + where + T: Async; + + fn from_oneshot_receiver(receiver: Receiver) -> Self::ReceiverOnce + where + T: Async; + + fn to_oneshot_sender(sender: Self::SenderOnce) -> Sender + where + T: Async; + + fn to_oneshot_receiver(receiver: Self::ReceiverOnce) -> Receiver + where + T: Async; +} + +pub trait OneShotChannelTypeProvider: ProvideChannelOnceType { + fn from_oneshot_sender(sender: Sender) -> Self::SenderOnce + where + T: Async; + + fn from_oneshot_receiver(receiver: Receiver) -> Self::ReceiverOnce + where + T: Async; + + fn to_oneshot_sender(sender: Self::SenderOnce) -> Sender + where + T: Async; + + fn to_oneshot_receiver(receiver: Self::ReceiverOnce) -> Receiver + where + T: Async; +} + +impl HasOneShotChannelType for Runtime +where + Runtime: HasComponents, + Components: OneShotChannelTypeProvider, +{ + fn from_oneshot_sender(sender: Sender) -> Self::SenderOnce + where + T: Async, + { + Components::from_oneshot_sender(sender) + } + + fn from_oneshot_receiver(receiver: Receiver) -> Self::ReceiverOnce + where + T: Async, + { + Components::from_oneshot_receiver(receiver) + } + + fn to_oneshot_sender(sender: Self::SenderOnce) -> Sender + where + T: Async, + { + Components::to_oneshot_sender(sender) + } + + fn to_oneshot_receiver(receiver: Self::ReceiverOnce) -> Receiver + where + T: Async, + { + Components::to_oneshot_receiver(receiver) + } +} + +impl OneShotChannelTypeProvider for Component +where + Component: DelegateComponent, + Delegate: OneShotChannelTypeProvider, +{ + fn from_oneshot_sender(sender: Sender) -> Self::SenderOnce + where + T: Async, + { + Delegate::from_oneshot_sender(sender) + } + + fn from_oneshot_receiver(receiver: Receiver) -> Self::ReceiverOnce + where + T: Async, + { + Delegate::from_oneshot_receiver(receiver) + } + + fn to_oneshot_sender(sender: Self::SenderOnce) -> Sender + where + T: Async, + { + Delegate::to_oneshot_sender(sender) + } + + fn to_oneshot_receiver(receiver: Self::ReceiverOnce) -> Receiver + where + T: Async, + { + Delegate::to_oneshot_receiver(receiver) + } +} diff --git a/crates/async-runtime-components/src/lib.rs b/crates/async-runtime-components/src/lib.rs new file mode 100644 index 000000000..c8efcd790 --- /dev/null +++ b/crates/async-runtime-components/src/lib.rs @@ -0,0 +1,11 @@ +#![no_std] +#![allow(clippy::type_complexity)] + +extern crate alloc; + +pub mod channel; +pub mod channel_once; +pub mod mutex; +pub mod stream; +pub mod subscription; +pub mod task; diff --git a/crates/async-runtime-components/src/mutex/impls/mod.rs b/crates/async-runtime-components/src/mutex/impls/mod.rs new file mode 100644 index 000000000..479802a09 --- /dev/null +++ b/crates/async-runtime-components/src/mutex/impls/mod.rs @@ -0,0 +1 @@ +pub mod mutex; diff --git a/crates/relayer-runtime/src/impls/runtime/mutex.rs b/crates/async-runtime-components/src/mutex/impls/mutex.rs similarity index 54% rename from crates/relayer-runtime/src/impls/runtime/mutex.rs rename to crates/async-runtime-components/src/mutex/impls/mutex.rs index 56d5d45eb..00bbd84f1 100644 --- a/crates/relayer-runtime/src/impls/runtime/mutex.rs +++ b/crates/async-runtime-components/src/mutex/impls/mutex.rs @@ -1,12 +1,15 @@ -use async_trait::async_trait; -use cgp_core::Async; -use futures::lock::{Mutex, MutexGuard}; -use ibc_relayer_components::runtime::traits::mutex::HasMutex; +use alloc::boxed::Box; +use cgp_core::prelude::*; +use futures_util::lock::{Mutex, MutexGuard}; +use ibc_relayer_components::runtime::traits::mutex::ProvideMutex; -use crate::types::runtime::TokioRuntimeContext; +pub struct ProvideFuturesMutex; #[async_trait] -impl HasMutex for TokioRuntimeContext { +impl ProvideMutex for ProvideFuturesMutex +where + Runtime: Async, +{ type Mutex = Mutex; type MutexGuard<'a, T: Async> = MutexGuard<'a, T>; diff --git a/crates/async-runtime-components/src/mutex/mod.rs b/crates/async-runtime-components/src/mutex/mod.rs new file mode 100644 index 000000000..b132bc255 --- /dev/null +++ b/crates/async-runtime-components/src/mutex/mod.rs @@ -0,0 +1 @@ +pub mod impls; diff --git a/crates/async-runtime-components/src/stream/impls/boxed.rs b/crates/async-runtime-components/src/stream/impls/boxed.rs new file mode 100644 index 000000000..e715cdd0d --- /dev/null +++ b/crates/async-runtime-components/src/stream/impls/boxed.rs @@ -0,0 +1,40 @@ +use alloc::boxed::Box; +use core::pin::Pin; + +use cgp_core::prelude::*; +use futures_core::stream::Stream; +use ibc_relayer_components::runtime::traits::stream::ProvideStreamType; + +use crate::stream::traits::boxed::BoxedStreamTypeProvider; + +pub struct ProvideBoxedStreamType; + +impl ProvideStreamType for ProvideBoxedStreamType +where + Runtime: Async, +{ + type Stream = Pin + Send + Sync + 'static>>; +} + +impl BoxedStreamTypeProvider for ProvideBoxedStreamType +where + Runtime: Async, +{ + fn to_boxed_stream( + stream: Self::Stream, + ) -> Pin + Send + Sync + 'static>> + where + Item: Async, + { + stream + } + + fn from_boxed_stream( + stream: Pin + Send + Sync + 'static>>, + ) -> Self::Stream + where + Item: Async, + { + stream + } +} diff --git a/crates/async-runtime-components/src/stream/impls/map.rs b/crates/async-runtime-components/src/stream/impls/map.rs new file mode 100644 index 000000000..f71cf2b0a --- /dev/null +++ b/crates/async-runtime-components/src/stream/impls/map.rs @@ -0,0 +1,24 @@ +use alloc::boxed::Box; +use cgp_core::prelude::*; +use futures_util::stream::StreamExt; +use ibc_relayer_components::runtime::traits::stream::StreamMapper; + +use crate::stream::traits::boxed::HasBoxedStreamType; + +pub struct BoxedStreamMapper; + +impl StreamMapper for BoxedStreamMapper +where + Runtime: HasBoxedStreamType, +{ + fn map_stream(stream: Runtime::Stream, mapper: M) -> Runtime::Stream + where + T: Async, + U: Async, + M: Fn(T) -> U + Async, + { + let mapped = Runtime::to_boxed_stream(stream).map(mapper); + + Runtime::from_boxed_stream(Box::pin(mapped)) + } +} diff --git a/crates/async-runtime-components/src/stream/impls/mod.rs b/crates/async-runtime-components/src/stream/impls/mod.rs new file mode 100644 index 000000000..f4863b628 --- /dev/null +++ b/crates/async-runtime-components/src/stream/impls/mod.rs @@ -0,0 +1,2 @@ +pub mod boxed; +pub mod map; diff --git a/crates/async-runtime-components/src/stream/mod.rs b/crates/async-runtime-components/src/stream/mod.rs new file mode 100644 index 000000000..d44cc4a18 --- /dev/null +++ b/crates/async-runtime-components/src/stream/mod.rs @@ -0,0 +1,2 @@ +pub mod impls; +pub mod traits; diff --git a/crates/async-runtime-components/src/stream/traits/boxed.rs b/crates/async-runtime-components/src/stream/traits/boxed.rs new file mode 100644 index 000000000..24112694d --- /dev/null +++ b/crates/async-runtime-components/src/stream/traits/boxed.rs @@ -0,0 +1,88 @@ +use alloc::boxed::Box; +use core::pin::Pin; + +use cgp_core::prelude::*; +use futures_core::stream::Stream; +use ibc_relayer_components::runtime::traits::stream::{ + HasStreamType, ProvideStreamType, StreamTypeComponent, +}; + +pub trait HasBoxedStreamType: HasStreamType { + fn to_boxed_stream( + stream: Self::Stream, + ) -> Pin + Send + Sync + 'static>> + where + Item: Async; + + fn from_boxed_stream( + stream: Pin + Send + Sync + 'static>>, + ) -> Self::Stream + where + Item: Async; +} + +impl HasBoxedStreamType for Runtime +where + Runtime: HasComponents, + Components: BoxedStreamTypeProvider, +{ + fn to_boxed_stream( + stream: Self::Stream, + ) -> Pin + Send + Sync + 'static>> + where + Item: Async, + { + Components::to_boxed_stream(stream) + } + + fn from_boxed_stream( + stream: Pin + Send + Sync + 'static>>, + ) -> Self::Stream + where + Item: Async, + { + Components::from_boxed_stream(stream) + } +} + +pub trait BoxedStreamTypeProvider: ProvideStreamType +where + Runtime: Async, +{ + fn to_boxed_stream( + stream: Self::Stream, + ) -> Pin + Send + Sync + 'static>> + where + Item: Async; + + fn from_boxed_stream( + stream: Pin + Send + Sync + 'static>>, + ) -> Self::Stream + where + Item: Async; +} + +impl BoxedStreamTypeProvider for Component +where + Runtime: Async, + Component: DelegateComponent, + Delegate: BoxedStreamTypeProvider, +{ + fn to_boxed_stream( + stream: Self::Stream, + ) -> Pin + Send + Sync + 'static>> + where + Item: Async, + { + Delegate::to_boxed_stream(stream) + } + + fn from_boxed_stream( + stream: Pin + Send + Sync + 'static>>, + ) -> Self::Stream + where + Item: Async, + { + Delegate::from_boxed_stream(stream) + } +} diff --git a/crates/async-runtime-components/src/stream/traits/mod.rs b/crates/async-runtime-components/src/stream/traits/mod.rs new file mode 100644 index 000000000..0ffd82a71 --- /dev/null +++ b/crates/async-runtime-components/src/stream/traits/mod.rs @@ -0,0 +1 @@ +pub mod boxed; diff --git a/crates/relayer-subscription/src/impls/closure.rs b/crates/async-runtime-components/src/subscription/impls/closure.rs similarity index 95% rename from crates/relayer-subscription/src/impls/closure.rs rename to crates/async-runtime-components/src/subscription/impls/closure.rs index 7ae420d98..e27178d44 100644 --- a/crates/relayer-subscription/src/impls/closure.rs +++ b/crates/async-runtime-components/src/subscription/impls/closure.rs @@ -1,14 +1,13 @@ +use alloc::boxed::Box; use alloc::sync::Arc; use core::future::Future; use core::pin::Pin; -use async_trait::async_trait; -use cgp_core::Async; +use cgp_core::prelude::*; use futures_core::stream::Stream; use ibc_relayer_components::runtime::traits::mutex::HasMutex; -use crate::std_prelude::*; -use crate::traits::subscription::Subscription; +use crate::subscription::traits::subscription::Subscription; /** An auto trait that is implemented by all runtime contexts that implement diff --git a/crates/relayer-subscription/src/impls/empty.rs b/crates/async-runtime-components/src/subscription/impls/empty.rs similarity index 82% rename from crates/relayer-subscription/src/impls/empty.rs rename to crates/async-runtime-components/src/subscription/impls/empty.rs index 200045c25..a64e51045 100644 --- a/crates/relayer-subscription/src/impls/empty.rs +++ b/crates/async-runtime-components/src/subscription/impls/empty.rs @@ -1,12 +1,11 @@ +use alloc::boxed::Box; use core::marker::PhantomData; use core::pin::Pin; -use async_trait::async_trait; -use cgp_core::Async; +use cgp_core::prelude::*; use futures_core::stream::Stream; -use crate::std_prelude::*; -use crate::traits::subscription::Subscription; +use crate::subscription::traits::subscription::Subscription; pub struct EmptySubscription(pub PhantomData); diff --git a/crates/relayer-subscription/src/impls/mod.rs b/crates/async-runtime-components/src/subscription/impls/mod.rs similarity index 75% rename from crates/relayer-subscription/src/impls/mod.rs rename to crates/async-runtime-components/src/subscription/impls/mod.rs index 276122d19..3cbf7b63d 100644 --- a/crates/relayer-subscription/src/impls/mod.rs +++ b/crates/async-runtime-components/src/subscription/impls/mod.rs @@ -2,3 +2,4 @@ pub mod closure; pub mod empty; pub mod multiplex; pub mod stream; +pub mod subscription; diff --git a/crates/relayer-subscription/src/impls/multiplex.rs b/crates/async-runtime-components/src/subscription/impls/multiplex.rs similarity index 87% rename from crates/relayer-subscription/src/impls/multiplex.rs rename to crates/async-runtime-components/src/subscription/impls/multiplex.rs index 714be912b..f00dba7ab 100644 --- a/crates/relayer-subscription/src/impls/multiplex.rs +++ b/crates/async-runtime-components/src/subscription/impls/multiplex.rs @@ -1,10 +1,11 @@ +use alloc::boxed::Box; use alloc::sync::Arc; +use alloc::vec::Vec; use core::marker::PhantomData; use core::ops::DerefMut; use core::pin::Pin; -use async_trait::async_trait; -use cgp_core::Async; +use cgp_core::prelude::*; use futures_core::stream::Stream; use futures_util::stream::StreamExt; use ibc_relayer_components::runtime::traits::mutex::HasMutex; @@ -15,9 +16,8 @@ use ibc_relayer_components_extra::runtime::traits::channel::{ }; use ibc_relayer_components_extra::runtime::traits::spawn::CanSpawnTask; -use crate::std_prelude::*; -use crate::traits::stream::HasAsyncStreamType; -use crate::traits::subscription::Subscription; +use crate::stream::traits::boxed::HasBoxedStreamType; +use crate::subscription::traits::subscription::Subscription; /** Multiplex the incoming [`Stream`] provided by an underlying [`Subscription`] @@ -102,9 +102,18 @@ where let mut m_senders = Runtime::acquire_mutex(task_senders).await; if let Some(senders) = m_senders.deref_mut() { - // Remove senders where the receiver side has been dropped - senders - .retain(|sender| Runtime::send(sender, mapped.clone()).is_ok()); + let mut new_senders = Vec::new(); + + for sender in senders.drain(..) { + let send_result = Runtime::send(&sender, mapped.clone()).await; + // Remove senders where the receiver side has been dropped, + // i.e. keep the ones where sending is successful + if send_result.is_ok() { + new_senders.push(sender); + } + } + + *senders = new_senders; } }) .await; @@ -129,7 +138,7 @@ where + CanCreateChannels + CanUseChannels + CanStreamReceiver - + HasAsyncStreamType, + + HasBoxedStreamType, { fn multiplex_subscription( &self, @@ -189,7 +198,7 @@ where impl Subscription for MultiplexingSubscription where T: Async, - Runtime: HasMutex + CanCreateChannels + CanStreamReceiver + HasAsyncStreamType, + Runtime: HasMutex + CanCreateChannels + CanStreamReceiver + HasBoxedStreamType, { type Item = T; @@ -206,7 +215,7 @@ where let stream = Runtime::receiver_to_stream(receiver); - Some(Runtime::to_async_stream(stream)) + Some(Runtime::to_boxed_stream(stream)) } None => None, } diff --git a/crates/relayer-subscription/src/impls/stream.rs b/crates/async-runtime-components/src/subscription/impls/stream.rs similarity index 75% rename from crates/relayer-subscription/src/impls/stream.rs rename to crates/async-runtime-components/src/subscription/impls/stream.rs index 6006000e2..64f8f6c7e 100644 --- a/crates/relayer-subscription/src/impls/stream.rs +++ b/crates/async-runtime-components/src/subscription/impls/stream.rs @@ -1,8 +1,9 @@ use alloc::sync::Arc; +use alloc::vec::Vec; use core::ops::DerefMut; -use async_trait::async_trait; -use cgp_core::Async; +use alloc::boxed::Box; +use cgp_core::prelude::*; use futures_core::stream::Stream; use futures_util::stream::StreamExt; use ibc_relayer_components::runtime::traits::mutex::HasMutex; @@ -12,10 +13,9 @@ use ibc_relayer_components_extra::runtime::traits::channel::{ }; use ibc_relayer_components_extra::runtime::traits::spawn::CanSpawnTask; -use crate::impls::multiplex::MultiplexingSubscription; -use crate::std_prelude::*; -use crate::traits::stream::HasAsyncStreamType; -use crate::traits::subscription::Subscription; +use crate::stream::traits::boxed::HasBoxedStreamType; +use crate::subscription::impls::multiplex::MultiplexingSubscription; +use crate::subscription::traits::subscription::Subscription; /** Allows multiplexing of a single [`Stream`] into a subscription. @@ -45,7 +45,7 @@ where #[async_trait] impl Task for StreamSubscriptionTask where - Runtime: HasMutex + CanCreateChannels + CanUseChannels + CanStreamReceiver + HasAsyncStreamType, + Runtime: HasMutex + CanCreateChannels + CanUseChannels + CanStreamReceiver + HasBoxedStreamType, T: Clone + Async, S: Stream + Async, { @@ -57,8 +57,18 @@ where let mut m_senders = Runtime::acquire_mutex(task_senders).await; if let Some(senders) = m_senders.deref_mut() { - // Remove senders where the receiver side has been dropped - senders.retain(|sender| Runtime::send(sender, item.clone()).is_ok()); + let mut new_senders = Vec::new(); + + for sender in senders.drain(..) { + let send_result = Runtime::send(&sender, item.clone()).await; + // Remove senders where the receiver side has been dropped, + // i.e. keep the ones where sending is successful + if send_result.is_ok() { + new_senders.push(sender); + } + } + + *senders = new_senders; } }) .await; @@ -75,7 +85,7 @@ where + CanCreateChannels + CanUseChannels + CanStreamReceiver - + HasAsyncStreamType, + + HasBoxedStreamType, { fn stream_subscription(&self, stream: S) -> Arc> where diff --git a/crates/async-runtime-components/src/subscription/impls/subscription.rs b/crates/async-runtime-components/src/subscription/impls/subscription.rs new file mode 100644 index 000000000..a39da3320 --- /dev/null +++ b/crates/async-runtime-components/src/subscription/impls/subscription.rs @@ -0,0 +1,27 @@ +use alloc::boxed::Box; +use alloc::sync::Arc; +use cgp_core::prelude::*; +use ibc_relayer_components::runtime::traits::subscription::ProvideSubscription; + +use crate::stream::traits::boxed::HasBoxedStreamType; +use crate::subscription::traits::subscription::Subscription; + +pub struct ProvideBoxedSubscription; + +#[async_trait] +impl ProvideSubscription for ProvideBoxedSubscription +where + Runtime: HasBoxedStreamType, +{ + type Subscription = Arc>; + + async fn subscribe(subscription: &Self::Subscription) -> Option> + where + T: Async, + { + subscription + .subscribe() + .await + .map(Runtime::from_boxed_stream) + } +} diff --git a/crates/async-runtime-components/src/subscription/mod.rs b/crates/async-runtime-components/src/subscription/mod.rs new file mode 100644 index 000000000..d44cc4a18 --- /dev/null +++ b/crates/async-runtime-components/src/subscription/mod.rs @@ -0,0 +1,2 @@ +pub mod impls; +pub mod traits; diff --git a/crates/relayer-subscription/src/traits/mod.rs b/crates/async-runtime-components/src/subscription/traits/mod.rs similarity index 57% rename from crates/relayer-subscription/src/traits/mod.rs rename to crates/async-runtime-components/src/subscription/traits/mod.rs index b4e042625..3e515356f 100644 --- a/crates/relayer-subscription/src/traits/mod.rs +++ b/crates/async-runtime-components/src/subscription/traits/mod.rs @@ -1,2 +1 @@ -pub mod stream; pub mod subscription; diff --git a/crates/relayer-subscription/src/traits/stream.rs b/crates/async-runtime-components/src/subscription/traits/stream.rs similarity index 94% rename from crates/relayer-subscription/src/traits/stream.rs rename to crates/async-runtime-components/src/subscription/traits/stream.rs index a5960a4ca..a42dfcee5 100644 --- a/crates/relayer-subscription/src/traits/stream.rs +++ b/crates/async-runtime-components/src/subscription/traits/stream.rs @@ -4,8 +4,6 @@ use cgp_core::Async; use futures_core::stream::Stream; use ibc_relayer_components::runtime::traits::stream::HasStreamType; -use crate::std_prelude::*; - pub trait HasAsyncStreamType: HasStreamType { fn from_async_stream( stream: Pin + Send + Sync + 'static>>, diff --git a/crates/relayer-subscription/src/traits/subscription.rs b/crates/async-runtime-components/src/subscription/traits/subscription.rs similarity index 98% rename from crates/relayer-subscription/src/traits/subscription.rs rename to crates/async-runtime-components/src/subscription/traits/subscription.rs index 472c53605..26c38089d 100644 --- a/crates/relayer-subscription/src/traits/subscription.rs +++ b/crates/async-runtime-components/src/subscription/traits/subscription.rs @@ -1,12 +1,10 @@ +use alloc::boxed::Box; use alloc::sync::Arc; use core::pin::Pin; -use async_trait::async_trait; -use cgp_core::Async; +use cgp_core::prelude::*; use futures_core::stream::Stream; -use crate::std_prelude::*; - /** A [`Subscription`] is a multi-consumer abstraction over a single-consumer [`Stream`] construct. A [`Subscription`] value can be shared by wrapping diff --git a/crates/async-runtime-components/src/task/impls/concurrent.rs b/crates/async-runtime-components/src/task/impls/concurrent.rs new file mode 100644 index 000000000..6dcca71bd --- /dev/null +++ b/crates/async-runtime-components/src/task/impls/concurrent.rs @@ -0,0 +1,37 @@ +use alloc::boxed::Box; +use alloc::vec::Vec; +use cgp_core::prelude::*; +use futures_util::stream::{self, Stream, StreamExt}; +use ibc_relayer_components::runtime::traits::task::{ConcurrentTaskRunner, Task}; + +use crate::stream::traits::boxed::HasBoxedStreamType; + +pub struct RunConcurrentTasks; + +#[async_trait] +impl ConcurrentTaskRunner for RunConcurrentTasks +where + Runtime: HasBoxedStreamType, +{ + async fn run_concurrent_tasks(_runtime: &Runtime, tasks: Vec) + where + T: Task, + { + run_concurrent_tasks(stream::iter(tasks)).await + } + + async fn run_concurrent_task_stream(_runtime: &Runtime, tasks: Runtime::Stream) + where + T: Task, + { + run_concurrent_tasks(Runtime::to_boxed_stream(tasks)).await + } +} +pub async fn run_concurrent_tasks(tasks: impl Stream) +where + T: Task, +{ + tasks + .for_each_concurrent(None, |task| Box::pin(async move { task.run().await })) + .await; +} diff --git a/crates/async-runtime-components/src/task/impls/mod.rs b/crates/async-runtime-components/src/task/impls/mod.rs new file mode 100644 index 000000000..df4f37853 --- /dev/null +++ b/crates/async-runtime-components/src/task/impls/mod.rs @@ -0,0 +1 @@ +pub mod concurrent; diff --git a/crates/async-runtime-components/src/task/mod.rs b/crates/async-runtime-components/src/task/mod.rs new file mode 100644 index 000000000..b132bc255 --- /dev/null +++ b/crates/async-runtime-components/src/task/mod.rs @@ -0,0 +1 @@ +pub mod impls; diff --git a/crates/cosmos-client-components/Cargo.toml b/crates/cosmos-client-components/Cargo.toml index 11d8f8acb..1f30f99e1 100644 --- a/crates/cosmos-client-components/Cargo.toml +++ b/crates/cosmos-client-components/Cargo.toml @@ -20,7 +20,6 @@ cgp-core = { workspace = true } ibc-relayer-runtime = { workspace = true } ibc-relayer-components = { workspace = true } ibc-relayer-components-extra = { workspace = true } -ibc-relayer-subscription = { workspace = true } ibc-test-components = { workspace = true } tendermint = { workspace = true, features = ["secp256k1"] } tendermint-rpc = { workspace = true, features = ["http-client", "websocket-client"] } diff --git a/crates/cosmos-test-components/Cargo.toml b/crates/cosmos-test-components/Cargo.toml index dc50a3f8c..80ea363a7 100644 --- a/crates/cosmos-test-components/Cargo.toml +++ b/crates/cosmos-test-components/Cargo.toml @@ -20,7 +20,6 @@ cgp-core = { workspace = true } ibc-relayer-runtime = { workspace = true } ibc-relayer-components = { workspace = true } ibc-relayer-components-extra = { workspace = true } -ibc-relayer-subscription = { workspace = true } ibc-test-components = { workspace = true } tendermint = { workspace = true, features = ["secp256k1"] } tendermint-rpc = { workspace = true, features = ["http-client", "websocket-client"] } diff --git a/crates/relayer-components-extra/src/batch/components/message_sender.rs b/crates/relayer-components-extra/src/batch/components/message_sender.rs index a59909fa9..2282d0b3f 100644 --- a/crates/relayer-components-extra/src/batch/components/message_sender.rs +++ b/crates/relayer-components-extra/src/batch/components/message_sender.rs @@ -37,6 +37,7 @@ where let message_sender = context.get_batch_sender(); Runtime::send(message_sender, (messages, result_sender)) + .await .map_err(TargetChain::raise_error) .map_err(Target::target_chain_error)?; diff --git a/crates/relayer-components-extra/src/batch/worker.rs b/crates/relayer-components-extra/src/batch/worker.rs index 4389abe80..5ca71aa12 100644 --- a/crates/relayer-components-extra/src/batch/worker.rs +++ b/crates/relayer-components-extra/src/batch/worker.rs @@ -2,7 +2,7 @@ use alloc::collections::VecDeque; use core::marker::PhantomData; use core::mem; -use cgp_core::{async_trait, Async}; +use cgp_core::prelude::*; use ibc_relayer_components::chain::traits::types::chain::HasChainTypes; use ibc_relayer_components::chain::traits::types::message::{ CanEstimateMessageSize, HasMessageType, @@ -47,7 +47,7 @@ where Relay: Clone + CanRunLoop, Target: ChainTarget, Target::TargetChain: HasRuntime, - Runtime: CanSpawnTask + HasChannelTypes + HasChannelOnceTypes, + Runtime: CanSpawnTask + HasChannelTypes + HasChannelOnceTypes + HasErrorType, { fn spawn_batch_message_worker( &self, @@ -85,7 +85,7 @@ where Relay: CanRunLoop, Target: ChainTarget, Target::TargetChain: HasRuntime, - Runtime: HasChannelTypes + HasChannelOnceTypes, + Runtime: HasChannelTypes + HasChannelOnceTypes + HasErrorType, { async fn run(self) { self.relay.run_loop(&self.config, self.receiver).await; @@ -197,7 +197,7 @@ where Target: ChainTarget, Target::TargetChain: HasRuntime, Target::TargetChain: CanPartitionMessageBatches, - Runtime: HasTime + CanSpawnTask + HasChannelTypes + HasChannelOnceTypes, + Runtime: HasTime + CanSpawnTask + HasChannelTypes + HasChannelOnceTypes + HasErrorType, { async fn process_message_batches( &self, @@ -250,7 +250,7 @@ where Error: Async, Chain: HasChainTypes + HasRuntime, Chain: CanEstimateBatchSize, - Runtime: HasChannelTypes + HasChannelOnceTypes, + Runtime: HasChannelTypes + HasChannelOnceTypes + HasErrorType, { fn partition_message_batches( config: &BatchConfig, diff --git a/crates/relayer-components-extra/src/build/components/relay/batch.rs b/crates/relayer-components-extra/src/build/components/relay/batch.rs index 0088d302f..7e8389fc0 100644 --- a/crates/relayer-components-extra/src/build/components/relay/batch.rs +++ b/crates/relayer-components-extra/src/build/components/relay/batch.rs @@ -45,8 +45,8 @@ where DstChain: HasIbcChainTypes, SrcChain: HasRuntime + HasChainId, DstChain: HasRuntime + HasChainId, - SrcRuntime: HasChannelTypes + HasChannelOnceTypes, - DstRuntime: HasChannelTypes + HasChannelOnceTypes, + SrcRuntime: HasChannelTypes + HasChannelOnceTypes + HasErrorType, + DstRuntime: HasChannelTypes + HasChannelOnceTypes + HasErrorType, Build::Runtime: HasMutex, { async fn build_relay_from_chains( @@ -142,7 +142,7 @@ where Target: ChainBuildTarget, Chain: HasIbcChainTypes + HasRuntime, Counterparty: HasIbcChainTypes, - Runtime: CanCreateChannels + HasChannelOnceTypes + CanCloneSender, + Runtime: CanCreateChannels + HasChannelOnceTypes + CanCloneSender + HasErrorType, Build: HasBatchSenderCache>, Build::Runtime: HasMutex, Chain::ChainId: Ord + Clone, diff --git a/crates/relayer-components-extra/src/components/extra/closures/relay/auto_relayer.rs b/crates/relayer-components-extra/src/components/extra/closures/relay/auto_relayer.rs index 705b358fe..704542e09 100644 --- a/crates/relayer-components-extra/src/components/extra/closures/relay/auto_relayer.rs +++ b/crates/relayer-components-extra/src/components/extra/closures/relay/auto_relayer.rs @@ -7,7 +7,7 @@ use ibc_relayer_components::logger::traits::level::HasBaseLogLevels; use ibc_relayer_components::relay::traits::chains::HasRelayChains; use ibc_relayer_components::runtime::traits::runtime::{HasRuntime, HasRuntimeType}; use ibc_relayer_components::runtime::traits::stream::CanMapStream; -use ibc_relayer_components::runtime::traits::subscription::HasSubscriptionType; +use ibc_relayer_components::runtime::traits::subscription::HasSubscription; use ibc_relayer_components::runtime::traits::task::CanRunConcurrentTasks; use crate::components::extra::closures::relay::event_relayer::UseExtraEventRelayer; @@ -39,9 +39,9 @@ where Relay::Runtime: CanSpawnTask + CanRunConcurrentTasks, Relay::Logger: HasBaseLogLevels, ::Runtime: - HasSubscriptionType + CanRunConcurrentTasks + CanMapStream, + HasSubscription + CanRunConcurrentTasks + CanMapStream, ::Runtime: - HasSubscriptionType + CanRunConcurrentTasks + CanMapStream, + HasSubscription + CanRunConcurrentTasks + CanMapStream, Components: DelegatesToExtraRelayComponents, { } diff --git a/crates/relayer-components-extra/src/runtime/traits/channel.rs b/crates/relayer-components-extra/src/runtime/traits/channel.rs index 1fdc373c1..35cb95441 100644 --- a/crates/relayer-components-extra/src/runtime/traits/channel.rs +++ b/crates/relayer-components-extra/src/runtime/traits/channel.rs @@ -7,7 +7,7 @@ [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html). */ -use cgp_core::{async_trait, Async, HasErrorType}; +use cgp_core::prelude::*; use ibc_relayer_components::runtime::traits::stream::HasStreamType; use crate::std_prelude::*; @@ -54,7 +54,8 @@ use crate::std_prelude::*; which defines abstract one-shot channel types that allow at most one message to be sent over. */ -pub trait HasChannelTypes: HasErrorType { +#[derive_component(ChannelTypeComponent, ProvideChannelType)] +pub trait HasChannelTypes: Async { /** The sender end of a channel with payload type `T`. */ @@ -74,6 +75,7 @@ pub trait HasChannelTypes: HasErrorType { Allow the creation of new sender-receiver pairs for the channel types defined in [`HasChannelTypes`]. */ +#[derive_component(ChannelCreatorComponent, ChannelCreator)] pub trait CanCreateChannels: HasChannelTypes { /** Given a generic payload type `T`, create a @@ -103,8 +105,9 @@ pub trait CanCreateChannels: HasChannelTypes { [`Sender`](HasChannelTypes::Sender) and [`Receiver`](HasChannelTypes::Receiver) ends of a channel. */ +#[derive_component(ChannelUserComponent, ChannelUser)] #[async_trait] -pub trait CanUseChannels: HasChannelTypes { +pub trait CanUseChannels: HasChannelTypes + HasErrorType { /** Given a reference to [`Sender`](HasChannelTypes::Sender), send a message payload of type `T` over the sender. @@ -119,7 +122,7 @@ pub trait CanUseChannels: HasChannelTypes { that any operation after `receive()` is called on the receiving end should _never_ execute within `send()`. */ - fn send(sender: &Self::Sender, value: T) -> Result<(), Self::Error> + async fn send(sender: &Self::Sender, value: T) -> Result<(), Self::Error> where T: Async; @@ -140,12 +143,14 @@ pub trait CanUseChannels: HasChannelTypes { T: Async; } +#[derive_component(ReceiverStreamerComponent, ReceiverStreamer)] pub trait CanStreamReceiver: HasChannelTypes + HasStreamType { fn receiver_to_stream(receiver: Self::Receiver) -> Self::Stream where T: Async; } +#[derive_component(SenderClonerComponent, SenderCloner)] pub trait CanCloneSender: HasChannelTypes { fn clone_sender(sender: &Self::Sender) -> Self::Sender where diff --git a/crates/relayer-components-extra/src/runtime/traits/channel_once.rs b/crates/relayer-components-extra/src/runtime/traits/channel_once.rs index 7be3c46e6..e525e0755 100644 --- a/crates/relayer-components-extra/src/runtime/traits/channel_once.rs +++ b/crates/relayer-components-extra/src/runtime/traits/channel_once.rs @@ -1,8 +1,9 @@ -use cgp_core::{async_trait, Async, HasErrorType}; +use alloc::boxed::Box; -use crate::std_prelude::*; +use cgp_core::prelude::*; -pub trait HasChannelOnceTypes: HasErrorType { +#[derive_component(ChannelOnceTypeComponent, ProvideChannelOnceType)] +pub trait HasChannelOnceTypes { type SenderOnce: Async where T: Async; @@ -12,14 +13,16 @@ pub trait HasChannelOnceTypes: HasErrorType { T: Async; } +#[derive_component(ChannelOnceCreatorComponent, ChannelOnceCreator)] pub trait CanCreateChannelsOnce: HasChannelOnceTypes { fn new_channel_once() -> (Self::SenderOnce, Self::ReceiverOnce) where T: Async; } +#[derive_component(ChannelOnceUserComponent, ChannelOnceUser)] #[async_trait] -pub trait CanUseChannelsOnce: HasChannelOnceTypes { +pub trait CanUseChannelsOnce: HasChannelOnceTypes + HasErrorType { fn send_once(sender: Self::SenderOnce, value: T) -> Result<(), Self::Error> where T: Async; diff --git a/crates/relayer-components-extra/src/runtime/traits/spawn.rs b/crates/relayer-components-extra/src/runtime/traits/spawn.rs index 29a11fdf5..68d0f3159 100644 --- a/crates/relayer-components-extra/src/runtime/traits/spawn.rs +++ b/crates/relayer-components-extra/src/runtime/traits/spawn.rs @@ -1,6 +1,7 @@ -use cgp_core::Async; +use cgp_core::prelude::*; use ibc_relayer_components::runtime::traits::task::Task; +#[derive_component(TaskSpawnerComponent, TaskSpawner)] pub trait CanSpawnTask: Async { fn spawn_task(&self, task: T) where diff --git a/crates/relayer-components/src/chain/traits/event_subscription.rs b/crates/relayer-components/src/chain/traits/event_subscription.rs index a25263e8b..f0141ac99 100644 --- a/crates/relayer-components/src/chain/traits/event_subscription.rs +++ b/crates/relayer-components/src/chain/traits/event_subscription.rs @@ -1,13 +1,13 @@ use crate::chain::traits::types::event::HasEventType; use crate::chain::traits::types::height::HasHeightType; use crate::runtime::traits::runtime::HasRuntime; -use crate::runtime::traits::subscription::HasSubscriptionType; +use crate::runtime::traits::subscription::HasSubscription; pub trait HasEventSubscription: HasHeightType + HasEventType + HasRuntime where - Self::Runtime: HasSubscriptionType, + Self::Runtime: HasSubscription, { fn event_subscription( &self, - ) -> &::Subscription<(Self::Height, Self::Event)>; + ) -> &::Subscription<(Self::Height, Self::Event)>; } diff --git a/crates/relayer-components/src/components/default/closures/relay/auto_relayer.rs b/crates/relayer-components/src/components/default/closures/relay/auto_relayer.rs index c580eae37..d13dd8c54 100644 --- a/crates/relayer-components/src/components/default/closures/relay/auto_relayer.rs +++ b/crates/relayer-components/src/components/default/closures/relay/auto_relayer.rs @@ -6,7 +6,7 @@ use crate::components::default::relay::DelegatesToDefaultRelayComponents; use crate::relay::traits::chains::HasRelayChains; use crate::runtime::traits::runtime::{HasRuntime, HasRuntimeType}; use crate::runtime::traits::stream::CanMapStream; -use crate::runtime::traits::subscription::HasSubscriptionType; +use crate::runtime::traits::subscription::HasSubscription; use crate::runtime::traits::task::CanRunConcurrentTasks; pub trait CanUseDefaultAutoRelayer: UseDefaultAutoRelayer {} @@ -24,9 +24,9 @@ where Relay::DstChain: HasEventSubscription, Relay::Runtime: CanRunConcurrentTasks, ::Runtime: - HasSubscriptionType + CanRunConcurrentTasks + CanMapStream, + HasSubscription + CanRunConcurrentTasks + CanMapStream, ::Runtime: - HasSubscriptionType + CanRunConcurrentTasks + CanMapStream, + HasSubscription + CanRunConcurrentTasks + CanMapStream, Components: DelegatesToDefaultRelayComponents, { } diff --git a/crates/relayer-components/src/relay/components/auto_relayers/event.rs b/crates/relayer-components/src/relay/components/auto_relayers/event.rs index 64130e290..7e60e7ab9 100644 --- a/crates/relayer-components/src/relay/components/auto_relayers/event.rs +++ b/crates/relayer-components/src/relay/components/auto_relayers/event.rs @@ -11,7 +11,7 @@ use crate::relay::traits::components::event_relayer::CanRelayEvent; use crate::relay::traits::target::ChainTarget; use crate::runtime::traits::runtime::HasRuntime; use crate::runtime::traits::stream::CanMapStream; -use crate::runtime::traits::subscription::HasSubscriptionType; +use crate::runtime::traits::subscription::HasSubscription; use crate::runtime::traits::task::{CanRunConcurrentTasks, Task}; use crate::std_prelude::*; @@ -52,7 +52,7 @@ where Relay: CanRelayEvent + HasRuntime + Clone, Target: ChainTarget, Target::TargetChain: HasEventSubscription, - Runtime: HasSubscriptionType + CanMapStream + CanRunConcurrentTasks + HasErrorType, + Runtime: HasSubscription + CanMapStream + CanRunConcurrentTasks + HasErrorType, { async fn auto_relay(relay: &Relay, _target: Target) -> Result<(), Relay::Error> { let subscription = Target::target_chain(relay).event_subscription(); diff --git a/crates/relayer-components/src/runtime/traits/mutex.rs b/crates/relayer-components/src/runtime/traits/mutex.rs index 3b3c0a380..fb9c5e723 100644 --- a/crates/relayer-components/src/runtime/traits/mutex.rs +++ b/crates/relayer-components/src/runtime/traits/mutex.rs @@ -1,9 +1,9 @@ +use alloc::boxed::Box; use core::ops::DerefMut; -use cgp_core::{async_trait, Async}; - -use crate::std_prelude::*; +use cgp_core::prelude::*; +#[derive_component(MutexComponent, ProvideMutex)] #[async_trait] pub trait HasMutex: Async { type Mutex: Async; diff --git a/crates/relayer-components/src/runtime/traits/sleep.rs b/crates/relayer-components/src/runtime/traits/sleep.rs index 4da248125..80bc4482f 100644 --- a/crates/relayer-components/src/runtime/traits/sleep.rs +++ b/crates/relayer-components/src/runtime/traits/sleep.rs @@ -1,10 +1,9 @@ +use alloc::boxed::Box; use core::time::Duration; -use cgp_core::{async_trait, Async}; - -#[allow(unused_imports)] -use crate::std_prelude::*; +use cgp_core::prelude::*; +#[derive_component(SleeperComponent, Sleeper)] #[async_trait] pub trait CanSleep: Async { async fn sleep(&self, duration: Duration); diff --git a/crates/relayer-components/src/runtime/traits/stream.rs b/crates/relayer-components/src/runtime/traits/stream.rs index 5f4c88e89..182d85043 100644 --- a/crates/relayer-components/src/runtime/traits/stream.rs +++ b/crates/relayer-components/src/runtime/traits/stream.rs @@ -1,9 +1,11 @@ -use cgp_core::Async; +use cgp_core::prelude::*; +#[derive_component(StreamTypeComponent, ProvideStreamType)] pub trait HasStreamType: Async { type Stream: Async; } +#[derive_component(StreamMapperComponent, StreamMapper)] pub trait CanMapStream: HasStreamType { fn map_stream(stream: Self::Stream, mapper: M) -> Self::Stream where diff --git a/crates/relayer-components/src/runtime/traits/subscription.rs b/crates/relayer-components/src/runtime/traits/subscription.rs index dff711de3..9b4382615 100644 --- a/crates/relayer-components/src/runtime/traits/subscription.rs +++ b/crates/relayer-components/src/runtime/traits/subscription.rs @@ -1,10 +1,11 @@ -use cgp_core::{async_trait, Async}; +use cgp_core::prelude::*; use crate::runtime::traits::stream::HasStreamType; use crate::std_prelude::*; +#[derive_component(SubscriptionComponent, ProvideSubscription)] #[async_trait] -pub trait HasSubscriptionType: HasStreamType { +pub trait HasSubscription: HasStreamType { type Subscription: Async; async fn subscribe(subcription: &Self::Subscription) -> Option> diff --git a/crates/relayer-components/src/runtime/traits/task.rs b/crates/relayer-components/src/runtime/traits/task.rs index f918adb1a..40b138cc3 100644 --- a/crates/relayer-components/src/runtime/traits/task.rs +++ b/crates/relayer-components/src/runtime/traits/task.rs @@ -1,13 +1,15 @@ -use cgp_core::{async_trait, Async}; +use alloc::boxed::Box; +use alloc::vec::Vec; +use cgp_core::prelude::*; use crate::runtime::traits::stream::HasStreamType; -use crate::std_prelude::*; #[async_trait] pub trait Task: Async { async fn run(self); } +#[derive_component(ConcurrentTaskRunnerComponent, ConcurrentTaskRunner)] #[async_trait] pub trait CanRunConcurrentTasks: HasStreamType { async fn run_concurrent_tasks(&self, tasks: Vec) diff --git a/crates/relayer-components/src/runtime/traits/time.rs b/crates/relayer-components/src/runtime/traits/time.rs index b72d69f45..4376fd934 100644 --- a/crates/relayer-components/src/runtime/traits/time.rs +++ b/crates/relayer-components/src/runtime/traits/time.rs @@ -1,7 +1,8 @@ use core::time::Duration; -use cgp_core::Async; +use cgp_core::prelude::*; +#[derive_component(TimeComponent, ProvideTime)] pub trait HasTime: Async { type Time: Async; diff --git a/crates/relayer-cosmos/Cargo.toml b/crates/relayer-cosmos/Cargo.toml index 04313112e..b15fffd82 100644 --- a/crates/relayer-cosmos/Cargo.toml +++ b/crates/relayer-cosmos/Cargo.toml @@ -24,8 +24,8 @@ cgp-core = { workspace = true } ibc-relayer-runtime = { workspace = true } ibc-relayer-components = { workspace = true } ibc-relayer-components-extra = { workspace = true } -ibc-relayer-subscription = { workspace = true } ibc-test-components = { workspace = true } +async-runtime-components = { workspace = true } cosmos-client-components = { workspace = true } tendermint = { workspace = true, features = ["secp256k1"] } tendermint-rpc = { workspace = true, features = ["http-client", "websocket-client"] } diff --git a/crates/relayer-cosmos/src/contexts/chain.rs b/crates/relayer-cosmos/src/contexts/chain.rs index f667f53f5..d61d22409 100644 --- a/crates/relayer-cosmos/src/contexts/chain.rs +++ b/crates/relayer-cosmos/src/contexts/chain.rs @@ -1,13 +1,13 @@ use alloc::sync::Arc; +use async_runtime_components::subscription::impls::empty::EmptySubscription; +use async_runtime_components::subscription::traits::subscription::Subscription; use ibc_relayer::chain::cosmos::types::config::TxConfig; use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::EventSourceMode; use ibc_relayer::event::source::queries::all as all_queries; use ibc_relayer::keyring::Secp256k1KeyPair; use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; -use ibc_relayer_subscription::impls::empty::EmptySubscription; -use ibc_relayer_subscription::traits::subscription::Subscription; use ibc_relayer_types::core::ics02_client::height::Height; use ibc_relayer_types::core::ics24_host::identifier::ChainId; use tendermint::abci::Event as AbciEvent; diff --git a/crates/relayer-cosmos/src/impls/chain/fields.rs b/crates/relayer-cosmos/src/impls/chain/fields.rs index c090da096..7844aa080 100644 --- a/crates/relayer-cosmos/src/impls/chain/fields.rs +++ b/crates/relayer-cosmos/src/impls/chain/fields.rs @@ -1,5 +1,6 @@ use alloc::sync::Arc; +use async_runtime_components::subscription::traits::subscription::Subscription; use cgp_core::Async; use cosmos_client_components::traits::message::CosmosMessage; use cosmos_client_components::types::tendermint::TendermintClientState; @@ -9,7 +10,6 @@ use ibc_relayer_components::chain::traits::types::client_state::HasClientStateFi use ibc_relayer_components::chain::traits::types::height::{CanIncrementHeight, HasHeightType}; use ibc_relayer_components::chain::traits::types::ibc::HasCounterpartyMessageHeight; use ibc_relayer_components::chain::traits::types::message::CanEstimateMessageSize; -use ibc_relayer_subscription::traits::subscription::Subscription; use ibc_relayer_types::core::ics24_host::identifier::ChainId; use ibc_relayer_types::signer::Signer; use ibc_relayer_types::Height; diff --git a/crates/relayer-cosmos/src/impls/subscription.rs b/crates/relayer-cosmos/src/impls/subscription.rs index ff634c3ed..fdd7e2d52 100644 --- a/crates/relayer-cosmos/src/impls/subscription.rs +++ b/crates/relayer-cosmos/src/impls/subscription.rs @@ -2,15 +2,15 @@ use alloc::sync::Arc; use core::pin::Pin; use core::time::Duration; +use async_runtime_components::subscription::impls::closure::CanCreateClosureSubscription; +use async_runtime_components::subscription::impls::multiplex::CanMultiplexSubscription; +use async_runtime_components::subscription::traits::subscription::Subscription; use async_trait::async_trait; use cgp_core::Async; use futures::lock::Mutex; use futures::stream::{self, Stream, StreamExt, TryStreamExt}; use ibc_relayer_components::runtime::traits::task::Task; use ibc_relayer_components_extra::runtime::traits::spawn::CanSpawnTask; -use ibc_relayer_subscription::impls::closure::CanCreateClosureSubscription; -use ibc_relayer_subscription::impls::multiplex::CanMultiplexSubscription; -use ibc_relayer_subscription::traits::subscription::Subscription; use ibc_relayer_types::core::ics02_client::height::Height; use moka::future::Cache; use tendermint::abci::Event as AbciEvent; diff --git a/crates/relayer-cosmos/src/types/batch.rs b/crates/relayer-cosmos/src/types/batch.rs index f3ae665f7..f16eb37b5 100644 --- a/crates/relayer-cosmos/src/types/batch.rs +++ b/crates/relayer-cosmos/src/types/batch.rs @@ -1,9 +1,10 @@ use alloc::sync::Arc; use cosmos_client_components::traits::message::CosmosMessage; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot::Sender as SenderOnce; +use futures::lock::Mutex; use tendermint::abci::Event as AbciEvent; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot::Sender as SenderOnce; use crate::types::error::Error; @@ -12,6 +13,6 @@ pub type CosmosBatchPayload = ( SenderOnce>>, Error>>, ); -pub type CosmosBatchSender = UnboundedSender; +pub type CosmosBatchSender = Arc>>; pub type CosmosBatchReceiver = UnboundedReceiver; diff --git a/crates/relayer-runtime/Cargo.toml b/crates/relayer-runtime/Cargo.toml index 027af0763..0dfc3c3a1 100644 --- a/crates/relayer-runtime/Cargo.toml +++ b/crates/relayer-runtime/Cargo.toml @@ -15,8 +15,8 @@ description = """ [dependencies] ibc-relayer-components = { workspace = true } ibc-relayer-components-extra = { workspace = true } -ibc-relayer-subscription = { workspace = true } ibc-test-components = { workspace = true } +async-runtime-components = { workspace = true } tokio-runtime-components = { workspace = true } async-trait = { workspace = true } @@ -24,5 +24,3 @@ cgp-core = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } futures = { workspace = true } - -tokio-stream = "0.1" diff --git a/crates/relayer-runtime/src/impls/mod.rs b/crates/relayer-runtime/src/impls/mod.rs index d23564f31..d09110da9 100644 --- a/crates/relayer-runtime/src/impls/mod.rs +++ b/crates/relayer-runtime/src/impls/mod.rs @@ -1,4 +1,3 @@ pub mod logger; pub mod runtime; -pub mod task; pub mod types; diff --git a/crates/relayer-runtime/src/impls/runtime/channel.rs b/crates/relayer-runtime/src/impls/runtime/channel.rs deleted file mode 100644 index ebe8647f7..000000000 --- a/crates/relayer-runtime/src/impls/runtime/channel.rs +++ /dev/null @@ -1,127 +0,0 @@ -use core::pin::Pin; - -use async_trait::async_trait; -use cgp_core::Async; -use futures::Stream; -use ibc_relayer_components_extra::runtime::traits::channel::{ - CanCloneSender, CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes, -}; -use ibc_relayer_components_extra::runtime::traits::channel_once::{ - CanCreateChannelsOnce, CanUseChannelsOnce, HasChannelOnceTypes, -}; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; - -use crate::types::error::TokioRuntimeError; -use crate::types::runtime::TokioRuntimeContext; - -impl HasChannelTypes for TokioRuntimeContext { - type Sender = mpsc::UnboundedSender - where - T: Async; - - type Receiver = mpsc::UnboundedReceiver - where - T: Async; -} - -impl HasChannelOnceTypes for TokioRuntimeContext { - type SenderOnce = oneshot::Sender - where - T: Async; - - type ReceiverOnce = oneshot::Receiver - where - T: Async; -} - -impl CanCreateChannels for TokioRuntimeContext { - fn new_channel() -> (Self::Sender, Self::Receiver) - where - T: Async, - { - mpsc::unbounded_channel() - } -} - -impl CanCreateChannelsOnce for TokioRuntimeContext { - fn new_channel_once() -> (Self::SenderOnce, Self::ReceiverOnce) - where - T: Async, - { - let (sender, receiver) = oneshot::channel(); - (sender, receiver) - } -} - -#[async_trait] -impl CanUseChannels for TokioRuntimeContext { - fn send(sender: &Self::Sender, value: T) -> Result<(), Self::Error> - where - T: Async, - { - sender - .send(value) - .map_err(|_| TokioRuntimeError::ChannelClosed) - } - - async fn receive(receiver: &mut Self::Receiver) -> Result - where - T: Async, - { - receiver - .recv() - .await - .ok_or(TokioRuntimeError::ChannelClosed) - } - - fn try_receive(receiver: &mut Self::Receiver) -> Result, Self::Error> - where - T: Async, - { - match receiver.try_recv() { - Ok(batch) => Ok(Some(batch)), - Err(mpsc::error::TryRecvError::Empty) => Ok(None), - Err(mpsc::error::TryRecvError::Disconnected) => Err(TokioRuntimeError::ChannelClosed), - } - } -} - -#[async_trait] -impl CanUseChannelsOnce for TokioRuntimeContext { - fn send_once(sender: Self::SenderOnce, value: T) -> Result<(), Self::Error> - where - T: Async, - { - sender - .send(value) - .map_err(|_| TokioRuntimeError::ChannelClosed) - } - - async fn receive_once(receiver: Self::ReceiverOnce) -> Result - where - T: Async, - { - receiver.await.map_err(|_| TokioRuntimeError::ChannelClosed) - } -} - -impl CanStreamReceiver for TokioRuntimeContext { - fn receiver_to_stream( - receiver: Self::Receiver, - ) -> Pin + Send + Sync + 'static>> - where - T: Async, - { - Box::pin(UnboundedReceiverStream::new(receiver)) - } -} - -impl CanCloneSender for TokioRuntimeContext { - fn clone_sender(sender: &Self::Sender) -> Self::Sender - where - T: Async, - { - sender.clone() - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/components.rs b/crates/relayer-runtime/src/impls/runtime/components.rs index 26a251230..e57feab65 100644 --- a/crates/relayer-runtime/src/impls/runtime/components.rs +++ b/crates/relayer-runtime/src/impls/runtime/components.rs @@ -1,35 +1,19 @@ +use cgp_core::delegate_all; use cgp_core::prelude::*; -use ibc_test_components::runtime::traits::child_process::ChildProcessStarterComponent; -use ibc_test_components::runtime::traits::exec_command::CommandExecutorComponent; -use ibc_test_components::runtime::traits::read_file::FileAsStringReaderComponent; -use ibc_test_components::runtime::traits::reserve_port::TcpPortReserverComponent; -use ibc_test_components::runtime::traits::types::child_process::ChildProcessTypeComponent; -use ibc_test_components::runtime::traits::types::file_path::FilePathTypeComponent; -use ibc_test_components::runtime::traits::write_file::StringToFileWriterComponent; -use tokio_runtime_components::impls::child_process::StartTokioChildProcess; -use tokio_runtime_components::impls::exec_command::TokioExecCommand; -use tokio_runtime_components::impls::read_file::TokioReadFileAsString; -use tokio_runtime_components::impls::reserve_port::TokioReserveTcpPort; -use tokio_runtime_components::impls::types::child_process::ProvideTokioChildProcessType; -use tokio_runtime_components::impls::types::file_path::ProvideStdPathType; -use tokio_runtime_components::impls::write_file::TokioWriteStringToFile; +use tokio_runtime_components::components::parallel::{ + IsTokioParallelRuntimeComponent, TokioParallelRuntimeComponents, +}; use crate::types::runtime::TokioRuntimeContext; -pub struct TokioRuntimeComponents; +pub struct RelayerRuntimeComponents; impl HasComponents for TokioRuntimeContext { - type Components = TokioRuntimeComponents; + type Components = RelayerRuntimeComponents; } -delegate_components! { - TokioRuntimeComponents { - FilePathTypeComponent: ProvideStdPathType, - ChildProcessTypeComponent: ProvideTokioChildProcessType, - ChildProcessStarterComponent: StartTokioChildProcess, - FileAsStringReaderComponent: TokioReadFileAsString, - CommandExecutorComponent: TokioExecCommand, - StringToFileWriterComponent: TokioWriteStringToFile, - TcpPortReserverComponent: TokioReserveTcpPort, - } -} +delegate_all!( + IsTokioParallelRuntimeComponent, + TokioParallelRuntimeComponents, + RelayerRuntimeComponents, +); diff --git a/crates/relayer-runtime/src/impls/runtime/error.rs b/crates/relayer-runtime/src/impls/runtime/error.rs index d40ca328a..f4b74bd77 100644 --- a/crates/relayer-runtime/src/impls/runtime/error.rs +++ b/crates/relayer-runtime/src/impls/runtime/error.rs @@ -1,19 +1,20 @@ use alloc::sync::Arc; +use async_runtime_components::channel::types::ChannelClosedError; use cgp_core::{ErrorRaiser, ProvideErrorType}; use core::str::Utf8Error; use std::io::Error as IoError; use tokio_runtime_components::impls::child_process::PrematureChildProcessExitError; use tokio_runtime_components::impls::exec_command::ExecCommandFailure; -use crate::impls::runtime::components::TokioRuntimeComponents; +use crate::impls::runtime::components::RelayerRuntimeComponents; use crate::types::error::TokioRuntimeError; use crate::types::runtime::TokioRuntimeContext; -impl ProvideErrorType for TokioRuntimeComponents { +impl ProvideErrorType for RelayerRuntimeComponents { type Error = TokioRuntimeError; } -impl ErrorRaiser for TokioRuntimeComponents { +impl ErrorRaiser for RelayerRuntimeComponents { fn raise_error(e: PrematureChildProcessExitError) -> TokioRuntimeError { TokioRuntimeError::PrematureChildProcessExit { exit_status: e.exit_status, @@ -22,19 +23,25 @@ impl ErrorRaiser for TokioR } } -impl ErrorRaiser for TokioRuntimeComponents { +impl ErrorRaiser for RelayerRuntimeComponents { fn raise_error(e: IoError) -> TokioRuntimeError { TokioRuntimeError::Io(Arc::new(e)) } } -impl ErrorRaiser for TokioRuntimeComponents { +impl ErrorRaiser for RelayerRuntimeComponents { fn raise_error(e: Utf8Error) -> TokioRuntimeError { TokioRuntimeError::Utf8(e) } } -impl ErrorRaiser for TokioRuntimeComponents { +impl ErrorRaiser for RelayerRuntimeComponents { + fn raise_error(_e: ChannelClosedError) -> TokioRuntimeError { + TokioRuntimeError::ChannelClosed + } +} + +impl ErrorRaiser for RelayerRuntimeComponents { fn raise_error(e: ExecCommandFailure) -> TokioRuntimeError { TokioRuntimeError::ExecCommandFailure { command: e.command, diff --git a/crates/relayer-runtime/src/impls/runtime/mod.rs b/crates/relayer-runtime/src/impls/runtime/mod.rs index f5687b584..c45ce2f42 100644 --- a/crates/relayer-runtime/src/impls/runtime/mod.rs +++ b/crates/relayer-runtime/src/impls/runtime/mod.rs @@ -1,10 +1,3 @@ -pub mod channel; pub mod components; pub mod error; -pub mod mutex; -pub mod sleep; -pub mod spawn; -pub mod stream; -pub mod subscription; -pub mod task; -pub mod time; +pub mod tokio; diff --git a/crates/relayer-runtime/src/impls/runtime/sleep.rs b/crates/relayer-runtime/src/impls/runtime/sleep.rs deleted file mode 100644 index a97791317..000000000 --- a/crates/relayer-runtime/src/impls/runtime/sleep.rs +++ /dev/null @@ -1,14 +0,0 @@ -use core::time::Duration; - -use async_trait::async_trait; -use ibc_relayer_components::runtime::traits::sleep::CanSleep; -use tokio::time::sleep; - -use crate::types::runtime::TokioRuntimeContext; - -#[async_trait] -impl CanSleep for TokioRuntimeContext { - async fn sleep(&self, duration: Duration) { - sleep(duration).await; - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/spawn.rs b/crates/relayer-runtime/src/impls/runtime/spawn.rs deleted file mode 100644 index 7a5e8e455..000000000 --- a/crates/relayer-runtime/src/impls/runtime/spawn.rs +++ /dev/null @@ -1,14 +0,0 @@ -use ibc_relayer_components_extra::runtime::traits::spawn::CanSpawnTask; - -use crate::types::runtime::TokioRuntimeContext; - -impl CanSpawnTask for TokioRuntimeContext { - fn spawn_task(&self, task: T) - where - T: ibc_relayer_components::runtime::traits::task::Task, - { - self.runtime.spawn(async move { - task.run().await; - }); - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/stream.rs b/crates/relayer-runtime/src/impls/runtime/stream.rs deleted file mode 100644 index 7ba49a52d..000000000 --- a/crates/relayer-runtime/src/impls/runtime/stream.rs +++ /dev/null @@ -1,46 +0,0 @@ -use core::pin::Pin; - -use cgp_core::Async; -use futures::stream::Stream; -use futures::StreamExt; -use ibc_relayer_components::runtime::traits::stream::{CanMapStream, HasStreamType}; -use ibc_relayer_subscription::traits::stream::HasAsyncStreamType; - -use crate::types::runtime::TokioRuntimeContext; - -impl HasStreamType for TokioRuntimeContext { - type Stream = Pin + Send + Sync + 'static>>; -} - -impl CanMapStream for TokioRuntimeContext { - fn map_stream(stream: Self::Stream, mapper: M) -> Self::Stream - where - T: Async, - U: Async, - M: Fn(T) -> U + Async, - { - let mapped = stream.map(mapper); - - Box::pin(mapped) - } -} - -impl HasAsyncStreamType for TokioRuntimeContext { - fn from_async_stream( - stream: Pin + Send + Sync + 'static>>, - ) -> Self::Stream - where - T: Async, - { - stream - } - - fn to_async_stream( - stream: Self::Stream, - ) -> Pin + Send + Sync + 'static>> - where - T: Async, - { - stream - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/subscription.rs b/crates/relayer-runtime/src/impls/runtime/subscription.rs deleted file mode 100644 index bde859dc9..000000000 --- a/crates/relayer-runtime/src/impls/runtime/subscription.rs +++ /dev/null @@ -1,24 +0,0 @@ -use alloc::sync::Arc; -use core::pin::Pin; - -use async_trait::async_trait; -use cgp_core::Async; -use futures::stream::Stream; -use ibc_relayer_components::runtime::traits::subscription::HasSubscriptionType; -use ibc_relayer_subscription::traits::subscription::Subscription; - -use crate::types::runtime::TokioRuntimeContext; - -#[async_trait] -impl HasSubscriptionType for TokioRuntimeContext { - type Subscription = Arc>; - - async fn subscribe( - subscription: &Self::Subscription, - ) -> Option + Send + Sync + 'static>>> - where - T: Async, - { - subscription.subscribe().await - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/task.rs b/crates/relayer-runtime/src/impls/runtime/task.rs deleted file mode 100644 index 9393c5c75..000000000 --- a/crates/relayer-runtime/src/impls/runtime/task.rs +++ /dev/null @@ -1,27 +0,0 @@ -use core::pin::Pin; - -use async_trait::async_trait; -use futures::stream::Stream; -use ibc_relayer_components::runtime::traits::task::{CanRunConcurrentTasks, Task}; - -use crate::impls::task::parallel::{run_parallel_task_stream, run_parallel_tasks}; -use crate::types::runtime::TokioRuntimeContext; - -#[async_trait] -impl CanRunConcurrentTasks for TokioRuntimeContext { - async fn run_concurrent_tasks(&self, tasks: Vec) - where - T: Task, - { - run_parallel_tasks(tasks).await - } - - async fn run_concurrent_task_stream( - &self, - tasks: Pin + Send + Sync + 'static>>, - ) where - T: Task, - { - run_parallel_task_stream(tasks).await - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/time.rs b/crates/relayer-runtime/src/impls/runtime/time.rs deleted file mode 100644 index ad5bbdb0c..000000000 --- a/crates/relayer-runtime/src/impls/runtime/time.rs +++ /dev/null @@ -1,18 +0,0 @@ -use core::time::Duration; -use std::time::Instant; - -use ibc_relayer_components::runtime::traits::time::HasTime; - -use crate::types::runtime::TokioRuntimeContext; - -impl HasTime for TokioRuntimeContext { - type Time = Instant; - - fn now(&self) -> Instant { - Instant::now() - } - - fn duration_since(time: &Instant, other: &Instant) -> Duration { - time.duration_since(*other) - } -} diff --git a/crates/relayer-runtime/src/impls/runtime/tokio.rs b/crates/relayer-runtime/src/impls/runtime/tokio.rs new file mode 100644 index 000000000..41fcb8dee --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/tokio.rs @@ -0,0 +1,10 @@ +use tokio::runtime::Runtime; +use tokio_runtime_components::traits::runtime::HasTokioRuntime; + +use crate::types::runtime::TokioRuntimeContext; + +impl HasTokioRuntime for TokioRuntimeContext { + fn tokio_runtime(&self) -> &Runtime { + &self.runtime + } +} diff --git a/crates/relayer-runtime/src/impls/task/concurrent.rs b/crates/relayer-runtime/src/impls/task/concurrent.rs deleted file mode 100644 index 522c36b85..000000000 --- a/crates/relayer-runtime/src/impls/task/concurrent.rs +++ /dev/null @@ -1,12 +0,0 @@ -use futures::stream::Stream; -use futures::StreamExt; -use ibc_relayer_components::runtime::traits::task::Task; - -pub async fn run_concurrent_tasks(tasks: impl Stream) -where - T: Task, -{ - tasks - .for_each_concurrent(None, |task| Box::pin(async move { task.run().await })) - .await; -} diff --git a/crates/relayer-subscription/src/lib.rs b/crates/relayer-subscription/src/lib.rs deleted file mode 100644 index 1593dce82..000000000 --- a/crates/relayer-subscription/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -#![no_std] -#![allow(clippy::type_complexity)] - -mod std_prelude; -extern crate alloc; - -pub mod impls; -pub mod traits; diff --git a/crates/relayer-subscription/src/std_prelude.rs b/crates/relayer-subscription/src/std_prelude.rs deleted file mode 100644 index 2065a3d60..000000000 --- a/crates/relayer-subscription/src/std_prelude.rs +++ /dev/null @@ -1,11 +0,0 @@ -// Re-export according to alloc::prelude::v1 because it is not yet stabilized -// https://doc.rust-lang.org/src/alloc/prelude/v1.rs.html -pub use alloc::borrow::ToOwned; -pub use alloc::boxed::Box; -pub use alloc::string::{String, ToString}; -pub use alloc::vec::Vec; -pub use alloc::{format, vec}; -// Those are exported by default in the std prelude in Rust 2021 -pub use core::convert::{TryFrom, TryInto}; -pub use core::iter::FromIterator; -pub use core::prelude::v1::*; diff --git a/crates/tokio-runtime-components/Cargo.toml b/crates/tokio-runtime-components/Cargo.toml index 4cbe74c29..17bfde50b 100644 --- a/crates/tokio-runtime-components/Cargo.toml +++ b/crates/tokio-runtime-components/Cargo.toml @@ -16,9 +16,11 @@ description = """ ibc-relayer-components = { workspace = true } ibc-relayer-components-extra = { workspace = true } ibc-test-components = { workspace = true } +async-runtime-components = { workspace = true } cgp-core = { workspace = true } tokio = { workspace = true, features = ["full"] } futures = { workspace = true } -rand = { version = "0.8.5" } \ No newline at end of file +tokio-stream = { version = "0.1" } +rand = { version = "0.8.5" } \ No newline at end of file diff --git a/crates/tokio-runtime-components/src/components/concurrent.rs b/crates/tokio-runtime-components/src/components/concurrent.rs new file mode 100644 index 000000000..9f90903a4 --- /dev/null +++ b/crates/tokio-runtime-components/src/components/concurrent.rs @@ -0,0 +1,60 @@ +use async_runtime_components::task::impls::concurrent::RunConcurrentTasks; +use cgp_core::prelude::*; +use ibc_relayer_components::runtime::traits::mutex::MutexComponent; +use ibc_relayer_components::runtime::traits::sleep::SleeperComponent; +use ibc_relayer_components::runtime::traits::stream::{StreamMapperComponent, StreamTypeComponent}; +use ibc_relayer_components::runtime::traits::subscription::SubscriptionComponent; +use ibc_relayer_components::runtime::traits::task::ConcurrentTaskRunnerComponent; +use ibc_relayer_components::runtime::traits::time::TimeComponent; +use ibc_relayer_components_extra::runtime::traits::channel::{ + ChannelCreatorComponent, ChannelTypeComponent, ChannelUserComponent, ReceiverStreamerComponent, + SenderClonerComponent, +}; +use ibc_relayer_components_extra::runtime::traits::channel_once::{ + ChannelOnceCreatorComponent, ChannelOnceTypeComponent, ChannelOnceUserComponent, +}; +use ibc_relayer_components_extra::runtime::traits::spawn::TaskSpawnerComponent; +use ibc_test_components::runtime::traits::child_process::ChildProcessStarterComponent; +use ibc_test_components::runtime::traits::exec_command::CommandExecutorComponent; +use ibc_test_components::runtime::traits::read_file::FileAsStringReaderComponent; +use ibc_test_components::runtime::traits::reserve_port::TcpPortReserverComponent; +use ibc_test_components::runtime::traits::types::child_process::ChildProcessTypeComponent; +use ibc_test_components::runtime::traits::types::file_path::FilePathTypeComponent; +use ibc_test_components::runtime::traits::write_file::StringToFileWriterComponent; + +use crate::components::parallel::TokioParallelRuntimeComponents; + +pub struct TokioConcurrentRuntimeComponents; + +delegate_components! { + #[mark_component(IsTokioConcurrentRuntimeComponent)] + #[mark_delegate(DelegatesToTokioConcurrentRuntimeComponents)] + TokioConcurrentRuntimeComponents { + ConcurrentTaskRunnerComponent: RunConcurrentTasks, + [ + SleeperComponent, + TimeComponent, + MutexComponent, + StreamTypeComponent, + StreamMapperComponent, + SubscriptionComponent, + TaskSpawnerComponent, + ChannelTypeComponent, + ChannelCreatorComponent, + ChannelUserComponent, + ChannelOnceTypeComponent, + ChannelOnceCreatorComponent, + ChannelOnceUserComponent, + ReceiverStreamerComponent, + SenderClonerComponent, + FilePathTypeComponent, + ChildProcessTypeComponent, + ChildProcessStarterComponent, + FileAsStringReaderComponent, + CommandExecutorComponent, + StringToFileWriterComponent, + TcpPortReserverComponent, + ]: + TokioParallelRuntimeComponents, + } +} diff --git a/crates/relayer-runtime/src/impls/task/mod.rs b/crates/tokio-runtime-components/src/components/mod.rs similarity index 100% rename from crates/relayer-runtime/src/impls/task/mod.rs rename to crates/tokio-runtime-components/src/components/mod.rs diff --git a/crates/tokio-runtime-components/src/components/parallel.rs b/crates/tokio-runtime-components/src/components/parallel.rs new file mode 100644 index 000000000..aeea9eb31 --- /dev/null +++ b/crates/tokio-runtime-components/src/components/parallel.rs @@ -0,0 +1,77 @@ +use async_runtime_components::channel::impls::ProvideUnboundedChannelType; +use async_runtime_components::channel_once::impls::ProvideOneShotChannelType; +use async_runtime_components::mutex::impls::mutex::ProvideFuturesMutex; +use async_runtime_components::stream::impls::boxed::ProvideBoxedStreamType; +use async_runtime_components::stream::impls::map::BoxedStreamMapper; +use async_runtime_components::subscription::impls::subscription::ProvideBoxedSubscription; +use cgp_core::prelude::*; +use ibc_relayer_components::runtime::traits::mutex::MutexComponent; +use ibc_relayer_components::runtime::traits::sleep::SleeperComponent; +use ibc_relayer_components::runtime::traits::stream::{StreamMapperComponent, StreamTypeComponent}; +use ibc_relayer_components::runtime::traits::subscription::SubscriptionComponent; +use ibc_relayer_components::runtime::traits::task::ConcurrentTaskRunnerComponent; +use ibc_relayer_components::runtime::traits::time::TimeComponent; +use ibc_relayer_components_extra::runtime::traits::channel::{ + ChannelCreatorComponent, ChannelTypeComponent, ChannelUserComponent, ReceiverStreamerComponent, + SenderClonerComponent, +}; +use ibc_relayer_components_extra::runtime::traits::channel_once::{ + ChannelOnceCreatorComponent, ChannelOnceTypeComponent, ChannelOnceUserComponent, +}; +use ibc_relayer_components_extra::runtime::traits::spawn::TaskSpawnerComponent; +use ibc_test_components::runtime::traits::child_process::ChildProcessStarterComponent; +use ibc_test_components::runtime::traits::exec_command::CommandExecutorComponent; +use ibc_test_components::runtime::traits::read_file::FileAsStringReaderComponent; +use ibc_test_components::runtime::traits::reserve_port::TcpPortReserverComponent; +use ibc_test_components::runtime::traits::types::child_process::ChildProcessTypeComponent; +use ibc_test_components::runtime::traits::types::file_path::FilePathTypeComponent; +use ibc_test_components::runtime::traits::write_file::StringToFileWriterComponent; + +use crate::impls::child_process::StartTokioChildProcess; +use crate::impls::exec_command::TokioExecCommand; +use crate::impls::parallel_task::TokioRunParallelTasks; +use crate::impls::read_file::TokioReadFileAsString; +use crate::impls::reserve_port::TokioReserveTcpPort; +use crate::impls::sleep::TokioSleep; +use crate::impls::spawn::TokioSpawnTask; +use crate::impls::time::ProvideStdTime; +use crate::impls::types::child_process::ProvideTokioChildProcessType; +use crate::impls::types::file_path::ProvideStdPathType; +use crate::impls::write_file::TokioWriteStringToFile; + +pub struct TokioParallelRuntimeComponents; + +delegate_components! { + #[mark_component(IsTokioParallelRuntimeComponent)] + #[mark_delegate(DelegatesToTokioParallelRuntimeComponents)] + TokioParallelRuntimeComponents { + SleeperComponent: TokioSleep, + TimeComponent: ProvideStdTime, + MutexComponent: ProvideFuturesMutex, + StreamTypeComponent: ProvideBoxedStreamType, + StreamMapperComponent: BoxedStreamMapper, + SubscriptionComponent: ProvideBoxedSubscription, + ConcurrentTaskRunnerComponent: TokioRunParallelTasks, + TaskSpawnerComponent: TokioSpawnTask, + [ + ChannelTypeComponent, + ChannelCreatorComponent, + ChannelUserComponent, + ReceiverStreamerComponent, + SenderClonerComponent, + ]: ProvideUnboundedChannelType, + [ + ChannelOnceTypeComponent, + ChannelOnceCreatorComponent, + ChannelOnceUserComponent, + ]: + ProvideOneShotChannelType, + FilePathTypeComponent: ProvideStdPathType, + ChildProcessTypeComponent: ProvideTokioChildProcessType, + ChildProcessStarterComponent: StartTokioChildProcess, + FileAsStringReaderComponent: TokioReadFileAsString, + CommandExecutorComponent: TokioExecCommand, + StringToFileWriterComponent: TokioWriteStringToFile, + TcpPortReserverComponent: TokioReserveTcpPort, + } +} diff --git a/crates/tokio-runtime-components/src/impls/channel.rs b/crates/tokio-runtime-components/src/impls/channel.rs new file mode 100644 index 000000000..d8bf389d3 --- /dev/null +++ b/crates/tokio-runtime-components/src/impls/channel.rs @@ -0,0 +1,150 @@ +use async_runtime_components::channel::types::ChannelClosedError; +use async_runtime_components::stream::traits::boxed::HasBoxedStreamType; +use cgp_core::prelude::*; +use cgp_core::CanRaiseError; +use ibc_relayer_components_extra::runtime::traits::channel::ReceiverStreamer; +use ibc_relayer_components_extra::runtime::traits::channel::SenderCloner; +use ibc_relayer_components_extra::runtime::traits::channel::{ + ChannelCreator, ChannelUser, ProvideChannelType, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::traits::channel::{HasUnboundedChannelType, UnboundedChannelTypeProvider}; + +pub struct ProvideUnboundedChannelType; + +impl ProvideChannelType for ProvideUnboundedChannelType +where + Runtime: Async, +{ + type Sender = mpsc::UnboundedSender + where + T: Async; + + type Receiver = mpsc::UnboundedReceiver + where + T: Async; +} + +impl UnboundedChannelTypeProvider for ProvideUnboundedChannelType +where + Runtime: Async, +{ + fn from_unbounded_sender(sender: mpsc::UnboundedSender) -> Self::Sender + where + T: Async, + { + sender + } + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async, + { + receiver + } + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async, + { + receiver + } + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &mpsc::UnboundedSender + where + T: Async, + { + sender + } + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async, + { + receiver + } +} + +impl ChannelCreator for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType, +{ + fn new_channel() -> (Runtime::Sender, Runtime::Receiver) + where + T: Async, + { + let (sender, receiver) = mpsc::unbounded_channel(); + + ( + Runtime::from_unbounded_sender(sender), + Runtime::from_unbounded_receiver(receiver), + ) + } +} + +#[async_trait] +impl ChannelUser for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType + CanRaiseError, +{ + async fn send(sender: &Runtime::Sender, value: T) -> Result<(), Runtime::Error> + where + T: Async, + { + Runtime::to_unbounded_sender_ref(sender) + .send(value) + .map_err(|_| Runtime::raise_error(ChannelClosedError)) + } + + async fn receive(receiver: &mut Runtime::Receiver) -> Result + where + T: Async, + { + Runtime::to_unbounded_receiver_ref(receiver) + .recv() + .await + .ok_or(Runtime::raise_error(ChannelClosedError)) + } + + fn try_receive(receiver: &mut Runtime::Receiver) -> Result, Runtime::Error> + where + T: Async, + { + match Runtime::to_unbounded_receiver_ref(receiver).try_recv() { + Ok(batch) => Ok(Some(batch)), + Err(mpsc::error::TryRecvError::Empty) => Ok(None), + Err(mpsc::error::TryRecvError::Disconnected) => { + Err(Runtime::raise_error(ChannelClosedError)) + } + } + } +} + +impl ReceiverStreamer for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType + HasBoxedStreamType, +{ + fn receiver_to_stream(receiver: Runtime::Receiver) -> Runtime::Stream + where + T: Async, + { + Runtime::from_boxed_stream(Box::pin(UnboundedReceiverStream::new( + Runtime::to_unbounded_receiver(receiver), + ))) + } +} + +impl SenderCloner for ProvideUnboundedChannelType +where + Runtime: HasUnboundedChannelType, +{ + fn clone_sender(sender: &Runtime::Sender) -> Runtime::Sender + where + T: Async, + { + Runtime::from_unbounded_sender(Runtime::to_unbounded_sender_ref(sender).clone()) + } +} diff --git a/crates/tokio-runtime-components/src/impls/mod.rs b/crates/tokio-runtime-components/src/impls/mod.rs index 1925237e0..8c5ac70a5 100644 --- a/crates/tokio-runtime-components/src/impls/mod.rs +++ b/crates/tokio-runtime-components/src/impls/mod.rs @@ -1,6 +1,11 @@ +pub mod channel; pub mod child_process; pub mod exec_command; +pub mod parallel_task; pub mod read_file; pub mod reserve_port; +pub mod sleep; +pub mod spawn; +pub mod time; pub mod types; pub mod write_file; diff --git a/crates/relayer-runtime/src/impls/task/parallel.rs b/crates/tokio-runtime-components/src/impls/parallel_task.rs similarity index 62% rename from crates/relayer-runtime/src/impls/task/parallel.rs rename to crates/tokio-runtime-components/src/impls/parallel_task.rs index 77c477faa..5fc2ee1d1 100644 --- a/crates/relayer-runtime/src/impls/task/parallel.rs +++ b/crates/tokio-runtime-components/src/impls/parallel_task.rs @@ -1,10 +1,34 @@ use core::task::{Context, Poll}; +use async_runtime_components::stream::traits::boxed::HasBoxedStreamType; +use cgp_core::prelude::*; use futures::stream::{Stream, StreamExt}; use futures::task::noop_waker; -use ibc_relayer_components::runtime::traits::task::Task; +use ibc_relayer_components::runtime::traits::task::{ConcurrentTaskRunner, Task}; use tokio::task::JoinSet; +pub struct TokioRunParallelTasks; + +#[async_trait] +impl ConcurrentTaskRunner for TokioRunParallelTasks +where + Runtime: HasBoxedStreamType, +{ + async fn run_concurrent_tasks(_runtime: &Runtime, tasks: Vec) + where + T: Task, + { + run_parallel_tasks(tasks).await + } + + async fn run_concurrent_task_stream(_runtime: &Runtime, tasks: Runtime::Stream) + where + T: Task, + { + run_parallel_task_stream(Runtime::to_boxed_stream(tasks)).await + } +} + pub async fn run_parallel_tasks(tasks: Vec) where T: Task, diff --git a/crates/tokio-runtime-components/src/impls/sleep.rs b/crates/tokio-runtime-components/src/impls/sleep.rs new file mode 100644 index 000000000..27f65be22 --- /dev/null +++ b/crates/tokio-runtime-components/src/impls/sleep.rs @@ -0,0 +1,17 @@ +use core::time::Duration; + +use cgp_core::prelude::*; +use ibc_relayer_components::runtime::traits::sleep::Sleeper; +use tokio::time::sleep; + +pub struct TokioSleep; + +#[async_trait] +impl Sleeper for TokioSleep +where + Runtime: Async, +{ + async fn sleep(_runtime: &Runtime, duration: Duration) { + sleep(duration).await; + } +} diff --git a/crates/tokio-runtime-components/src/impls/spawn.rs b/crates/tokio-runtime-components/src/impls/spawn.rs new file mode 100644 index 000000000..dfe5fe6a1 --- /dev/null +++ b/crates/tokio-runtime-components/src/impls/spawn.rs @@ -0,0 +1,20 @@ +use ibc_relayer_components::runtime::traits::task::Task; +use ibc_relayer_components_extra::runtime::traits::spawn::TaskSpawner; + +use crate::traits::runtime::HasTokioRuntime; + +pub struct TokioSpawnTask; + +impl TaskSpawner for TokioSpawnTask +where + Runtime: HasTokioRuntime, +{ + fn spawn_task(runtime: &Runtime, task: T) + where + T: Task, + { + runtime.tokio_runtime().spawn(async move { + task.run().await; + }); + } +} diff --git a/crates/tokio-runtime-components/src/impls/time.rs b/crates/tokio-runtime-components/src/impls/time.rs new file mode 100644 index 000000000..f10a5c2a7 --- /dev/null +++ b/crates/tokio-runtime-components/src/impls/time.rs @@ -0,0 +1,22 @@ +use core::time::Duration; +use std::time::Instant; + +use cgp_core::prelude::Async; +use ibc_relayer_components::runtime::traits::time::ProvideTime; + +pub struct ProvideStdTime; + +impl ProvideTime for ProvideStdTime +where + Runtime: Async, +{ + type Time = Instant; + + fn now(_runtime: &Runtime) -> Instant { + Instant::now() + } + + fn duration_since(time: &Instant, other: &Instant) -> Duration { + time.duration_since(*other) + } +} diff --git a/crates/tokio-runtime-components/src/lib.rs b/crates/tokio-runtime-components/src/lib.rs index fb880c018..3c8cbfdc5 100644 --- a/crates/tokio-runtime-components/src/lib.rs +++ b/crates/tokio-runtime-components/src/lib.rs @@ -1,2 +1,4 @@ +pub mod components; pub mod impls; +pub mod traits; pub mod types; diff --git a/crates/tokio-runtime-components/src/traits/channel.rs b/crates/tokio-runtime-components/src/traits/channel.rs new file mode 100644 index 000000000..43a9cb310 --- /dev/null +++ b/crates/tokio-runtime-components/src/traits/channel.rs @@ -0,0 +1,143 @@ +use cgp_core::prelude::*; +use ibc_relayer_components_extra::runtime::traits::channel::{ + ChannelTypeComponent, HasChannelTypes, ProvideChannelType, +}; +use tokio::sync::mpsc; + +pub trait HasUnboundedChannelType: HasChannelTypes { + fn from_unbounded_sender(sender: mpsc::UnboundedSender) -> Self::Sender + where + T: Async; + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async; + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async; + + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &mpsc::UnboundedSender + where + T: Async; + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async; +} + +pub trait UnboundedChannelTypeProvider: ProvideChannelType +where + Runtime: Async, +{ + fn from_unbounded_sender(sender: mpsc::UnboundedSender) -> Self::Sender + where + T: Async; + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async; + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async; + + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &mpsc::UnboundedSender + where + T: Async; + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async; +} + +impl HasUnboundedChannelType for Runtime +where + Runtime: HasComponents, + Components: UnboundedChannelTypeProvider + ProvideChannelType, +{ + fn from_unbounded_sender(sender: mpsc::UnboundedSender) -> Self::Sender + where + T: Async, + { + Components::from_unbounded_sender(sender) + } + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async, + { + Components::from_unbounded_receiver(receiver) + } + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async, + { + Components::to_unbounded_receiver(receiver) + } + + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &mpsc::UnboundedSender + where + T: Async, + { + Components::to_unbounded_sender_ref(sender) + } + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async, + { + Components::to_unbounded_receiver_ref(receiver) + } +} + +impl UnboundedChannelTypeProvider for Component +where + Runtime: Async, + Component: DelegateComponent, + Delegate: UnboundedChannelTypeProvider, +{ + fn from_unbounded_sender(sender: mpsc::UnboundedSender) -> Self::Sender + where + T: Async, + { + Delegate::from_unbounded_sender(sender) + } + + fn from_unbounded_receiver(receiver: mpsc::UnboundedReceiver) -> Self::Receiver + where + T: Async, + { + Delegate::from_unbounded_receiver(receiver) + } + + fn to_unbounded_receiver(receiver: Self::Receiver) -> mpsc::UnboundedReceiver + where + T: Async, + { + Delegate::to_unbounded_receiver(receiver) + } + + fn to_unbounded_sender_ref(sender: &Self::Sender) -> &mpsc::UnboundedSender + where + T: Async, + { + Delegate::to_unbounded_sender_ref(sender) + } + + fn to_unbounded_receiver_ref( + receiver: &mut Self::Receiver, + ) -> &mut mpsc::UnboundedReceiver + where + T: Async, + { + Delegate::to_unbounded_receiver_ref(receiver) + } +} diff --git a/crates/tokio-runtime-components/src/traits/mod.rs b/crates/tokio-runtime-components/src/traits/mod.rs new file mode 100644 index 000000000..cf28a2c88 --- /dev/null +++ b/crates/tokio-runtime-components/src/traits/mod.rs @@ -0,0 +1,2 @@ +pub mod channel; +pub mod runtime; diff --git a/crates/tokio-runtime-components/src/traits/runtime.rs b/crates/tokio-runtime-components/src/traits/runtime.rs new file mode 100644 index 000000000..4f0924f27 --- /dev/null +++ b/crates/tokio-runtime-components/src/traits/runtime.rs @@ -0,0 +1,6 @@ +use cgp_core::Async; +use tokio::runtime::Runtime; + +pub trait HasTokioRuntime: Async { + fn tokio_runtime(&self) -> &Runtime; +}