Skip to content

Commit

Permalink
feat: Add prefix and suffix and reorganize configuration file structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Day-OS committed Mar 27, 2024
1 parent 14175cb commit 9b9fc45
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 18 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ http:
| method | POST | String | POST, PUT |
| endpoint | - | String | HTTP URL endpoint |
| headers | - | Array\<String\> | Request header(s) "Key:Value" pairs |
| params | - | Array\<String\> | Dynamic URL parameters gathered from a json response. Set any key from a json message that you want to include into the URL as a parameter. You can also replace the parameter name by adding ": <param name> in the end of the string, like "json_key : new_key_name"|
| params | - | Array\<Mapping\>| Dynamic URL parameters gathered from a json response|
| user-agent | `fluvio/http-sink 0.2.2` | String | Request user-agent |
| http_request_timeout | 1s | String | HTTP Request Timeout |
| http_connect_timeout | 15s | String | HTTP Connect Timeout |

#### Parameter Configuration
| Option | default | type | description |
| :--------------------| :--------------------------| :-------------- | :-------------------------------------------------|
| key | - | String | The JSON key to get the value for the parameter |
| replace | - | String | Parameter's new name |
| prefix | - | String | String to be added before the value |
| suffix | - | String | String to be added after the value |

Set any key from a json message that you want to include into the URL as a parameter. You can also replace the parameter name by adding ": <param name> in the end of the string, like "json_key : new_key_name"

> By default HTTP headers will use `Content-Type: text/html` unless anothed value
> is provided to the Headers configuration.

Expand Down
1 change: 1 addition & 0 deletions crates/http-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ humantime-serde = { version = "1.1", default-features = false }
fluvio = { workspace = true }
fluvio-connector-common = { workspace = true, features = ["derive"]}
serde_json = {workspace = true}
urlencoding = "2.1.3"
13 changes: 11 additions & 2 deletions crates/http-sink/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use fluvio_connector_common::connector;
use serde::Deserialize;
use url::Url;

const DEFAULT_USER_AGENT: &str = concat!("fluvio/http-sink ", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -35,7 +36,15 @@ pub(crate) struct HttpConfig {

//HTTP Parameters that can be gattered from a Message if the message is a json file
#[serde(default = "default_http_params")]
pub params: Vec<String>,
pub params: Vec<Parameter>,
}

#[derive(Deserialize, Debug, Clone)]
pub struct Parameter{
pub key: String,
pub replace: Option<String>,
pub prefix: Option<String>,
pub suffix: Option<String>
}

#[inline]
Expand Down Expand Up @@ -63,6 +72,6 @@ fn default_http_headers() -> Vec<String> {
DEFAULT_HTTP_HEADERS.map(String::from).into_iter().collect()
}

fn default_http_params () -> Vec<String>{
fn default_http_params () -> Vec<Parameter>{
vec![]
}
27 changes: 12 additions & 15 deletions crates/http-sink/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::collections::HashMap;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use reqwest::{Client, RequestBuilder};

use urlencoding::encode;
use fluvio::Offset;
use fluvio_connector_common::{tracing, LocalBoxSink, Sink};
use crate::HttpConfig;
use crate::{config::Parameter, HttpConfig};

#[derive(Debug)]
pub(crate) struct HttpSink {
Expand All @@ -15,7 +15,7 @@ pub(crate) struct HttpSink {
#[derive(Debug)]
struct Body{
request: RequestBuilder,
params: Vec<String>,
params: Vec<Parameter>,
}
impl Clone for Body {
fn clone(&self) -> Self {
Expand Down Expand Up @@ -55,19 +55,16 @@ impl Sink<String> for HttpSink {
if params.len() > 0 {
if let Ok(json_message) = serde_json::from_str::<HashMap<String, serde_json::Value>>(&record){
for param in params.into_iter() {
let (key, replace_key) = match param.split_once(':') {
Some(map) =>{
(
map.0.trim().to_owned(),
map.1.trim().to_owned()
)
let key = param.replace.unwrap_or(param.key.clone());
if json_message.contains_key(&param.key){
let mut value = json_message.get(&param.key).unwrap().to_string();
if let Some(prefix) = param.prefix{
value = prefix + &value;
}
if let Some(suffix) = param.suffix{
value = value + &suffix;
}
None => (param.clone(), param.clone())
};
if json_message.contains_key(&key){
tracing::info!("Using URL parameter: {key}");
let value = json_message.get(&key).unwrap().to_string();
body.request = body.request.query(&[(replace_key, value)]);
body.request = body.request.query(&[(encode(&key), encode(&value))]);

}
}
Expand Down

0 comments on commit 9b9fc45

Please sign in to comment.