Skip to content

Commit

Permalink
feat!: made ComponentError anyhow::Error
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Box<StdError...> may need to be converted into anyhow::Error
  • Loading branch information
jsoverson committed Sep 8, 2023
1 parent 1c93902 commit 534d209
Show file tree
Hide file tree
Showing 47 changed files with 1,452 additions and 408 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/components/wick-http-client/src/component.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;

use flow_component::{BoxFuture, Component, ComponentError, IntoComponentResult, RuntimeCallback};
use anyhow::anyhow;
use flow_component::{BoxFuture, Component, ComponentError, RuntimeCallback};
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::CONTENT_TYPE;
use reqwest::{ClientBuilder, Method, Request, RequestBuilder};
Expand Down Expand Up @@ -44,8 +45,7 @@ impl HttpClientComponent {
validate(&config, resolver)?;
let addr: UrlResource = resolver(config.resource())
.and_then(|r| r.try_resource())
.and_then(|r| r.try_url())
.into_component_error()?;
.and_then(|r| r.try_url())?;

let mut sig = ComponentSignature::new_named("wick/component/http");
sig.metadata.version = metadata.map(|v| v.version().to_owned());
Expand All @@ -56,7 +56,7 @@ impl HttpClientComponent {
.url()
.value()
.cloned()
.ok_or_else(|| ComponentError::message("Internal Error - Invalid resource"))?;
.ok_or_else(|| anyhow!("Internal Error - Invalid resource"))?;

let mut path_templates = HashMap::new();
for ops in config.operations() {
Expand Down
7 changes: 0 additions & 7 deletions crates/components/wick-http-client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use flow_component::ComponentError;
use url::Url;

#[derive(thiserror::Error, Debug, PartialEq)]
Expand All @@ -19,9 +18,3 @@ pub enum Error {
#[error("Invalid baseurl: {0}")]
InvalidBaseUrl(Url),
}

impl From<Error> for ComponentError {
fn from(value: Error) -> Self {
ComponentError::new(value)
}
}
7 changes: 0 additions & 7 deletions crates/components/wick-sql/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use flow_component::ComponentError;
use wick_config::error::ManifestError;
use wick_packet::TypeWrapper;

Expand Down Expand Up @@ -93,12 +92,6 @@ pub enum Error {
NoRow,
}

impl From<Error> for ComponentError {
fn from(value: Error) -> Self {
ComponentError::new(value)
}
}

#[derive(thiserror::Error, Debug, Copy, Clone)]
pub enum ConversionError {
#[error("i8")]
Expand Down
37 changes: 0 additions & 37 deletions crates/wick/flow-component/src/config.rs

This file was deleted.

63 changes: 2 additions & 61 deletions crates/wick/flow-component/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@
// Add exceptions here
#![allow()]

// mod config;
// pub use config::*;

mod context;
pub use context::*;
#[cfg(feature = "invocation")]
Expand All @@ -128,65 +125,9 @@ pub use traits::*;
/// A boxed future that can be sent across threads.
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn futures::Future<Output = T> + Send + 'a>>;

pub use serde_json::Value;

#[derive(Debug)]
#[must_use]
/// A generic error type for components.
pub struct ComponentError {
source: Box<dyn std::error::Error + Send + Sync>,
}

impl std::error::Error for ComponentError {}
impl std::fmt::Display for ComponentError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.source.to_string().as_str())
}
}
impl ComponentError {
/// Create a new error from a boxed error.
pub fn new(source: impl std::error::Error + Send + Sync + 'static) -> Self {
Self {
source: Box::new(source),
}
}

/// Create a new error from a string.
pub fn message(msg: &str) -> Self {
Self {
source: Box::new(GenericError(msg.to_owned())),
}
}
}
impl From<Box<dyn std::error::Error + Send + Sync>> for ComponentError {
fn from(source: Box<dyn std::error::Error + Send + Sync>) -> Self {
Self { source }
}
}

impl From<anyhow::Error> for ComponentError {
fn from(source: anyhow::Error) -> Self {
Self::message(&source.to_string())
}
}

/// Trait that allows for conversion of a result into a component error.
pub trait IntoComponentResult<T, E>
where
E: std::error::Error + Send + Sync + 'static,
{
/// Convert a Result<T,E> into a Result<T, ComponentError>.
fn into_component_error(self) -> Result<T, ComponentError>;
}

impl<T, E> IntoComponentResult<T, E> for Result<T, E>
where
E: std::error::Error + Send + Sync + 'static,
{
fn into_component_error(self) -> Result<T, ComponentError> {
self.map_err(ComponentError::new)
}
}
pub use anyhow::Error as ComponentError;
pub use serde_json::Value;

#[derive(Debug)]
struct GenericError(String);
Expand Down
4 changes: 2 additions & 2 deletions crates/wick/flow-component/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ pub trait Component {
invocation: Invocation,
data: Option<RuntimeConfig>,
callback: Arc<RuntimeCallback>,
) -> BoxFuture<Result<PacketStream, ComponentError>>;
) -> BoxFuture<Result<PacketStream, anyhow::Error>>;

