Skip to content

Commit

Permalink
feat: output errors with contexts and root causes (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
galibey authored Jan 22, 2024
1 parent fd44d05 commit 853b336
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 69 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ jobs:
version: stable
- name: Check Fluvio Installation
run: make test_fluvio_install
- name: Install Fluvio SMDK
run: fluvio install smdk
- uses: actions/checkout@v4
- name: Install Rust stable
uses: actions-rs/toolchain@v1
Expand Down
93 changes: 47 additions & 46 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ This is an example of simple connector config file:
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.3.0
version: 0.3.1
name: cat-facts
type: http-source
topic: cat-facts
Expand Down Expand Up @@ -73,7 +73,7 @@ Fluvio HTTP Source Connector supports Secrets in the `endpoint` and in the `head
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.3.0
version: 0.3.1
name: cat-facts
type: http-source
topic: cat-facts
Expand All @@ -99,7 +99,7 @@ The previous example can be extended to add extra transformations to outgoing re
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.3.0
version: 0.3.1
name: cat-facts
type: http-source
topic: cat-facts
Expand Down Expand Up @@ -139,7 +139,7 @@ Provide the `stream` configuration option to enable streaming mode with `delimit
# config-example.yaml
apiVersion: 0.1.0
meta:
version: 0.3.0
version: 0.3.1
name: wiki-updates
type: http-source
topic: wiki-updates
Expand All @@ -148,4 +148,4 @@ http:
method: GET
stream: true
delimiter: "\n\n"
```
```
4 changes: 2 additions & 2 deletions crates/http-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ tokio = { version = "1.28", default-features = false, features = ["time"] }
encoding_rs = { version = "0.8", default-features = false }
mime = { version = "0.3", default-features = false }

fluvio = { git = "https://github.com/infinyon/fluvio", tag = "v0.11.2" }
fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", tag = "v0.11.2", features = ["derive"] }
fluvio = { git = "https://github.com/infinyon/fluvio", tag = "v0.11.3" }
fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", tag = "v0.11.3", features = ["derive"] }
4 changes: 2 additions & 2 deletions crates/http-source/Connector.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "http-source"
group = "infinyon"
version = "0.3.0"
version = "0.3.1"
apiVersion = "0.1.0"
fluvio = "0.10.16"
fluvio = "0.11.3"
description = "HTTP source connector"
license = "Apache-2.0"
visibility = "public"
Expand Down
13 changes: 6 additions & 7 deletions crates/http-source/src/http_streaming_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<'a> Source<'a, String> for HttpStreamingSource {
.try_clone()
.context("request must be cloneable")?;

let response = request.send().await.context("request failed")?;
let response = request.send().await.context("send request")?;

let response_metadata = HttpResponseMetadata::new(&response)?;
let encoding = transfer_encoding(&response);
Expand All @@ -42,8 +42,7 @@ impl HttpStreamingSource {
pub(crate) fn new(config: &HttpConfig) -> Result<Self> {
let client = Client::new();
let method = config.method.parse()?;
let url =
Url::parse(&config.endpoint.resolve()?).context("unable to parse http endpoint")?;
let url = Url::parse(&config.endpoint.resolve()?).context("parse http endpoint")?;
let mut request = client.request(method, url);

request = request.header(reqwest::header::USER_AGENT, config.user_agent.clone());
Expand Down Expand Up @@ -116,7 +115,7 @@ async fn read_http_stream(
dequeue_and_forward_records(&mut buf, &tx, &delimiter, encoding)
}
Err(e) => {
error!("could not read data from http response stream: {}", e);
error!("could not read data from http response stream: {e}");
}
}
}
Expand All @@ -134,7 +133,7 @@ fn dequeue_and_forward_records(

let stream_result = tx.send(decoded_record);
if let Err(e) = stream_result {
error!("Couldn't send bytes to formatting task: {}", e);
error!("Couldn't send bytes to formatting task: {e}");
}
}
}
Expand All @@ -153,11 +152,11 @@ async fn write_to_output_stream(
let stream_result = tx.send(record);

if let Err(e) = stream_result {
error!("Couldn't send records to output stream: {}", e);
error!("Couldn't send records to output stream: {e}");
}
}
Err(err) => {
error!("Error formatting record: {}", err);
error!("Error formatting record: {err:?}");
}
}
}
Expand Down
Loading

0 comments on commit 853b336

Please sign in to comment.