Skip to content

Commit

Permalink
Update HTTP server library
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Feb 14, 2023
1 parent 16856f1 commit 11c1cd1
Show file tree
Hide file tree
Showing 22 changed files with 488 additions and 459 deletions.
301 changes: 149 additions & 152 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ tremor-script = { path = "tremor-script", features = ["arena-delete"] }
tremor-value = { path = "tremor-value" }
url = "2.3"
value-trait = "0.5"
tide = "0.16"
zstd = "*"
axum = "0.6"
axum-macros = "0.3"

# blaster / blackhole
hdrhistogram = "7"
Expand Down Expand Up @@ -232,7 +233,7 @@ num_cpus = "*"


[features]
default = []
default = ["integration"]

# support for 128bit numbers in tremor-value
128bit = ["tremor-value/128bit"]
Expand Down
8 changes: 3 additions & 5 deletions src/connectors/google.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub(crate) mod tests {

use super::*;
use crate::{
connectors::utils::EnvHelper,
connectors::{tests::free_port, utils::EnvHelper},
errors::{Error, Result},
};
use std::{convert::Infallible, io::Write, net::ToSocketAddrs};
Expand Down Expand Up @@ -199,7 +199,7 @@ PX8efvDMhv16QqDFF0k80d0=
async fn gouth_token() -> Result<()> {
let mut file = tempfile::NamedTempFile::new()?;

let port = crate::utils::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let sa = ServiceAccount {
client_email: "[email protected]".to_string(),
private_key_id: "badger".to_string(),
Expand Down Expand Up @@ -243,9 +243,7 @@ PX8efvDMhv16QqDFF0k80d0=
if let Ok(token) = provider.get_token() {
break token;
}
if attempt >= 20 {
panic!("Failed to get token");
}
assert!(attempt < 20, "Failed to get token");
attempt += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
};
Expand Down
15 changes: 14 additions & 1 deletion src/connectors/impls/gbq/writer/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ mod test {
scale: 0,
}],
&SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -864,6 +865,7 @@ mod test {
scale: 0,
}],
&SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -904,6 +906,7 @@ mod test {
scale: 0,
}],
&SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1129,6 +1132,7 @@ mod test {
let (rx, _tx) = bounded(1024);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new(FlowInstanceId::new(AppId::default(), "flow"), "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1178,6 +1182,7 @@ mod test {
let (rx, _tx) = bounded(1024);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1223,6 +1228,7 @@ mod test {
let (rx, _tx) = bounded(1024);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1269,6 +1275,7 @@ mod test {
let (rx, _tx) = bounded(1024);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1313,6 +1320,7 @@ mod test {
let (rx, _tx) = bounded(1024);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1348,6 +1356,7 @@ mod test {
let (rx, _tx) = bounded(1024);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1408,6 +1417,7 @@ mod test {
"",
Event::signal_tick(),
&SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1450,6 +1460,7 @@ mod test {
"",
Event::signal_tick(),
&SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1523,6 +1534,7 @@ mod test {
);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new("flow", "connector"),
ConnectorType::default(),
Expand Down Expand Up @@ -1605,8 +1617,9 @@ mod test {
);

let ctx = SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
AAlias::new(FlowInstanceId::new(AppId::default(), "flow"), "connector"),
Alias::new(FlowInstanceId::new(AppId::default(), "flow"), "connector"),
ConnectorType::default(),
QuiescenceBeacon::default(),
ConnectionLostNotifier::new(crate::channel::bounded(1024).0),
Expand Down
7 changes: 4 additions & 3 deletions src/connectors/impls/gcl/writer/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ mod test {
google::tests::TestTokenProvider, utils::quiescence::QuiescenceBeacon,
};
use crate::ids::{AppId, FlowInstanceId};
use bytes::Bytes;
use futures::future::Ready;
use googapis::google::logging::r#type::LogSeverity;
use googapis::google::logging::v2::WriteLogEntriesResponse;
Expand Down Expand Up @@ -371,7 +372,7 @@ mod test {
ConnectionLostNotifier::new(connection_lost_tx),
);

sink.connect(&ctx, &Attempt::default()).await?;
sink.connect(&sink_context, &Attempt::default()).await?;

let event = Event {
id: EventId::new(1, 2, 3, 4),
Expand All @@ -388,7 +389,7 @@ mod test {
sink.on_event(
"",
event.clone(),
&ctx,
&sink_context,
&mut EventSerializer::new(
None,
CodecReq::Structured,
Expand Down Expand Up @@ -460,7 +461,7 @@ mod test {
"",
Event::signal_tick(),
&SinkContext::new(
// NodeId::default()
openraft::NodeId::default(),
SinkUId::default(),
Alias::new(FlowInstanceId::new(AppId::default(), ""), ""),
ConnectorType::default(),
Expand Down
6 changes: 4 additions & 2 deletions src/connectors/tests/clickhouse/more_complex_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ use tremor_value::literal;

use super::utils;
use crate::{
connectors::{impls::clickhouse, tests::ConnectorHarness},
connectors::{
impls::clickhouse,
tests::{free_port, ConnectorHarness},
},
errors::{Error, Result},
utils::free_port,
};

macro_rules! assert_row_equals {
Expand Down
3 changes: 1 addition & 2 deletions src/connectors/tests/clickhouse/simple_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
use crate::{
connectors::{
impls::clickhouse,
tests::{clickhouse::utils, ConnectorHarness},
tests::{clickhouse::utils, free_port, ConnectorHarness},
},
errors::{Error, Result},
utils::free_port,
};

use std::time::{Duration, Instant};
Expand Down
14 changes: 7 additions & 7 deletions src/connectors/tests/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::time::{Duration, Instant};

use super::{setup_for_tls, ConnectorHarness};
use crate::connectors::impls::elastic;
use crate::connectors::impls::http::auth::Auth;
use crate::connectors::{impls::elastic, tests::free_port};
use crate::errors::{Error, Result};
use elasticsearch::auth::{ClientCertificate, Credentials};
use elasticsearch::cert::{Certificate, CertificateValidation};
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn connector_elastic() -> Result<()> {
let _ = env_logger::try_init();

let docker = clients::Cli::default();
let port = crate::utils::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let image = RunnableImage::from(
GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION)
.with_env_var("discovery.type", "single-node")
Expand Down Expand Up @@ -468,7 +468,7 @@ async fn elastic_routing() -> Result<()> {
let _ = env_logger::try_init();

let docker = clients::Cli::default();
let port = super::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let image = RunnableImage::from(
GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION)
.with_env_var("discovery.type", "single-node")
Expand Down Expand Up @@ -792,7 +792,7 @@ async fn elastic_routing() -> Result<()> {
async fn auth_basic() -> Result<()> {
let _ = env_logger::try_init();
let docker = clients::Cli::default();
let port = crate::utils::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION)
Expand Down Expand Up @@ -856,7 +856,7 @@ async fn auth_basic() -> Result<()> {
async fn auth_api_key() -> Result<()> {
let _ = env_logger::try_init();
let docker = clients::Cli::default();
let port = crate::utils::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION)
Expand Down Expand Up @@ -948,7 +948,7 @@ async fn auth_client_cert() -> Result<()> {
};

let docker = clients::Cli::default();
let port = crate::utils::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION)
Expand Down Expand Up @@ -1089,7 +1089,7 @@ async fn elastic_https() -> Result<()> {
};

let docker = clients::Cli::default();
let port = crate::utils::free_port::find_free_tcp_port().await?;
let port = free_port::find_free_tcp_port().await?;
let password = "snot";
let image = RunnableImage::from(
GenericImage::new("elasticsearch", ELASTICSEARCH_VERSION)
Expand Down
1 change: 0 additions & 1 deletion src/connectors/tests/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::{
utils::url::HttpDefaults,
},
errors::Result,
utils::free_port::find_free_tcp_port,
};
use hyper::StatusCode;
use hyper::{
Expand Down
1 change: 0 additions & 1 deletion src/connectors/tests/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
utils::tls::TLSClientConfig,
},
errors::Result,
utils::free_port,
};
use http::StatusCode;
use http_body::Body as BodyTrait;
Expand Down
3 changes: 2 additions & 1 deletion src/connectors/tests/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
mod consumer;
mod producer;

use crate::{errors::Result, utils::free_port::find_free_tcp_port};
use super::free_port::find_free_tcp_port;
use crate::errors::Result;
use std::time::Duration;
use testcontainers::{
clients::Cli as DockerCli, core::WaitFor, images::generic::GenericImage, Container,
Expand Down
6 changes: 3 additions & 3 deletions src/connectors/tests/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use crate::{
connectors::{
impls::kafka,
tests::{
free_port::find_free_tcp_port,
kafka::{redpanda_container, PRODUCE_TIMEOUT},
ConnectorHarness,
},
},
errors::Result,
utils::free_port,
};
use beef::Cow;
use rdkafka::{
Expand Down Expand Up @@ -687,7 +687,7 @@ async fn performance() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[serial(kafka)]
async fn connector_kafka_consumer_unreachable() -> Result<()> {
let kafka_port = free_port::find_free_tcp_port().await?;
let kafka_port = find_free_tcp_port().await?;
let _ = env_logger::try_init();
let connector_config = literal!({
"reconnect": {
Expand Down Expand Up @@ -725,7 +725,7 @@ async fn connector_kafka_consumer_unreachable() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
async fn invalid_rdkafka_options() -> Result<()> {
let _ = env_logger::try_init();
let kafka_port = free_port::find_free_tcp_port().await?;
let kafka_port = find_free_tcp_port().await?;
let broker = format!("127.0.0.1:{kafka_port}");
let topic = "tremor_test_pause_resume";
let group_id = "invalid_rdkafka_options";
Expand Down
3 changes: 2 additions & 1 deletion src/connectors/tests/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use rand::{distributions::Alphanumeric, Rng};
use std::time::{Duration, Instant};
use testcontainers::{clients::Cli, images::generic::GenericImage, Container, RunnableImage};

use crate::utils::free_port::find_free_tcp_port;
use super::free_port::find_free_tcp_port;

const IMAGE: &str = "minio/minio";
const TAG: &str = "RELEASE.2023-01-12T02-06-16Z";

Expand Down
5 changes: 2 additions & 3 deletions src/connectors/tests/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
use crate::{
connectors::{
impls::tcp,
tests::{setup_for_tls, tcp::EchoServer, ConnectorHarness},
tests::{free_port::find_free_tcp_port, setup_for_tls, tcp::EchoServer, ConnectorHarness},
},
errors::Result,
utils::free_port,
};
use std::time::Duration;
use tokio::net::lookup_host;
Expand All @@ -41,7 +40,7 @@ async fn tcp_client() -> Result<()> {
async fn tcp_client_test(use_tls: bool) -> Result<()> {
let _ = env_logger::try_init();

let free_port = free_port::find_free_tcp_port().await?;
let free_port = find_free_tcp_port().await?;

let server_addr = format!("localhost:{free_port}");

Expand Down
Loading

0 comments on commit 11c1cd1

Please sign in to comment.