/// The `signature` method returns the [ComponentSignature] for the component.
fn signature(&self) -> &ComponentSignature;

/// The `shutdown` method is called when the component is shutdown.
fn shutdown(&self) -> BoxFuture<Result<(), ComponentError>> {
fn shutdown(&self) -> BoxFuture<Result<(), anyhow::Error>> {
// Override if you need a more explicit shutdown.
Box::pin(async move { Ok(()) })
}
Expand Down
2 changes: 0 additions & 2 deletions crates/wick/flow-expression-parser/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use thiserror::Error;

// type BoxedSyncSendError = Box<dyn std::error::Error + Sync + std::marker::Send>;

/// Error type for the flow expression parser.
#[derive(Error, Debug, Clone, PartialEq)]
#[non_exhaustive]
Expand Down
1 change: 1 addition & 0 deletions crates/wick/flow-graph-interpreter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ uuid = { workspace = true, features = ["v4"] }
parking_lot = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, features = ["derive"] }
anyhow = { version = "1.0" }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use anyhow::{anyhow, bail};
use flow_component::{ComponentError, Context, Operation, RenderConfiguration};
use futures::FutureExt;
use serde_json::{json, Value};
Expand Down Expand Up @@ -150,10 +151,10 @@ fn get_inner_array(value: &mut Value, depth: i16) -> Result<&mut Value, Componen
Value::Array(ref mut array) => {
let inner = array
.last_mut()
.ok_or_else(|| ComponentError::message("Invalid structured in bracketed streams"))?;
.ok_or_else(|| anyhow!("Invalid structure in bracketed streams"))?;
get_inner_array(inner, depth - 1)
}
_ => Err(ComponentError::message("Value is not an array")),
_ => bail!("Value {} is not an array", value),
}
}

