Skip to content

Commit

Permalink
include feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Oct 16, 2023
1 parent 19485b4 commit 941c945
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
2 changes: 1 addition & 1 deletion tremor-codec/src/codec/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! The `avro` codec supports Apache Avro binary encoding.
//!
//! The codec is configured with a codec following the avro json codec specification
//! The codec is configured with a schema following the avro schema specification
//!
//! ## Configuration
//!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! The `kafka-schema-registry` codec supports Apache Avro binary encoding.
//! The `confluent-schema-registry` codec allows using the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
//! as a source for avro decoding information.
//!
//! The codec is configured with a codec following the avro json codec specification
//! It can be used in combination with a kafka topic that encodes it's content in avro format and stores it's schema in the schema registry.
//!
//! For decoding avro data (from kafka or otherwise) that is manually encoded please use the [avro](./avro) codec.
//!
//! ## Configuration
//!
Expand All @@ -25,7 +28,7 @@
//! The same as the [`avro` codec](./avro)

use crate::{
avro::{avro_to_value, value_to_avro, SchemaResover, SchemaWrapper},
avro::{avro_to_value, value_to_avro, SchemaResolver, SchemaWrapper},
prelude::*,
};
use apache_avro::schema::Name;
Expand All @@ -36,24 +39,24 @@ use schema_registry_converter::{
};
use tremor_common::url::{HttpDefaults, Url};

pub struct Ksr {
pub struct Csr {
registry: Url<HttpDefaults>,
settings: SrSettings,
decoder: AvroDecoder,
encoder: AvroEncoder,
}

#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for Ksr {
impl std::fmt::Debug for Csr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KSR")
f.debug_struct("Csr")
.field("registry", &self.registry)
.field("settings", &self.settings)
.finish()
}
}

impl Clone for Ksr {
impl Clone for Csr {
fn clone(&self) -> Self {
Self {
registry: self.registry.clone(),
Expand All @@ -64,7 +67,7 @@ impl Clone for Ksr {
}
}

impl Ksr {
impl Csr {
pub(crate) fn from_config(config: Option<&Value>) -> Result<Box<dyn Codec>> {
let url = config
.get_str("url")
Expand All @@ -75,7 +78,7 @@ impl Ksr {
let settings = SrSettings::new(url.to_string());
let decoder = AvroDecoder::new(settings.clone());
let encoder = AvroEncoder::new(settings.clone());
Ok(Box::new(Ksr {
Ok(Box::new(Csr {
registry,
settings,
decoder,
Expand Down Expand Up @@ -103,9 +106,9 @@ impl SchemaResolver for RecordResolver<'_> {
}

#[async_trait::async_trait()]
impl Codec for Ksr {
impl Codec for Csr {
fn name(&self) -> &str {
"kafka-schema-registry"
"confluent-schema-registry"
}

async fn decode<'input>(
Expand Down Expand Up @@ -161,20 +164,20 @@ mod test {
#[test]
fn test_codec_creation() {
let config = literal!({"url":"http://localhost:8081"});
let codec = Ksr::from_config(Some(&config)).expect("invalid config");
assert_eq!(codec.name(), "kafka-schema-registry");
let codec = Csr::from_config(Some(&config)).expect("invalid config");
assert_eq!(codec.name(), "confluent-schema-registry");
}

#[test]
fn invalid_config() {
let config = literal!({});
let codec = Ksr::from_config(Some(&config));
let codec = Csr::from_config(Some(&config));
assert!(codec.is_err());
}
#[test]
fn invalid_url() {
let config = literal!({"url":"loc alhost:8081"});
let codec = Ksr::from_config(Some(&config));
let codec = Csr::from_config(Some(&config));
assert!(codec.is_err());
}
}
6 changes: 4 additions & 2 deletions tremor-codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ mod codec {
pub(crate) mod avro;
pub(crate) mod binary;
pub(crate) mod binflux;
pub(crate) mod confluent_schema_registry;
pub(crate) mod csv;
pub(crate) mod dogstatsd;
pub(crate) mod influx;
/// JSON codec
pub mod json;
pub(crate) mod kafka_schema_registry;
pub(crate) mod msgpack;
pub(crate) mod null;
pub(crate) mod statsd;
Expand Down Expand Up @@ -122,7 +122,9 @@ impl Debug for dyn Codec {
pub fn resolve(config: &Config) -> Result<Box<dyn Codec>> {
match config.name.as_str() {
"avro" => avro::Avro::from_config(config.config.as_ref()),
"kafka-schema-registry" => kafka_schema_registry::Ksr::from_config(config.config.as_ref()),
"confluent-schema-registry" => {
confluent_schema_registry::Csr::from_config(config.config.as_ref())
}
"binary" => Ok(Box::new(binary::Binary {})),
"binflux" => Ok(Box::<binflux::BInflux>::default()),
"csv" => Ok(Box::new(csv::Csv {})),
Expand Down

0 comments on commit 941c945

Please sign in to comment.