Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for new version of simd-json and value trait #2357

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
425 changes: 229 additions & 196 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ rand = "0.8.5"
regex = "1.9"
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
simd-json = { version = "0.12", features = ["known-key"] }
simd-json-derive = "0.12"
simd-json = { version = "0.13", features = ["known-key"] }
simd-json-derive = "0.13"
socket2 = { version = "0.5", features = ["all"] }
tremor-common = { path = "tremor-common" }
tremor-config = { path = "tremor-config" }
Expand All @@ -95,7 +95,7 @@ tremor-script = { path = "tremor-script" }
tremor-value = { path = "tremor-value" }
tremor-interceptor = { path = "tremor-interceptor" }
url = "2.4"
value-trait = "0.6"
value-trait = "0.8"


# blaster / blackhole# codecs
Expand Down
2 changes: 1 addition & 1 deletion depricated/sink/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Sink for Amqp {
for (key, val) in
headers.iter().filter_map(|(k, v)| Some((k, v.as_array()?)))
{
for ele in val.iter().filter_map(value_trait::ValueAccess::as_str) {
for ele in val.iter().filter_map(value_trait::ValueAsScalar::as_str) {
key_val.push((key, ele));
}
}
Expand Down
2 changes: 1 addition & 1 deletion depricated/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Sink for Nats {
for (key, val) in
headers.iter().filter_map(|(k, v)| Some((k, v.as_array()?)))
{
for ele in val.iter().filter_map(value_trait::ValueAccess::as_str) {
for ele in val.iter().filter_map(value_trait::ValueAsScalar::as_str) {
key_val.push((key, ele));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use tremor_value::Value;
use utils::reconnect::{Attempt, ConnectionLostNotifier, ReconnectRuntime};
/// quiescence stuff
pub(crate) use utils::{metrics, reconnect};
use value_trait::{Builder, Mutable, ValueAccess};
use value_trait::prelude::*;

/// Accept timeout
pub(crate) const ACCEPT_TIMEOUT: Duration = Duration::from_millis(100);
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/google.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl TokenSrc {
}
}

pub trait TokenProvider: Clone + Send + From<TokenSrc> {
pub trait TokenProvider: Clone + Send + Sync + From<TokenSrc> {
fn get_token(&mut self) -> ::std::result::Result<Arc<String>, Status>;
}

Expand Down
50 changes: 28 additions & 22 deletions src/connectors/impls/clickhouse/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::{
use chrono_tz::Tz;
pub(super) use clickhouse_rs::types::Value as CValue;
use either::Either;
use simd_json::{Value, ValueAccess};
use tremor_value::Value as TValue;
use uuid::Uuid;
use value_trait::prelude::*;

use super::DummySqlType;
use crate::errors::{Error, ErrorKind, Result};
Expand All @@ -46,7 +46,7 @@ pub(super) fn convert_value(
// We don't check that all elements of the array have the same type.
// Instead, we check that every element can be converted to the expected
// array type.
wrap_getter_error(context, ValueAccess::as_array)?
wrap_getter_error(context, ValueAsContainer::as_array)?
.iter()
.map(|value| convert_value(column_name, value, expected_inner_type))
.collect::<Result<Vec<_>>>()
Expand All @@ -72,23 +72,23 @@ pub(super) fn convert_value(
Ok(CValue::Nullable(Either::Right(Box::new(inner_value))))
}

DummySqlType::UInt8 => get_and_wrap(context, ValueAccess::as_u8, CValue::UInt8),
DummySqlType::UInt8 => get_and_wrap(context, ValueAsScalar::as_u8, CValue::UInt8),

DummySqlType::UInt16 => get_and_wrap(context, ValueAccess::as_u16, CValue::UInt16),
DummySqlType::UInt16 => get_and_wrap(context, ValueAsScalar::as_u16, CValue::UInt16),

DummySqlType::UInt32 => get_and_wrap(context, ValueAccess::as_u32, CValue::UInt32),
DummySqlType::UInt32 => get_and_wrap(context, ValueAsScalar::as_u32, CValue::UInt32),

DummySqlType::UInt64 => get_and_wrap(context, ValueAccess::as_u64, CValue::UInt64),
DummySqlType::UInt64 => get_and_wrap(context, ValueAsScalar::as_u64, CValue::UInt64),

DummySqlType::Int8 => get_and_wrap(context, ValueAccess::as_i8, CValue::Int8),
DummySqlType::Int8 => get_and_wrap(context, ValueAsScalar::as_i8, CValue::Int8),

DummySqlType::Int16 => get_and_wrap(context, ValueAccess::as_i16, CValue::Int16),
DummySqlType::Int16 => get_and_wrap(context, ValueAsScalar::as_i16, CValue::Int16),

DummySqlType::Int32 => get_and_wrap(context, ValueAccess::as_i32, CValue::Int32),
DummySqlType::Int32 => get_and_wrap(context, ValueAsScalar::as_i32, CValue::Int32),

DummySqlType::Int64 => get_and_wrap(context, ValueAccess::as_i64, CValue::Int64),
DummySqlType::Int64 => get_and_wrap(context, ValueAsScalar::as_i64, CValue::Int64),

DummySqlType::String => get_and_wrap(context, ValueAccess::as_str, |s| {
DummySqlType::String => get_and_wrap(context, ValueAsScalar::as_str, |s| {
CValue::String(Arc::new(s.as_bytes().to_vec()))
}),

Expand Down Expand Up @@ -116,25 +116,31 @@ pub(super) fn convert_value(
ErrorKind::MalformedUuid,
),

DummySqlType::DateTime => get_and_wrap(context, ValueAccess::as_u32, |timestamp| {
DummySqlType::DateTime => get_and_wrap(context, ValueAsScalar::as_u32, |timestamp| {
CValue::DateTime(timestamp, UTC)
}),

DummySqlType::DateTime64Secs => get_and_wrap(context, ValueAccess::as_i64, |timestamp| {
DummySqlType::DateTime64Secs => get_and_wrap(context, ValueAsScalar::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (0, UTC))
}),

DummySqlType::DateTime64Millis => get_and_wrap(context, ValueAccess::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (3, UTC))
}),
DummySqlType::DateTime64Millis => {
get_and_wrap(context, ValueAsScalar::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (3, UTC))
})
}

DummySqlType::DateTime64Micros => get_and_wrap(context, ValueAccess::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (6, UTC))
}),
DummySqlType::DateTime64Micros => {
get_and_wrap(context, ValueAsScalar::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (6, UTC))
})
}

DummySqlType::DateTime64Nanos => get_and_wrap(context, ValueAccess::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (9, UTC))
}),
DummySqlType::DateTime64Nanos => {
get_and_wrap(context, ValueAsScalar::as_i64, |timestamp| {
CValue::DateTime64(timestamp, (9, UTC))
})
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/connectors/impls/crononome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ use crate::{connectors::prelude::*, errors::err_connector_def, system::KillSwitc
use handler::{ChronomicQueue, CronEntryInt};
use serde_yaml::Value as YamlValue;
use tremor_common::time::nanotime;
use value_trait::ValueBuilder;

#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down
6 changes: 4 additions & 2 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ use tokio::task;
use tremor_common::time::nanotime;
use tremor_value::utils::sorted_serialize;
use tremor_value::value::StaticValue;
use value_trait::Mutable;

#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -746,7 +745,10 @@ async fn handle_response(
) -> Result<()> {
let correlation_values = event.correlation_metas();
let payload_iter = event.value_iter();
if let Some(items) = response.get_mut("items").and_then(Mutable::as_array_mut) {
if let Some(items) = response
.get_mut("items")
.and_then(ValueAsMutContainer::as_array_mut)
{
for ((mut item, correlation), payload) in items
.drain(..)
.zip(correlation_values.into_iter())
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
use crate::connectors::prelude::*;
use crate::system::{KillSwitch, ShutdownMode};
use std::time::Duration;
use value_trait::ValueAccess;

#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down
4 changes: 1 addition & 3 deletions src/connectors/impls/gcl/writer/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::errors::Result;
use crate::connectors::prelude::*;
use googapis::google::logging::{
r#type::HttpRequest,
v2::{LogEntryOperation, LogEntrySourceLocation},
};
use prost_types::Timestamp;
use tremor_value::Value;
use value_trait::ValueAccess;

pub(crate) fn get_or_default(meta: Option<&Value>, key: &str) -> String {
meta.get_str(key).unwrap_or_default().to_string()
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/gpubsub/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use tonic::Code;
use tremor_common::url::HttpsDefaults;
use tremor_pipeline::Event;
use tremor_value::Value;
use value_trait::ValueAccess;

#[derive(Deserialize, Clone)]
#[serde(deny_unknown_fields)]
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/http/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use http::{
use hyper::{header::HeaderValue, Body, Method, Request, Response};
use mime::Mime;
use tremor_value::Value;
use value_trait::{Builder, ValueAccess};

/// Utility for building an HTTP request from a possibly batched event
/// and some configuration values
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Request,
};
use simd_json::ValueAccess;
use std::{
convert::Infallible,
net::{SocketAddr, ToSocketAddrs},
Expand Down
8 changes: 2 additions & 6 deletions src/connectors/impls/otel/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@

#![allow(dead_code)]

use crate::connectors::prelude::*;
use crate::connectors::utils::pb;
use crate::errors::Result;
use simd_json::Builder;
use tremor_common::url;
use tremor_otelapis::opentelemetry::proto::common::v1::{
any_value, AnyValue, ArrayValue, InstrumentationLibrary, KeyValue, KeyValueList, StringKeyValue,
};
use tremor_value::{literal, StaticNode, Value};
use value_trait::ValueAccess;

pub(crate) struct OtelDefaults;
impl url::Defaults for OtelDefaults {
impl Defaults for OtelDefaults {
// We do add the port here since it's different from http's default

const SCHEME: &'static str = "https";
Expand Down
4 changes: 1 addition & 3 deletions src/connectors/impls/otel/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

#![allow(dead_code)]

use crate::errors::Result;
use crate::connectors::prelude::*;
use ::rand::Rng;
use simd_json::ValueAccess;
use std::fmt::Write;
use tremor_common::rand;
use tremor_value::Value;

pub(crate) fn random_span_id_bytes(ingest_ns_seed: u64) -> Vec<u8> {
let mut rng = tremor_common::rand::make_prng(ingest_ns_seed);
Expand Down
4 changes: 1 addition & 3 deletions src/connectors/impls/otel/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
#![allow(dead_code)]

use super::common;
use crate::connectors::prelude::*;
use crate::connectors::utils::pb;
use crate::errors::Result;
use simd_json::ValueAccess;
use tremor_otelapis::opentelemetry::proto::resource::v1::Resource;
use tremor_value::{literal, Value};

pub(crate) fn resource_to_json(pb: Resource) -> Value<'static> {
literal!({
Expand Down
8 changes: 1 addition & 7 deletions src/connectors/impls/otel/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ use super::{
id,
resource::{self, resource_to_pb},
};
use crate::connectors::prelude::*;
use crate::connectors::utils::pb::{
maybe_int_to_pbi32, maybe_int_to_pbu32, maybe_int_to_pbu64, maybe_string_to_pb,
};
use crate::errors::Result;
use simd_json::Mutable;
use tremor_value::literal;

use tremor_otelapis::opentelemetry::proto::{
collector::trace::v1::ExportTraceServiceRequest,
trace::v1::{
Expand All @@ -34,9 +31,6 @@ use tremor_otelapis::opentelemetry::proto::{
},
};

use tremor_value::Value;
use value_trait::ValueAccess;

#[allow(deprecated)]
pub(crate) fn status_to_json<'event>(data: Option<Status>) -> Value<'event> {
data.map_or_else(
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/tcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{
errors::err_connector_def,
};
use rustls::ServerConfig;
use simd_json::ValueAccess;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::{task::JoinHandle, time::timeout};
use tokio_rustls::TlsAcceptor;
Expand Down
1 change: 0 additions & 1 deletion src/connectors/impls/ws/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::connectors::{
};
use futures::StreamExt;
use rustls::ServerConfig;
use simd_json::ValueAccess;
use std::{
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
Expand Down
1 change: 0 additions & 1 deletion src/connectors/sink/channel_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use tokio::{
use tremor_common::{ids::Id, time::nanotime};
use tremor_pipeline::{CbAction, Event, SignalKind};
use tremor_value::Value;
use value_trait::ValueAccess;

/// Behavioral trait for defining if a Channel Sink needs metadata or not
pub(crate) trait SinkMetaBehaviour: Send + Sync {
Expand Down
24 changes: 17 additions & 7 deletions src/connectors/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
/// A simple source that is fed with `SourceReply` via a channel.
pub mod channel_source;

use super::{CodecReq, Connectivity};
use crate::channel::{unbounded, Sender, UnboundedReceiver, UnboundedSender};
use crate::connectors::{
metrics::SourceReporter,
prelude::*,
utils::reconnect::{Attempt, ConnectionLostNotifier},
ConnectorType, Context, Msg, QuiescenceBeacon, StreamDone,
};
Expand All @@ -29,7 +31,6 @@
use crate::pipeline::InputTarget;
pub(crate) use channel_source::{ChannelSource, ChannelSourceRuntime};
use hashbrown::HashSet;
use simd_json::Mutable;
use std::collections::{btree_map::Entry, BTreeMap};
use std::fmt::Display;
use tokio::task;
Expand All @@ -48,9 +49,6 @@
};
use tremor_script::{ast::DeployEndpoint, prelude::BaseExpr, EventPayload, ValueAndMeta};
use tremor_value::{literal, Value};
use value_trait::Builder;

use super::{CodecReq, Connectivity};

#[derive(Debug)]
/// Messages a Source can receive
Expand Down Expand Up @@ -343,7 +341,7 @@
/// if the source can not be spawned into a own process
pub(crate) fn spawn<S>(self, source: S, ctx: SourceContext) -> SourceAddr
where
S: Source + Send + 'static,
S: Source + Send + Sync + 'static,
{
// We use a unbounded channel for counterflow, while an unbounded channel seems dangerous
// there is soundness to this.
Expand Down Expand Up @@ -855,8 +853,20 @@
}

/// send a signal to all connected pipelines
async fn send_signal(&mut self, signal: Event) -> Result<()> {
for (_url, addr) in self.pipelines_out.iter().chain(self.pipelines_err.iter()) {
async fn send_signal(&self, signal: Event) -> Result<()> {
for (_url, addr) in self
.pipelines_out
.as_slice()
.iter()
.chain(self.pipelines_err.as_slice().iter())
/* */
{
addr.send(Box::new(pipeline::Msg::Signal(signal.clone())))
.await?;

Check warning on line 865 in src/connectors/source.rs

View check run for this annotation

Codecov / codecov/patch

src/connectors/source.rs#L865

Added line #L865 was not covered by tests
}
for (_url, addr) in &self.pipelines_out
/*.chain(self.pipelines_err.iter()) */
{
addr.send(Box::new(pipeline::Msg::Signal(signal.clone())))
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/tests/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tokio::process;
use tremor_common::ports::IN;
use tremor_pipeline::{CbAction, Event, EventId};
use tremor_value::{literal, value::StaticValue};
use value_trait::{Mutable, Value, ValueAccess};
use value_trait::prelude::*;

const IMAGE: &str = "elasticsearch";
const VERSION: &str = "8.6.2";
Expand Down
Loading
Loading