Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add backoff and reliability #172

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,225 changes: 730 additions & 495 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 15 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@ inherits = "release"
lto = true

[workspace.dependencies]
adaptive_backoff = "0.2.1"
anyhow = { version = "1.0", default-features = false }
async-std = { version = "1.13", default-features = false, features = ["attributes", "tokio1"] }
async-trait = { version = "0.1", default-features = false}
futures = { version = "0.3", default-features = false }
reqwest = { version = "0.12" }
humantime = "2.1.0"
url = { version = "2.5", default-features = false, features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
humantime-serde = { version = "1.1", default-features = false }
tiny_http = "0.12"

fluvio = { git = "https://github.com/infinyon/fluvio", tag = "v0.12.0" }
fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", tag = "v0.12.0" }
fluvio-smartmodule = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" }
fluvio-smartengine = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" }

fluvio = { git = "https://github.com/infinyon/fluvio", branch = "fix_shutdown_cdk_future" }
sehz marked this conversation as resolved.
Show resolved Hide resolved
fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", branch = "fix_shutdown_cdk_future" }
fluvio-smartmodule = { git = "https://github.com/infinyon/fluvio.git", branch = "fix_shutdown_cdk_future" }
fluvio-smartengine = { git = "https://github.com/infinyon/fluvio.git", branch = "fix_shutdown_cdk_future" }
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,26 @@ In this case, additional transformation will be performed before records are sen
Read more about [JSON to JSON transformations](https://www.fluvio.io/smartmodules/certified/jolt/).

### Offset Management

The Fluvio Consumer Offset feature allows a connector to store its offset in the Fluvio cluster and resume from it on restart.
To activate it, you need to provide the `consumer` id and set `strategy: auto`.
See the example below:

```yaml
apiVersion: 0.2.0
apiVersion: 0.1.0
meta:
version: 0.2.11
name: my-http-sink
type: http-sink
topic:
meta:
name: http-sink-topic
topic: http-sink-topic
consumer:
id: my-http-sink
offset:
strategy: auto
start: beginning
flush-period:
secs: 10
nanos: 0
http:
endpoint: "http://127.0.0.1/post"
```
Expand All @@ -185,6 +189,7 @@ $ fluvio consumer list
CONSUMER TOPIC PARTITION OFFSET LAST SEEN
my-http-sink http-sink-topic 0 0 3s
```

## Contributing

Follow the conventional `CONTRIBUTING.md` file to set up your environment and
Expand Down
19 changes: 9 additions & 10 deletions crates/http-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ description = "Connector that reads data from a topic and sends to external HTTP
edition = "2021"

[dependencies]
async-trait = { version = "0.1", default-features = false}
futures = { version = "0.3", default-features = false }
adaptive_backoff = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true, default-features = false}
futures = { workspace = true, default-features = false }
async-std = { workspace = true }
anyhow = { version = "1.0" }
reqwest = { version = "0.12" }
url = { version = "2.5", default-features = false, features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
humantime-serde = { version = "1.1", default-features = false }
reqwest = { workspace = true }
humantime = {workspace = true}
url = { workspace = true, default-features = false, features = ["serde"] }
serde = { workspace = true, default-features = false, features = ["derive"] }
humantime-serde = { workspace = true, default-features = false }

fluvio = { workspace = true }
fluvio-connector-common = { workspace = true, features = ["derive"] }

# transitive dependency selection (resolve conflict in ver selection among deps)
bytes = { version = "1.8.0" }
2 changes: 1 addition & 1 deletion crates/http-sink/Connector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "http-sink"
group = "infinyon"
version = "0.2.11"
apiVersion = "0.1.0"
fluvio = "0.12.0"
fluvio = "0.13.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this work with previous version of fluvio? Is there reason to bump this up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is only metadata of which fluvio version this connector is using.

Should work with previous fluvio version too.

description = "HTTP sink connector reads records from data streaming and generates an HTTP request"
license = "Apache-2.0"
visibility = "public"
Expand Down
18 changes: 18 additions & 0 deletions crates/http-sink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ pub(crate) struct HttpConfig {
/// Http connect timeout in milliseconds
#[serde(with = "humantime_serde", default = "default_http_connect_timeout")]
pub http_connect_timeout: Duration,

/// Maximum backoff duration between retries of a failed HTTP request
#[serde(with = "humantime_serde", default = "default_backoff_max")]
pub backoff_max: Duration,

/// Minimum backoff duration between retries of a failed HTTP request
#[serde(with = "humantime_serde", default = "default_backoff_min")]
pub backoff_min: Duration,
}

#[inline]
Expand All @@ -58,3 +66,13 @@ fn default_http_method() -> String {
fn default_http_headers() -> Vec<String> {
DEFAULT_HTTP_HEADERS.map(String::from).into_iter().collect()
}

/// Serde default for `backoff_max`: sixty seconds.
#[inline]
fn default_backoff_max() -> Duration {
    const DEFAULT_BACKOFF_MAX_SECS: u64 = 60;
    Duration::from_secs(DEFAULT_BACKOFF_MAX_SECS)
}

/// Serde default for `backoff_min`: one second.
#[inline]
fn default_backoff_min() -> Duration {
    const DEFAULT_BACKOFF_MIN_SECS: u64 = 1;
    Duration::from_secs(DEFAULT_BACKOFF_MIN_SECS)
}
73 changes: 66 additions & 7 deletions crates/http-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,88 @@
mod config;
mod sink;

use anyhow::Result;
use adaptive_backoff::prelude::{
Backoff, BackoffBuilder, ExponentialBackoff, ExponentialBackoffBuilder,
};
use anyhow::{anyhow, Result};
use config::HttpConfig;
use fluvio::{consumer::Record, dataplane::link::ErrorCode};
use futures::{SinkExt, StreamExt};
use sink::HttpSink;

use fluvio_connector_common::{connector, consumer::ConsumerStream, tracing, Sink};
use fluvio_connector_common::{
connector,
consumer::ConsumerStream,
tracing::{self, debug, error, info, warn},
LocalBoxSink, Sink,
};

const SIGNATURES: &str = concat!("InfinyOn HTTP Sink Connector ", env!("CARGO_PKG_VERSION"));

#[connector(sink)]
async fn start(config: HttpConfig, mut stream: impl ConsumerStream) -> Result<()> {
tracing::debug!(?config);
let mut backoff = backoff_init(&config)?;
debug!(?config);

let sink = HttpSink::new(&config)?;
let mut sink = sink.connect(None).await?;

tracing::info!("Starting {SIGNATURES}");
info!("Starting {SIGNATURES}");
while let Some(item) = stream.next().await {
tracing::debug!("Received record in consumer");
let str = String::from_utf8(item?.as_ref().to_vec())?;
sink.send(str).await?;
if let Err(err) = process_item(&mut sink, &mut backoff, &config, item).await {
error!("Error processing item: {}", err);
}
}
tracing::info!("Consumer loop finished");
info!("Consumer loop finished");

Ok(())
}

async fn process_item(
sink: &mut LocalBoxSink<String>,
backoff: &mut ExponentialBackoff,
config: &HttpConfig,
item: Result<Record, ErrorCode>,
) -> Result<()> {
let str = String::from_utf8(item?.as_ref().to_vec())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this need to be string? why not binary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! It should be, it was like it already, but I changed to use bytes instead.

loop {
match sink.send(str.clone()).await {
Ok(_) => {
backoff.reset();
break;
}
Err(err) => {
error!("Error sending operation to sink: {}", err);
*sink = HttpSink::new(config)?.connect(None).await?;
backoff_and_wait(backoff, config).await?;
}
}
}

Ok(())
}

sehz marked this conversation as resolved.
Show resolved Hide resolved
async fn backoff_and_wait(backoff: &mut ExponentialBackoff, config: &HttpConfig) -> Result<()> {
let wait = backoff.wait();
if wait < config.backoff_max {
warn!(
"Waiting {} before next attempting to db",
humantime::format_duration(wait)
);
async_std::task::sleep(wait).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use fluvio sleep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I need add fluvio_future? is it worth?

I think that we should have a sleep public in fluvio-connector-common. wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to keep dep minimize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I changed to use sleep from fluvio_future

Ok(())
} else {
let err_msg = "Max retry on SQL Execution, shutting down";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not SQL sink

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed!

error!(err_msg);
Err(anyhow!(err_msg))
}
}

/// Builds the exponential backoff used when retrying failed sends.
///
/// The wait is bounded below by `config.backoff_min` and above by
/// `config.backoff_max`, and is built with a growth factor of 1.5.
/// Any error reported by the builder is returned to the caller.
fn backoff_init(config: &HttpConfig) -> Result<ExponentialBackoff> {
    const GROWTH_FACTOR: f64 = 1.5;

    let lower_bound = config.backoff_min;
    let upper_bound = config.backoff_max;

    ExponentialBackoffBuilder::default()
        .factor(GROWTH_FACTOR)
        .min(lower_bound)
        .max(upper_bound)
        .build()
}
3 changes: 3 additions & 0 deletions crates/http-sink/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ mod test {
headers: vec!["Content-Type: text/html".into()],
http_connect_timeout: Duration::from_secs(1),
http_request_timeout: Duration::from_secs(15),
backoff_max: Duration::from_secs(60),
backoff_min: Duration::from_secs(1),
};

let sink = HttpSink::new(&config).unwrap();
let req = sink.request.build().unwrap();

Expand Down
5 changes: 2 additions & 3 deletions crates/tiny-http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ license = "Apache 2.0"
authors = ["Fluvio Contributors <[email protected]>"]

[dependencies]
tiny_http = "0.12"
tiny_http = { workspace = true }
async-std = { workspace = true }
serde = { version = "1.0", default-features = false, features = ["derive"]}

serde = { workspace = true, default-features = false, features = ["derive"]}
67 changes: 58 additions & 9 deletions tests/auto-offset-management.bats
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ load './bats-helpers/bats-assert/load'
load './bats-helpers/tools_check.bash'

# Runs once per test file: resolves the CLI binaries to use (overridable via
# the CDK_BIN / FLUVIO_BIN environment variables) and creates a scratch
# directory whose path is exported as TEST_DIR.
setup_file() {
    export CDK_BIN="${CDK_BIN:-cdk}"
    export FLUVIO_BIN="${FLUVIO_BIN:-fluvio}"

    # Assignment and export stay separate so a mktemp failure is not masked
    # by the exit status of `export`.
    TEST_DIR="$(mktemp -d -t auto-offset-mngt-test.XXXXX)"
    export TEST_DIR
}
Expand All @@ -20,7 +24,7 @@ setup() {
./target/debug/tiny-http-server & disown
MOCK_PID=$!

fluvio topic create $TOPIC
$FLUVIO_BIN topic create $TOPIC
}

@test "Read stream from the beginning" {
Expand All @@ -44,10 +48,10 @@ http:
interval: 3s
EOF

echo "RecordOne" | fluvio produce $TOPIC
echo "RecordTwo" | fluvio produce $TOPIC
echo "RecordOne" | $FLUVIO_BIN produce $TOPIC
echo "RecordTwo" | $FLUVIO_BIN produce $TOPIC

cdk deploy -p http-sink start --config $CONFIG_PATH --log-level info
$CDK_BIN deploy -p http-sink start --config $CONFIG_PATH --log-level info

wait_for_line_in_file "monitoring started" $LOG_PATH 30

Expand Down Expand Up @@ -75,23 +79,68 @@ http:
interval: 3s
EOF

cdk deploy -p http-sink start --config $CONFIG_PATH --log-level info
$CDK_BIN deploy -p http-sink start --config $CONFIG_PATH --log-level info

wait_for_line_in_file "monitoring started" $LOG_PATH 30

echo "RecordOne" | fluvio produce $TOPIC
echo "RecordOne" | $FLUVIO_BIN produce $TOPIC
sleep 15
echo "RecordTwo" | fluvio produce $TOPIC
echo "RecordTwo" | $FLUVIO_BIN produce $TOPIC

wait_for_line_in_file "RecordOne" $LOGGER_FILENAME 30
wait_for_line_in_file "RecordTwo" $LOGGER_FILENAME 30

OFFSET=$(fluvio consumer list -O json | jq ".[] | select(.consumer_id == \"$CONNECTOR_NAME\") | .offset")
OFFSET=$($FLUVIO_BIN consumer list -O json | jq ".[] | select(.consumer_id == \"$CONNECTOR_NAME\") | .offset")
assert [ ! -z $OFFSET ]

}

# Checks that records produced while the mock HTTP server is down are still
# delivered once the server is started again.
@test "Backoff and retry on failure" {
# Write the connector configuration for this test's topic.
CONFIG_PATH="$TEST_DIR/$TOPIC.yaml"
cat <<EOF >$CONFIG_PATH
apiVersion: 0.2.0
meta:
version: 0.1.0
name: $CONNECTOR_NAME
type: http-sink
topic:
meta:
name: $TOPIC
consumer:
id: $CONNECTOR_NAME
offset:
strategy: auto
start: beginning
http:
endpoint: http://localhost:8080
interval: 3s
EOF

# Phase 1: produce two records, then start the connector.
echo "RecordOne" | $FLUVIO_BIN produce $TOPIC
echo "RecordTwo" | $FLUVIO_BIN produce $TOPIC

$CDK_BIN deploy -p http-sink start --config $CONFIG_PATH --log-level info

# NOTE(review): wait_for_line_in_file is defined outside this file; the last
# argument is presumably a timeout in seconds — confirm against the helper.
wait_for_line_in_file "monitoring started" $LOG_PATH 30

# Both records are expected to show up in the mock server's log file.
wait_for_line_in_file "RecordOne" $LOGGER_FILENAME 30
wait_for_line_in_file "RecordTwo" $LOGGER_FILENAME 30

# Phase 2: stop the mock server, then produce two more records while it is down.
kill $MOCK_PID

echo "RecordThree" | $FLUVIO_BIN produce $TOPIC
echo "RecordFour" | $FLUVIO_BIN produce $TOPIC

# Keep the endpoint unavailable for a while before bringing it back.
sleep 20

# Phase 3: restart the mock server and record its new PID in MOCK_PID.
./target/debug/tiny-http-server & disown
MOCK_PID=$!

# The records produced during the outage must arrive after the restart.
wait_for_line_in_file "RecordThree" $LOGGER_FILENAME 30
wait_for_line_in_file "RecordFour" $LOGGER_FILENAME 30
}

teardown() {
cdk deploy shutdown --name $CONNECTOR_NAME
$CDK_BIN deploy shutdown --name $CONNECTOR_NAME
kill $MOCK_PID
}
Loading