Expand All @@ -162,12 +163,11 @@ impl RenderConfiguration for Op {
type ConfigSource = RuntimeConfig;

fn decode_config(data: Option<Self::ConfigSource>) -> Result<Self::Config, ComponentError> {
let config = data.ok_or_else(|| {
ComponentError::message("Collect component requires configuration, please specify configuration.")
})?;
let config =
data.ok_or_else(|| anyhow!("Collect component requires configuration, please specify configuration."))?;

Ok(Self::Config {
inputs: config.coerce_key("inputs").map_err(ComponentError::new)?,
inputs: config.coerce_key("inputs")?,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use anyhow::anyhow;
use flow_component::{ComponentError, Context, Operation, RenderConfiguration};
use futures::FutureExt;
use wasmrs_rx::Observer;
Expand Down Expand Up @@ -109,12 +110,11 @@ impl RenderConfiguration for Op {
type ConfigSource = RuntimeConfig;

fn decode_config(data: Option<Self::ConfigSource>) -> Result<Self::Config, ComponentError> {
let config = data.ok_or_else(|| {
ComponentError::message("Merge component requires configuration, please specify configuration.")
})?;
let config =
data.ok_or_else(|| anyhow!("Merge component requires configuration, please specify configuration."))?;

Ok(Self::Config {
inputs: config.coerce_key("inputs").map_err(ComponentError::new)?,
inputs: config.coerce_key("inputs")?,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use flow_component::{ComponentError, Context, Operation, RenderConfiguration};
use futures::{FutureExt, StreamExt};
use serde_json::Value;
Expand Down Expand Up @@ -120,26 +121,23 @@ impl RenderConfiguration for Op {
type ConfigSource = RuntimeConfig;

fn decode_config(data: Option<Self::ConfigSource>) -> Result<Self::Config, ComponentError> {
let config = data.ok_or_else(|| {
ComponentError::message("Pluck component requires configuration, please specify configuration.")
})?;
let config =
data.ok_or_else(|| anyhow!("Pluck component requires configuration, please specify configuration."))?;

for (k, v) in config {
if k == "field" {
let field: String = serde_json::from_value(v).map_err(ComponentError::new)?;
let field: String = serde_json::from_value(v)?;
warn!("pluck should be configured with 'path' as an array of strings, 'field' is deprecated and will be removed in a future release.");
return Ok(Self::Config {
field: field.split('.').map(|s| s.to_owned()).collect(),
});
}
if k == "path" {
let field: Vec<String> = serde_json::from_value(v).map_err(ComponentError::new)?;
let field: Vec<String> = serde_json::from_value(v)?;
return Ok(Self::Config { field });
}
}
Err(ComponentError::message(
"invalid configuration for pluck, 'path' field is required",
))
Err(anyhow!("invalid configuration for pluck, 'path' field is required",))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use flow_component::{ComponentError, Context, Operation, RenderConfiguration};
use serde_json::Value;
use wick_interface_types::{operation, OperationSignature};
Expand Down Expand Up @@ -63,12 +64,11 @@ impl RenderConfiguration for Op {
type ConfigSource = RuntimeConfig;

fn decode_config(data: Option<Self::ConfigSource>) -> Result<Self::Config, ComponentError> {
let config = data.ok_or_else(|| {
ComponentError::message("Sender component requires configuration, please specify configuration.")
})?;
let config =
data.ok_or_else(|| anyhow!("Sender component requires configuration, please specify configuration."))?;

Ok(Self::Config {
output: config.coerce_key("output").map_err(ComponentError::new)?,
output: config.coerce_key("output")?,
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
use std::sync::Arc;

use anyhow::anyhow;
use flow_component::{ComponentError, Context, Operation, RenderConfiguration, RuntimeCallback};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
Expand Down Expand Up @@ -719,30 +720,26 @@ impl RenderConfiguration for Op {
type ConfigSource = RuntimeConfig;

fn decode_config(data: Option<Self::ConfigSource>) -> Result<Self::Config, ComponentError> {
let config = data.ok_or_else(|| {
ComponentError::message("Switch component requires configuration, please specify configuration.")
})?;
let config =
data.ok_or_else(|| anyhow!("Switch component requires configuration, please specify configuration."))?;
Ok(Self::Config {
inputs: if config.has("context") {
config.coerce_key("context")
} else if config.has("inputs") {
config.coerce_key("inputs")
} else {
Ok(Vec::new())
}
.map_err(ComponentError::new)?,
}?,
outputs: if config.has("outputs") {
config.coerce_key("outputs")
} else {
Ok(Vec::new())
}
.map_err(ComponentError::new)?,
}?,
cases: if config.has("cases") {
config.coerce_key("cases")
} else {
Ok(Vec::new())
}
.map_err(ComponentError::new)?,
}?,
default: config.coerce_key("default").map_err(ComponentError::new)?,
})
}
Expand Down
Loading

0 comments on commit 534d209

Please sign in to comment.