From 75e0b3005d19776199dc9cdbeeff430d4eeec254 Mon Sep 17 00:00:00 2001 From: crStiv Date: Sun, 19 Jan 2025 23:34:44 +0100 Subject: [PATCH 1/4] Update try_op.rs --- rig-core/src/pipeline/try_op.rs | 88 +++++++++++++++++++++++---------- 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/rig-core/src/pipeline/try_op.rs b/rig-core/src/pipeline/try_op.rs index 6e37af9a..312bc27f 100644 --- a/rig-core/src/pipeline/try_op.rs +++ b/rig-core/src/pipeline/try_op.rs @@ -345,33 +345,36 @@ where } } -// TODO: Implement TryParallel -// pub struct TryParallel { -// op1: Op1, -// op2: Op2, -// } - -// impl TryParallel { -// pub fn new(op1: Op1, op2: Op2) -> Self { -// Self { op1, op2 } -// } -// } - -// impl TryOp for TryParallel -// where -// Op1: TryOp, -// Op2: TryOp, -// { -// type Input = Op1::Input; -// type Output = (Op1::Output, Op2::Output); -// type Error = Op1::Error; - -// #[inline] -// async fn try_call(&self, input: Self::Input) -> Result { -// let (output1, output2) = tokio::join!(self.op1.try_call(input.clone()), self.op2.try_call(input)); -// Ok((output1?, output2?)) -// } -// } +pub struct TryParallel { + op1: Op1, + op2: Op2, +} + +impl TryParallel { + pub fn new(op1: Op1, op2: Op2) -> Self { + Self { op1, op2 } + } +} + +impl TryOp for TryParallel +where + Op1: TryOp, + Op1::Input: Clone, + Op2: TryOp, +{ + type Input = Op1::Input; + type Output = (Op1::Output, Op2::Output); + type Error = Op1::Error; + + #[inline] + async fn try_call(&self, input: Self::Input) -> Result { + use futures::try_join; + try_join!( + self.op1.try_call(input.clone()), + self.op2.try_call(input) + ) + } +} #[cfg(test)] mod tests { @@ -472,4 +475,35 @@ mod tests { let result = pipeline.try_call(1).await.unwrap(); assert_eq!(result, 15); } + + #[tokio::test] + async fn test_try_parallel() { + let op1 = map(|x: i32| if x % 2 == 0 { Ok(x + 1) } else { Err("x is odd") }); + let op2 = map(|x: i32| if x % 2 == 0 { Ok(x * 2) } else { Err("x is odd") }); + + let pipeline = TryParallel::new(op1, op2); + + // Test success case + let result = pipeline.try_call(2).await; + assert_eq!(result, Ok((3, 4))); + + // Test error case + let result = pipeline.try_call(1).await; + assert_eq!(result, Err("x is odd")); + } + + #[tokio::test] + async fn test_try_parallel_nested() { + let op1 = map(|x: i32| Ok::<_, &str>(x + 1)); + let op2 = map(|x: i32| Ok::<_, &str>(x * 2)); + let op3 = map(|x: i32| Ok::<_, &str>(x * 3)); + + let pipeline = TryParallel::new( + TryParallel::new(op1, op2), + op3 + ); + + let result = pipeline.try_call(2).await; + assert_eq!(result, Ok(((3, 4), 6))); + } } From a3deab2c37c86976f10e3450313d1031ce29186a Mon Sep 17 00:00:00 2001 From: crStiv Date: Sat, 1 Feb 2025 13:21:59 +0100 Subject: [PATCH 2/4] Update try_op.rs --- rig-core/src/pipeline/try_op.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/rig-core/src/pipeline/try_op.rs b/rig-core/src/pipeline/try_op.rs index 312bc27f..ec51d1ea 100644 --- a/rig-core/src/pipeline/try_op.rs +++ b/rig-core/src/pipeline/try_op.rs @@ -480,14 +480,11 @@ mod tests { async fn test_try_parallel() { let op1 = map(|x: i32| if x % 2 == 0 { Ok(x + 1) } else { Err("x is odd") }); let op2 = map(|x: i32| if x % 2 == 0 { Ok(x * 2) } else { Err("x is odd") }); - let pipeline = TryParallel::new(op1, op2); - - // Test success case + let result = pipeline.try_call(2).await; assert_eq!(result, Ok((3, 4))); - - // Test error case + let result = pipeline.try_call(1).await; assert_eq!(result, Err("x is odd")); } @@ -497,12 +494,8 @@ mod tests { let op1 = map(|x: i32| Ok::<_, &str>(x + 1)); let op2 = map(|x: i32| Ok::<_, &str>(x * 2)); let op3 = map(|x: i32| Ok::<_, &str>(x * 3)); - - let pipeline = TryParallel::new( - TryParallel::new(op1, op2), - op3 - ); - + let pipeline = TryParallel::new(TryParallel::new(op1, op2), op3); + let result = pipeline.try_call(2).await; assert_eq!(result, Ok(((3, 4), 6))); } From af884299aaeeda0721f891334224325516816a45 Mon Sep 17 00:00:00 2001 From: crStiv Date: Tue, 11 Feb 2025 02:38:58 +0100 Subject: [PATCH 3/4] Update try_op.rs --- rig-core/src/pipeline/try_op.rs | 84 ++++++++++++++++----------------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/rig-core/src/pipeline/try_op.rs b/rig-core/src/pipeline/try_op.rs index ec51d1ea..a99fef1a 100644 --- a/rig-core/src/pipeline/try_op.rs +++ b/rig-core/src/pipeline/try_op.rs @@ -345,36 +345,33 @@ where } } -pub struct TryParallel { - op1: Op1, - op2: Op2, -} - -impl TryParallel { - pub fn new(op1: Op1, op2: Op2) -> Self { - Self { op1, op2 } - } -} - -impl TryOp for TryParallel -where - Op1: TryOp, - Op1::Input: Clone, - Op2: TryOp, -{ - type Input = Op1::Input; - type Output = (Op1::Output, Op2::Output); - type Error = Op1::Error; - - #[inline] - async fn try_call(&self, input: Self::Input) -> Result { - use futures::try_join; - try_join!( - self.op1.try_call(input.clone()), - self.op2.try_call(input) - ) - } -} +// TODO: Implement TryParallel +// pub struct TryParallel { +// op1: Op1, +// op2: Op2, +// } + +// impl TryParallel { +// pub fn new(op1: Op1, op2: Op2) -> Self { +// Self { op1, op2 } +// } +// } + +// impl TryOp for TryParallel +// where +// Op1: TryOp, +// Op2: TryOp, +// { +// type Input = Op1::Input; +// type Output = (Op1::Output, Op2::Output); +// type Error = Op1::Error; + +// #[inline] +// async fn try_call(&self, input: Self::Input) -> Result { +// let (output1, output2) = tokio::join!(self.op1.try_call(input.clone()), self.op2.try_call(input)); +// Ok((output1?, output2?)) +// } +// } #[cfg(test)] mod tests { @@ -478,8 +475,20 @@ mod tests { #[tokio::test] async fn test_try_parallel() { - let op1 = map(|x: i32| if x % 2 == 0 { Ok(x + 1) } else { Err("x is odd") }); - let op2 = map(|x: i32| if x % 2 == 0 { Ok(x * 2) } else { Err("x is odd") }); + let op1 = map(|x: i32| { + if x % 2 == 0 { + Ok(x + 1) + } else { + Err("x is odd") + } + }); + let op2 = map(|x: i32| { + if x % 2 == 0 { + Ok(x * 2) + } else { + Err("x is odd") + } + }); let pipeline = TryParallel::new(op1, op2); let result = pipeline.try_call(2).await; @@ -488,15 +497,4 @@ mod tests { let result = pipeline.try_call(1).await; assert_eq!(result, Err("x is odd")); } - - #[tokio::test] - async fn test_try_parallel_nested() { - let op1 = map(|x: i32| Ok::<_, &str>(x + 1)); - let op2 = map(|x: i32| Ok::<_, &str>(x * 2)); - let op3 = map(|x: i32| Ok::<_, &str>(x * 3)); - let pipeline = TryParallel::new(TryParallel::new(op1, op2), op3); - - let result = pipeline.try_call(2).await; - assert_eq!(result, Ok(((3, 4), 6))); - } } From 59540fbb901bab83e829f228889117bee6c737eb Mon Sep 17 00:00:00 2001 From: crStiv Date: Tue, 11 Feb 2025 02:39:57 +0100 Subject: [PATCH 4/4] Update try_op.rs --- rig-core/src/pipeline/try_op.rs | 56 +++++++++++++++++---------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/rig-core/src/pipeline/try_op.rs b/rig-core/src/pipeline/try_op.rs index a99fef1a..488a317f 100644 --- a/rig-core/src/pipeline/try_op.rs +++ b/rig-core/src/pipeline/try_op.rs @@ -345,33 +345,35 @@ where } } -// TODO: Implement TryParallel -// pub struct TryParallel { -// op1: Op1, -// op2: Op2, -// } - -// impl TryParallel { -// pub fn new(op1: Op1, op2: Op2) -> Self { -// Self { op1, op2 } -// } -// } - -// impl TryOp for TryParallel -// where -// Op1: TryOp, -// Op2: TryOp, -// { -// type Input = Op1::Input; -// type Output = (Op1::Output, Op2::Output); -// type Error = Op1::Error; - -// #[inline] -// async fn try_call(&self, input: Self::Input) -> Result { -// let (output1, output2) = tokio::join!(self.op1.try_call(input.clone()), self.op2.try_call(input)); -// Ok((output1?, output2?)) -// } -// } +// Implement TryParallel +pub struct TryParallel { + op1: Op1, + op2: Op2, +} + +impl TryParallel { + pub fn new(op1: Op1, op2: Op2) -> Self { + Self { op1, op2 } + } +} + +impl op::Op for TryParallel +where + Op1: TryOp, + Op2: TryOp, +{ + type Input = Op1::Input; + type Output = Result<(Op1::Output, Op2::Output), Op1::Error>; + + #[inline] + async fn try_call(&self, input: Self::Input) -> Result<(Op1::Output, Op2::Output), Op1::Error> { + use futures::try_join; + try_join!( + self.op1.try_call(input.clone()), + self.op2.try_call(input) + ) + } +} #[cfg(test)] mod tests {