Skip to content

Commit

Permalink
finish binding/linking/starting of connectors
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wahl <[email protected]>
  • Loading branch information
Matthias Wahl committed Aug 18, 2021
1 parent f005d88 commit 185a121
Show file tree
Hide file tree
Showing 16 changed files with 1,111 additions and 92 deletions.
9 changes: 6 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lto = "thin"
opt-level = 3

[dependencies]
anyhow = "1"
async-channel = "1"
async-compat = "0.2"
async-compression = { version = "0.3", features = [
Expand Down Expand Up @@ -74,6 +75,7 @@ serde_yaml = "0.8"
simd-json = { version = "0.4", features = ["known-key"] }
simd-json-derive = "0.2"
snap = "1"
surf = "=2.2.0"
syslog_loose = "0.14"
tremor-common = { path = "tremor-common" }
tremor-influx = { path = "tremor-influx" }
Expand Down Expand Up @@ -106,7 +108,7 @@ lapin = "1.7.1"
elastic = "0.21.0-pre.5"

# ws
async-tungstenite = { version = "0.13.1", features = ["async-std-runtime"] }
async-tungstenite = { version = "0.14.0", features = ["async-std-runtime"] }
tungstenite = { version = "0.13", default-features = false }

# for tcp
Expand Down Expand Up @@ -138,6 +140,8 @@ grok = "1"
# on features for these (see static-ssl feature here)
openssl = { version = "0.10", features = ["vendored"] }

#rest onramp
tide = "0.16"

# sse-onramp
surf-sse = { git = "https://github.com/dak-x/surf-sse", tag = "2.0" }
Expand Down Expand Up @@ -171,7 +175,7 @@ googapis = { version = "0.5", default-features = false, features = [
] }
gouth = { version = "0.2" }
http = "0.2.4"
reqwest = "0.11.3"
reqwest = "0.11.4"

[dev-dependencies]
matches = "0.1"
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ pub struct Connector {

#[serde(default)]
pub(crate) on_pause: PauseBehaviour,

#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) metrics_interval_s: Option<u64>,
}

/// Configuration for a Binding
Expand Down
50 changes: 44 additions & 6 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ pub(crate) mod pb;
/// tcp server connector impl
pub(crate) mod tcp_server;

/// Home of the famous metrics collector
pub(crate) mod metrics;

use async_std::task::{self, JoinHandle};
use beef::Cow;

use crate::config::Connector as ConnectorConfig;
use crate::connectors::metrics::{MetricsMsg, MetricsSinkReporter, MetricsSourceReporter};
use crate::connectors::sink::{SinkAddr, SinkContext, SinkMsg};
use crate::connectors::source::{SourceAddr, SourceContext, SourceMsg};
use crate::errors::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -69,6 +73,10 @@ pub struct Addr {
}

impl Addr {
pub async fn send(&self, msg: Msg) -> Result<()> {
Ok(self.sender.send(msg).await?)
}

pub(crate) async fn send_sink(&self, msg: SinkMsg) -> Result<()> {
if let Some(sink) = self.sink.as_ref() {
sink.addr.send(msg).await?
Expand Down Expand Up @@ -168,12 +176,16 @@ pub enum ManagerMsg {
/// and handles available connector types
pub struct Manager {
qsize: usize,
metrics_sender: async_channel::Sender<MetricsMsg>,
}

impl Manager {
/// constructor
pub fn new(qsize: usize) -> Self {
Self { qsize }
pub fn new(qsize: usize, metrics_sender: async_channel::Sender<MetricsMsg>) -> Self {
Self {
qsize,
metrics_sender,
}
}

/// start the manager
Expand Down Expand Up @@ -303,13 +315,32 @@ impl Manager {
let mut connector_state = ConnectorState::Stopped;
dbg!(connector_state);

let source_metrics_reporter = MetricsSourceReporter::new(
url.clone(),
self.metrics_sender.clone(),
config.metrics_interval_s,
);

let default_codec = connector.default_codec();
let source_builder = source::builder(uid, &config, default_codec, self.qsize)?;
let source_builder = source::builder(
uid,
&config,
default_codec,
self.qsize,
source_metrics_reporter,
)?;
let source_ctx = SourceContext {
uid,
url: url.clone(),
};
let sink_builder = sink::builder(&config, default_codec, self.qsize)?;

let sink_metrics_reporter = MetricsSinkReporter::new(
url.clone(),
self.metrics_sender.clone(),
config.metrics_interval_s,
);
let sink_builder =
sink::builder(&config, default_codec, self.qsize, sink_metrics_reporter)?;
let sink_ctx = SinkContext {
uid,
url: url.clone(),
Expand Down Expand Up @@ -338,6 +369,7 @@ impl Manager {
connector_state = ConnectorState::Initialized;
dbg!(&connector_state);

// TODO: add connector metrics reporter (e.g. for reconnect attempts, cb's received, uptime, etc.)
task::spawn::<_, Result<()>>(async move {
// typical 1 pipeline connected to IN, OUT, ERR
let mut pipelines: HashMap<Cow<'static, str>, Vec<(TremorUrl, pipeline::Addr)>> =
Expand Down Expand Up @@ -497,7 +529,7 @@ impl Manager {
}
Msg::Start if connector_state == ConnectorState::Initialized => {
info!("[Connector::{}] Starting...", &addr.url);
// TODO: start connector
// start connector
connector_state = match connector.on_start(&ctx).await {
Ok(new_state) => new_state,
Err(e) => {
Expand Down Expand Up @@ -659,7 +691,7 @@ pub trait Connector: Send {
Ok(None)
}

/// Attempt to connect to the outside world
/// Attempt to connect to the outside world.
/// Return `Ok(true)` if a connection could be established.
/// This method will be retried if it fails or returns `Ok(false)`.
///
Expand Down Expand Up @@ -695,6 +727,12 @@ pub trait ConnectorBuilder: Sync + Send {

#[cfg(not(tarpaulin_include))]
pub async fn register_builtin_connectors(world: &World) -> Result<()> {
world
.register_builtin_connector_type(
"metrics",
Box::new(metrics::Builder::new(world.metrics_channel.clone())),
)
.await?;
world
.register_builtin_connector_type("tcp_server", Box::new(tcp_server::Builder {}))
.await?;
Expand Down
Loading

0 comments on commit 185a121

Please sign in to comment.