diff --git a/src/connectors.rs b/src/connectors.rs index 47022535c2..33e9b2e981 100644 --- a/src/connectors.rs +++ b/src/connectors.rs @@ -38,6 +38,9 @@ pub(crate) mod pb; /// tcp server connector impl pub(crate) mod tcp_server; +/// std streams connector (stdout, stderr, stdin) +pub(crate) mod std_streams; + /// Home of the famous metrics collector pub(crate) mod metrics; @@ -736,5 +739,8 @@ pub async fn register_builtin_connectors(world: &World) -> Result<()> { world .register_builtin_connector_type("tcp_server", Box::new(tcp_server::Builder {})) .await?; + world + .register_builtin_connector_type("std_stream", Box::new(std_streams::Builder {})) + .await?; Ok(()) } diff --git a/src/connectors/prelude.rs b/src/connectors/prelude.rs index 3d538762ae..c5a2407ea6 100644 --- a/src/connectors/prelude.rs +++ b/src/connectors/prelude.rs @@ -12,9 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use crate::connectors::sink::{Sink, SinkContext, SinkData}; -pub use crate::connectors::source::{Source, SourceContext, SourceReply}; +pub use crate::connectors::reconnect::ConnectionLostNotifier; +pub use crate::connectors::sink::{ + EventSerializer, ResultVec, Sink, SinkAddr, SinkContext, SinkData, SinkManagerBuilder, +}; +pub use crate::connectors::source::{ + Source, SourceAddr, SourceContext, SourceManagerBuilder, SourceReply, +}; pub use crate::connectors::{Connector, ConnectorBuilder, ConnectorContext, ConnectorState}; +pub use crate::errors::{Error, ErrorKind, Result}; pub use crate::url::TremorUrl; pub use crate::OpConfig; pub use tremor_pipeline::ConfigImpl; diff --git a/src/connectors/sink.rs b/src/connectors/sink.rs index 4548fd03f7..9025b62fdb 100644 --- a/src/connectors/sink.rs +++ b/src/connectors/sink.rs @@ -696,6 +696,7 @@ where ); self.pipelines.retain(|(url, _)| url != &id); } + // FIXME: only handle those if in the right state (see source part) SinkMsg::Start => self.sink.on_start(&mut self.ctx).await, SinkMsg::Resume => { self.paused = false; diff --git a/src/connectors/source.rs b/src/connectors/source.rs index 1a0ad39f15..5c03594a15 100644 --- a/src/connectors/source.rs +++ b/src/connectors/source.rs @@ -748,7 +748,8 @@ where } Err(e) => { warn!("[Source::{}] Error pulling data: {}", &self.ctx.url, e); - // TODO: increment metrics err + // TODO: increment error metric + // FIXME: emit event to err port } } pull_counter += 1; diff --git a/src/connectors/std_streams.rs b/src/connectors/std_streams.rs new file mode 100644 index 0000000000..855a5390ee --- /dev/null +++ b/src/connectors/std_streams.rs @@ -0,0 +1,221 @@ +// Copyright 2021, The Tremor Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use crate::connectors::prelude::*; +use crate::utils::hostname; +use async_std::io::{stderr, stdin, stdout, ReadExt, Stderr, Stdin, Stdout, Write}; +use futures::AsyncWriteExt; +use tremor_pipeline::{EventOriginUri, DEFAULT_STREAM_ID}; + +#[derive(Deserialize, Debug, Clone)] +pub enum StdStream { + Stdout, + Stderr, + Stdin, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct Config { + stream: StdStream, + #[serde(default = "Default::default")] + prefix: String, + /// print non-string payloads as raw bytes, not in debug formatting + #[serde(default = "Default::default")] + raw: bool, +} + +impl ConfigImpl for Config {} + +/// connector handling 1 std stream (stdout, stderr or stdin) +pub struct StdStreamConnector { + stream: StdStream, + prefix: String, + raw: bool, +} + +pub(crate) struct Builder {} +impl ConnectorBuilder for Builder { + fn from_config( + &self, + _id: &TremorUrl, + raw_config: &Option, + ) -> Result> { + if let Some(raw) = raw_config { + let config = Config::new(raw)?; + Ok(Box::new(StdStreamConnector { + stream: config.stream, + prefix: config.prefix, + raw: config.raw, + })) + } else { + Err(ErrorKind::MissingConfiguration(String::from("std_stream")).into()) + } + } +} + +/// stdstream source (stdin) +pub struct StdStreamSource { + stream: Stdin, + buffer: Vec, + origin_uri: EventOriginUri, +} + +impl StdStreamSource { + const INPUT_SIZE_BYTES: usize = 8192; + + fn new() -> Self { + Self { + stream: stdin(), + buffer: vec![0; Self::INPUT_SIZE_BYTES], + origin_uri: EventOriginUri { + scheme: "tremor-stdin".to_string(), + host: hostname(), + port: None, + path: vec![], + }, + } + } +} + +#[async_trait::async_trait()] +impl Source for StdStreamSource { + async fn pull_data(&mut self, _pull_id: u64, _ctx: &SourceContext) -> Result { + let len = self.stream.read(&mut self.buffer).await?; + if len == 0 { + // reached the end of stdin + // FIXME: initiate state change to stop this source + Ok(SourceReply::Empty(1000)) + } else { + Ok(SourceReply::Data { + origin_uri: self.origin_uri.clone(), + // ALLOW: len cannot be > INPUT_SIZE_BYTES + data: self.buffer[0..len].to_vec(), + meta: None, + stream: DEFAULT_STREAM_ID, + }) + } + } + + fn is_transactional(&self) -> bool { + false + } +} + +/// stdstream sink +pub struct StdStreamSink +where + T: Write + std::marker::Unpin + Send, +{ + stream: T, + prefix: String, + raw: bool, +} + +#[async_trait::async_trait()] +impl Sink for StdStreamSink +where + T: Write + std::marker::Unpin + Send, +{ + async fn on_event( + &mut self, + _input: &str, + event: tremor_pipeline::Event, + _ctx: &SinkContext, + serializer: &mut EventSerializer, + _start: u64, + ) -> ResultVec { + for (value, _meta) in event.value_meta_iter() { + let data = serializer.serialize(value, event.ingest_ns)?; + for chunk in data { + self.stream.write_all(self.prefix.as_bytes()).await?; + if self.raw { + self.stream.write_all(&chunk).await?; + } else if let Ok(s) = std::str::from_utf8(&chunk) { + self.stream.write_all(s.as_bytes()).await?; + } else { + self.stream + .write_all(format!("{:?}", &chunk).as_bytes()) + .await? + } + self.stream.write_all(b"\n").await? + } + } + self.stream.flush().await?; + Ok(vec![]) + } + + fn auto_ack(&self) -> bool { + true + } +} + +#[async_trait::async_trait()] +impl Connector for StdStreamConnector { + async fn connect( + &mut self, + _ctx: &ConnectorContext, + _notifier: ConnectionLostNotifier, + ) -> Result { + Ok(true) + } + + /// create sink if we have a stdout or stderr stream + async fn create_sink( + &mut self, + sink_context: SinkContext, + builder: SinkManagerBuilder, + ) -> Result> { + let addr = match self.stream { + StdStream::Stdout => { + let sink: StdStreamSink = StdStreamSink { + stream: stdout(), + prefix: self.prefix.clone(), + raw: self.raw, + }; + builder.spawn(sink, sink_context)? + } + StdStream::Stderr => { + let sink: StdStreamSink = StdStreamSink { + stream: stderr(), + prefix: self.prefix.clone(), + raw: self.raw, + }; + builder.spawn(sink, sink_context)? + } + StdStream::Stdin => return Ok(None), + }; + Ok(Some(addr)) + } + + async fn create_source( + &mut self, + source_context: SourceContext, + builder: SourceManagerBuilder, + ) -> Result> { + Ok(if let StdStream::Stdin = self.stream { + let source = StdStreamSource::new(); + let addr = builder.spawn(source, source_context)?; + Some(addr) + } else { + None + }) + } + + async fn on_start(&mut self, _ctx: &ConnectorContext) -> Result { + Ok(ConnectorState::Running) + } + + fn default_codec(&self) -> &str { + "json" + } +} diff --git a/src/system.rs b/src/system.rs index fd1c18f5ea..7912c9d614 100644 --- a/src/system.rs +++ b/src/system.rs @@ -46,6 +46,21 @@ lazy_static! { //ALLOW: We want this to panic, it only happens at startup time .expect("Failed to initialize id for metrics connector") }; + pub(crate) static ref STDOUT_CONNECTOR: TremorUrl = { + TremorUrl::parse("/connector/system::stdout/system/in") + //ALLOW: We want this to panic, it only happens at startup time + .expect("Failed to initialize id for stdout connector") + }; + pub(crate) static ref STDERR_CONNECTOR: TremorUrl = { + TremorUrl::parse("/connector/system::stderr/system/in") + //ALLOW: We want this to panic, it only happens at startup time + .expect("Failed to initialize id for stderr connector") + }; + pub(crate) static ref STDIN_CONNECTOR: TremorUrl = { + TremorUrl::parse("/connector/system::stdin/system/out") + //ALLOW: We want this to panic, it only happens at startup time + .expect("Failed to initialize id for stderr connector") + }; pub(crate) static ref METRICS_PIPELINE: TremorUrl = { TremorUrl::parse("/pipeline/system::metrics/system/in") //ALLOW: We want this to panic, it only happens at startup time @@ -988,6 +1003,26 @@ type: metrics .publish_pipeline(&PASSTHROUGH_PIPELINE, true, artefact_passthrough) .await?; + // Register stdout connector + let artefact: ConnectorArtefact = serde_yaml::from_str( + r#" +id: system::stdout +type: std_stream +config: + stream: stdout + "#, + )?; + self.repo + .publish_connector(&STDOUT_CONNECTOR, true, artefact) + .await?; + self.bind_connector(&STDOUT_CONNECTOR).await?; + self.reg + .find_connector(&STDOUT_CONNECTOR) + .await? + .ok_or_else(|| Error::from("Failed to initialize system::stdout connector"))?; + + // FIXME: stderr and stdin connectors + // Register stdout offramp let artefact: OfframpArtefact = serde_yaml::from_str( r#" diff --git a/src/version.rs b/src/version.rs index 0926e17b7e..5b71b39596 100644 --- a/src/version.rs +++ b/src/version.rs @@ -55,6 +55,13 @@ pub fn log() { info!("rd_kafka version: 0x{:08x}, {}", version_n, version_s); } +/// Gets the librdkafka version string +#[must_use] +pub fn rdkafka() -> String { + let (_, version) = get_rdkafka_version(); + version +} + #[cfg(test)] mod test { use super::*; diff --git a/tests/query_runtime_error.rs b/tests/query_runtime_error.rs index cc3d0bde55..1dfd370ed4 100644 --- a/tests/query_runtime_error.rs +++ b/tests/query_runtime_error.rs @@ -83,7 +83,7 @@ macro_rules! test_cases { let mut results = Vec::new(); for (id, json) in in_json.into_iter().enumerate() { let event = Event { - id: EventId::new(0, 0, (id as u64)), + id: EventId::from_id(0, 0, (id as u64)), data: json.clone_static().into(), ingest_ns: id as u64, ..Event::default() diff --git a/tests/script.rs b/tests/script.rs index 0e437e73cb..2302f0b99c 100644 --- a/tests/script.rs +++ b/tests/script.rs @@ -57,8 +57,6 @@ macro_rules! test_cases { path: vec!["snot".into()], port: Some(23), scheme: "snot".into(), - uid: 42 - }; let context = EventContext::new(id as u64, Some(&uri)); let mut meta = Value::from(Object::default()); diff --git a/tremor-cli/src/env.rs b/tremor-cli/src/env.rs index 8365057e6f..5a409e5fc4 100644 --- a/tremor-cli/src/env.rs +++ b/tremor-cli/src/env.rs @@ -31,7 +31,6 @@ pub(crate) fn setup() -> Result { // Install runtime extensions from a single source of truth tremor_runtime::functions::install(&mut fun)?; - tremor_connectors::connectors::functions::install(&mut fun); Ok(TremorCliEnv { module_path, diff --git a/tremor-cli/src/run.rs b/tremor-cli/src/run.rs index 802789522d..706aa3342b 100644 --- a/tremor-cli/src/run.rs +++ b/tremor-cli/src/run.rs @@ -348,7 +348,7 @@ fn run_trickle_source(matches: &ArgMatches, src: String) -> Result<()> { if let Err(e) = runnable.enqueue( "in", Event { - id: EventId::new(0, 0, *id), + id: EventId::from_id(0, 0, *id), data: value.clone(), ingest_ns: at, ..Event::default() diff --git a/tremor-cli/src/server.rs b/tremor-cli/src/server.rs index 91bc3074a2..5b6b106422 100644 --- a/tremor-cli/src/server.rs +++ b/tremor-cli/src/server.rs @@ -119,7 +119,6 @@ pub(crate) async fn run_dun(matches: &ArgMatches) -> Result<()> { .map(std::string::ToString::to_string); // TODO: Allow configuring this for offramps and pipelines let (world, handle) = World::start(64, storage_directory).await?; - tremor_connectors::register_builtins(&world).await?; if let Some(config_files) = matches.values_of("artefacts") { let mut yaml_files = Vec::with_capacity(16);