diff --git a/CHANGELOG.md b/CHANGELOG.md index e33a0ef743..eb58afa604 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ ## Unreleased +### New features +* kafka schema registry codec + +### Breaking Changes +* remove schema_registry preprocessor +* remove defunct schema_registry support for avro codec + ## [0.13.0-rc.16] ### New features diff --git a/Cargo.lock b/Cargo.lock index 49f8efedac..5fbdcdc047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3230,9 +3230,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.11.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" dependencies = [ "hashbrown 0.14.1", ] @@ -4658,7 +4658,7 @@ dependencies = [ [[package]] name = "schema_registry_converter" version = "3.1.0" -source = "git+https://github.com/tremor-rs/schema_registry_converter.git?branch=housekeeping#9e88b5845cd1f0142786815d23844f3c4fa7820a" +source = "git+https://github.com/tremor-rs/schema_registry_converter.git?branch=housekeeping#8df31e3e250ead445349432a5b5803d0c9985f2d" dependencies = [ "apache-avro", "byteorder", @@ -6257,6 +6257,7 @@ dependencies = [ "base64 0.21.4", "beef", "bimap", + "bytes", "chrono", "chrono-tz", "clickhouse-rs", diff --git a/Cargo.toml b/Cargo.toml index d29346089e..a1e43791e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,12 +141,6 @@ cron = "0.12" # logstash grok patterns grok = "2" -# sse-onramp -#surf-sse = { git = "https://github.com/dak-x/surf-sse", tag = "2.0", default-features = false } - -# nats -#async-nats = "0.10.1" - # discord serenity = { version = "0.11", default-features = false, features = [ "client", @@ -203,7 +197,6 @@ simdutf8 = "0.1" [dev-dependencies] port_scanner = "0.1" serial_test = { version = "2.0", features = ["logging"] } -# path = "../serial_test/serial_test" env_logger = "0.10" matches = "0.1" pretty_assertions = "1.4" @@ -217,7 +210,7 @@ tempfile = { version = "3.8" } test-case = "3.1" testcontainers = { version = "0.14", features = ["watchdog"] } num_cpus = "1" - +bytes = "1" [features] default = [] diff --git a/docs/Makefile b/docs/Makefile index 5b60552e7c..87909f27a0 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -4,10 +4,10 @@ operator-docs: scripts/operators.sh preprocessor-docs: - scripts/gen.sh src/preprocessor preprocessors + scripts/gen.sh tremor-interceptor/src/preprocessor preprocessors postprocessor-docs: - scripts/gen.sh src/postprocessor postprocessors + scripts/gen.sh tremor-interceptor/src/postprocessor postprocessors codec-docs: scripts/gen.sh tremor-codec/src/codec codecs diff --git a/src/connectors/tests/http/client.rs b/src/connectors/tests/http/client.rs index f20ad29a1b..7450290f66 100644 --- a/src/connectors/tests/http/client.rs +++ b/src/connectors/tests/http/client.rs @@ -17,7 +17,6 @@ use crate::{ impls::http::{self as http_impl, meta::content_type}, prelude::Url, tests::{free_port::find_free_tcp_port, ConnectorHarness}, - utils::url::HttpDefaults, }, errors::Result, }; @@ -34,6 +33,7 @@ use std::{ }; use tokio::task::{spawn, JoinHandle}; use tremor_common::ports::IN; +use tremor_common::url::HttpDefaults; use tremor_pipeline::Event; use tremor_script::ValueAndMeta; use tremor_value::{literal, Value}; diff --git a/src/connectors/tests/ws.rs b/src/connectors/tests/ws.rs index 72a03c29e8..1efb4399db 100644 --- a/src/connectors/tests/ws.rs +++ b/src/connectors/tests/ws.rs @@ -14,8 +14,8 @@ use super::{free_port::find_free_tcp_port, setup_for_tls, ConnectorHarness}; use crate::channel::{bounded, Receiver, Sender, TryRecvError}; +use crate::connectors::impls::ws::WsDefaults; use crate::connectors::{impls::ws, utils::tls::TLSClientConfig}; -use crate::connectors::{impls::ws::WsDefaults, utils::url::Url}; use crate::errors::{Result, ResultExt}; use futures::SinkExt; use futures::StreamExt; @@ -46,6 +46,7 @@ use tokio_tungstenite::{ WebSocketStream, }; use tremor_common::ports::IN; +use tremor_common::url::Url; use tremor_pipeline::{Event, EventId}; use tremor_value::{literal, prelude::*, Value}; diff --git a/src/lib.rs b/src/lib.rs index 74049bd278..f1a1eb7964 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,10 +24,6 @@ clippy::pedantic, clippy::mod_module_files )] -// TODO this is needed due to a false positive in clippy -// https://github.com/rust-lang/rust/issues/83125 -// we will need this in 1.53.1 -#![allow(proc_macro_back_compat)] #[macro_use] extern crate serde; diff --git a/tremor-cli/Cargo.toml b/tremor-cli/Cargo.toml index 8cdafb8111..37409bdc49 100644 --- a/tremor-cli/Cargo.toml +++ b/tremor-cli/Cargo.toml @@ -22,7 +22,6 @@ pretty_assertions = "1.4" [dependencies] tokio = { version = "1.32", features = ["full"] } -# tokio-metrics = { version = "0.1.0", default-features = true } anyhow = "1" clap = { version = "4", features = ["color", "derive"] } clap_complete = "4" @@ -31,7 +30,6 @@ env_logger = "0.10" futures = "0.3" halfbrown = "0.2" http-types = "2" -# jemallocator = {version = "0.3", optional = false} log = "0.4" log4rs = "1" serde = "1" @@ -56,8 +54,6 @@ tremor-runtime = { version = "0.13.0-rc.16", path = "../" } tremor-script = { version = "0.13.0-rc.16", path = "../tremor-script" } tremor-value = { version = "0.13.0-rc.16", path = "../tremor-value" } url = "2" -# mimalloc-rs = { version = "0.1", default-features = true, optional = true } -# allocator_api = "0.6.0" error-chain = "0.12" globwalk = "0.8" port_scanner = "0.1" diff --git a/tremor-codec/Cargo.toml b/tremor-codec/Cargo.toml index d4f22cac81..fc99b61921 100644 --- a/tremor-codec/Cargo.toml +++ b/tremor-codec/Cargo.toml @@ -39,8 +39,6 @@ schema_registry_converter = { version = "3", default-features = false, features "easy", ], git = "https://github.com/tremor-rs/schema_registry_converter.git", branch = "housekeeping" } -# path = "schema_registry_converter" - # codecs reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls", diff --git a/tremor-codec/src/codec/avro.rs b/tremor-codec/src/codec/avro.rs index 59a610d70c..e82745357e 100644 --- a/tremor-codec/src/codec/avro.rs +++ b/tremor-codec/src/codec/avro.rs @@ -16,6 +16,13 @@ //! //! The codec is configured with a codec following the avro json codec specification //! +//! ## Configuration +//! +//! | value | optional | description | +//! |-------|----------|-------------| +//! | `schema` | no | The avro schema to use | +//! | `compression` | yes | The compression codec to use, one of `deflate`, `snappy`, `zstd`, `bzip2`, `xz`, `none` | +//! //! ## Mappings //! //! | avro | tremor (to) | tremor (from) | @@ -670,34 +677,6 @@ mod test { Ok(()) } - #[tokio::test(flavor = "multi_thread")] - async fn decode_smaple() -> Result<()> { - // [b'O', b'b', b'j', 1u8] - let from_kafka = vec![0_u8, 0, 0, 0, 1, 12, 115, 116, 114, 105, 110, 103]; - // let from_kafka = vec![b'O', b'b', b'j', 1_u8, 12, 115, 116, 114, 105, 110, 103]; - // let mut from_kafka = vec![12, 115, 116, 114, 105, 110, 103_u8]; - - let mut codec = test_codec(literal!( - { - "type": "record", - "name": "record", - "fields": [ - {"name": "one", "type": "string"}, - ] - } - ))?; - - let decoded = literal!({"one": "string"}); - - let mut encoded = codec.encode(&decoded, &Value::const_null()).await?; - assert_eq!(encoded, from_kafka); - - codec - .decode(&mut encoded, 0, Value::object()) - .await? - .expect("no data"); - Ok(()) - } #[tokio::test(flavor = "multi_thread")] async fn round_robin() -> Result<()> { let mut codec = test_codec(literal!( diff --git a/tremor-codec/src/codec/kafka_schema_registry.rs b/tremor-codec/src/codec/kafka_schema_registry.rs index 4de5aa7633..9fe339f487 100644 --- a/tremor-codec/src/codec/kafka_schema_registry.rs +++ b/tremor-codec/src/codec/kafka_schema_registry.rs @@ -16,6 +16,10 @@ //! //! The codec is configured with a codec following the avro json codec specification //! +//! ## Configuration +//! +//! - `url`: the `url` configuration is used to point to the root of the schema registry server +//! //! ## Mappings //! //! The same as the [`avro` codec](../avro) @@ -101,7 +105,7 @@ impl SchemaResover for RecordResolver<'_> { #[async_trait::async_trait()] impl Codec for Ksr { fn name(&self) -> &str { - todo!() + "kafka-schema-registry" } async fn decode<'input>( diff --git a/tremor-common/Cargo.toml b/tremor-common/Cargo.toml index 9184f005dc..2f6061fc45 100644 --- a/tremor-common/Cargo.toml +++ b/tremor-common/Cargo.toml @@ -13,7 +13,7 @@ tokio = { version = "1", features = ["full"] } rand = { version = "0.8", features = ["small_rng"] } beef = { version = "0.5", features = ["impl_serde"] } serde = "1" -url = "2" +url = { version = "2", features = ["serde"] } simd-json = { version = "0.11", features = ["known-key"] } simd-json-derive = "0.11" base64 = "0.21" diff --git a/tremor-config/src/lib.rs b/tremor-config/src/lib.rs index 92c3e0c2db..8dd37f41c1 100644 --- a/tremor-config/src/lib.rs +++ b/tremor-config/src/lib.rs @@ -12,6 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Tremor shared configuration + +#![deny(warnings)] +#![deny(missing_docs)] +#![recursion_limit = "1024"] +#![deny( + clippy::all, + clippy::unwrap_used, + clippy::unnecessary_unwrap, + clippy::pedantic, + clippy::mod_module_files +)] + use serde::Deserialize; use tremor_value::prelude::*; @@ -78,6 +91,7 @@ impl<'v> TryFrom<&Value<'v>> for NameWithConfig { /// Error for confdig #[derive(Debug, Clone, PartialEq)] pub enum Error { + /// malformed configuration InvalidConfig(String), } diff --git a/tremor-pipeline/Cargo.toml b/tremor-pipeline/Cargo.toml index 79277442af..20e16ae39a 100644 --- a/tremor-pipeline/Cargo.toml +++ b/tremor-pipeline/Cargo.toml @@ -16,7 +16,7 @@ indexmap = "2" rand = { version = "0.8", features = ["small_rng"] } lazy_static = "1" log = "0.4" -lru = "0.11" +lru = "0.12" petgraph = "0.6" regex = "1" rust-bert = { version = "0.21.0", optional = true } diff --git a/tremor-script-nif/Cargo.toml b/tremor-script-nif/Cargo.toml index 6181ec5083..2e2f00d20e 100644 --- a/tremor-script-nif/Cargo.toml +++ b/tremor-script-nif/Cargo.toml @@ -16,7 +16,7 @@ name = "tremor" crate-type = ["dylib"] [dependencies] -rustler = "0.29" +rustler = "0.30" tremor-script = { path = "../tremor-script" } [features]