Skip to content

Commit

Permalink
refactor: use StreamConfig in Serializer (#296)
Browse files Browse the repository at this point in the history
* refactor: use `StreamConfig` in `Serializer`

* test: fix stream config

* refactor: rm compression from `Stream`/`Buffer`

* style: `stream_name`/`stream_config`

* refactor: `new` replaces `with_config`
  • Loading branch information
Devdutt Shenoi authored Dec 5, 2023
1 parent c1affbc commit f1dbca6
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 155 deletions.
22 changes: 10 additions & 12 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ mod metrics;
pub(crate) mod stream;
mod streams;

use crate::{Action, ActionResponse, ActionRoute, Config};

use self::actions_lane::Error;
pub use self::{
actions_lane::{ActionsBridge, ActionsBridgeTx},
data_lane::{DataBridge, DataBridgeTx},
};

use super::Compression;
pub use actions_lane::ActionsBridgeTx;
use actions_lane::{ActionsBridge, Error};
use data_lane::DataBridge;
pub use data_lane::DataBridgeTx;

use super::StreamConfig;
use crate::base::ActionRoute;
use crate::{Action, ActionResponse, Config};
pub use metrics::StreamMetrics;

pub trait Point: Send + Debug + Serialize + 'static {
Expand All @@ -30,8 +29,8 @@ pub trait Point: Send + Debug + Serialize + 'static {
}

pub trait Package: Send + Debug {
fn topic(&self) -> Arc<String>;
fn stream(&self) -> Arc<String>;
fn stream_config(&self) -> Arc<StreamConfig>;
fn stream_name(&self) -> Arc<String>;
// TODO: Implement a generic Return type that can wrap
// around custom serialization error types.
fn serialize(&self) -> serde_json::Result<Vec<u8>>;
Expand All @@ -41,7 +40,6 @@ pub trait Package: Send + Debug {
fn is_empty(&self) -> bool {
self.len() == 0
}
fn compression(&self) -> Compression;
}

// TODO Don't do any deserialization on payload. Read it as a Vec<u8>, which is in turn a json
Expand Down
108 changes: 34 additions & 74 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use flume::{SendError, Sender};
use log::{debug, trace};
use serde::Serialize;

use crate::base::{Compression, StreamConfig, DEFAULT_TIMEOUT};
use crate::base::StreamConfig;

use super::{Package, Point, StreamMetrics};

Expand All @@ -27,15 +27,12 @@ pub const MAX_BUFFER_SIZE: usize = 100;
#[derive(Debug)]
pub struct Stream<T> {
pub name: Arc<String>,
pub max_buffer_size: usize,
pub flush_period: Duration,
topic: Arc<String>,
pub config: Arc<StreamConfig>,
last_sequence: u32,
last_timestamp: u64,
buffer: Buffer<T>,
tx: Sender<Box<dyn Package>>,
pub metrics: StreamMetrics,
compression: Compression,
}

impl<T> Stream<T>
Expand All @@ -44,50 +41,25 @@ where
Buffer<T>: Package,
{
pub fn new(
stream: impl Into<String>,
topic: impl Into<String>,
max_buffer_size: usize,
stream_name: impl Into<String>,
stream_config: StreamConfig,
tx: Sender<Box<dyn Package>>,
compression: Compression,
) -> Stream<T> {
let name = Arc::new(stream.into());
let topic = Arc::new(topic.into());
let buffer = Buffer::new(name.clone(), topic.clone(), compression);
let flush_period = Duration::from_secs(DEFAULT_TIMEOUT);
let metrics = StreamMetrics::new(&name, max_buffer_size);
let name = Arc::new(stream_name.into());
let config = Arc::new(stream_config);
let buffer = Buffer::new(name.clone(), config.clone());
let metrics = StreamMetrics::new(&name, config.buf_size);

Stream {
name,
max_buffer_size,
flush_period,
topic,
last_sequence: 0,
last_timestamp: 0,
buffer,
tx,
metrics,
compression,
}
}

pub fn with_config(
name: &str,
config: &StreamConfig,
tx: Sender<Box<dyn Package>>,
) -> Stream<T> {
let mut stream = Stream::new(name, &config.topic, config.buf_size, tx, config.compression);
stream.flush_period = config.flush_period;
stream
Stream { name, config, last_sequence: 0, last_timestamp: 0, buffer, tx, metrics }
}

pub fn dynamic(
stream: impl Into<String>,
stream_name: impl Into<String>,
project_id: impl Into<String>,
device_id: impl Into<String>,
max_buffer_size: usize,
tx: Sender<Box<dyn Package>>,
) -> Stream<T> {
let stream = stream.into();
let stream_name = stream_name.into();
let project_id = project_id.into();
let device_id = device_id.into();

Expand All @@ -96,10 +68,11 @@ where
+ "/devices/"
+ &device_id
+ "/events/"
+ &stream
+ &stream_name
+ "/jsonarray";
let config = StreamConfig { topic, ..Default::default() };

Stream::new(stream, topic, max_buffer_size, tx, Compression::Disabled)
Stream::new(stream_name, config, tx)
}

fn add(&mut self, data: T) -> Result<Option<Buffer<T>>, Error> {
Expand Down Expand Up @@ -127,7 +100,7 @@ where
self.last_timestamp = current_timestamp;

// if max_buffer_size is breached, flush
let buf = if self.buffer.buffer.len() >= self.max_buffer_size {
let buf = if self.buffer.buffer.len() >= self.config.buf_size {
self.metrics.add_batch();
Some(self.take_buffer())
} else {
Expand All @@ -140,10 +113,10 @@ where
// Returns buffer content, replacing with empty buffer in-place
fn take_buffer(&mut self) -> Buffer<T> {
let name = self.name.clone();
let topic = self.topic.clone();
trace!("Flushing stream name: {}, topic: {}", name, topic);
let config = self.config.clone();
trace!("Flushing stream name: {}, topic: {}", name, config.topic);

mem::replace(&mut self.buffer, Buffer::new(name, topic, self.compression))
mem::replace(&mut self.buffer, Buffer::new(name, config))
}

/// Triggers flush and async channel send if not empty
Expand Down Expand Up @@ -175,7 +148,7 @@ where
}

let status = match self.len() {
1 => StreamStatus::Init(self.flush_period),
1 => StreamStatus::Init(self.config.flush_period),
len => StreamStatus::Partial(len),
};

Expand All @@ -192,7 +165,7 @@ where
}

let status = match self.len() {
1 => StreamStatus::Init(self.flush_period),
1 => StreamStatus::Init(self.config.flush_period),
len => StreamStatus::Partial(len),
};

Expand All @@ -210,23 +183,21 @@ where
/// Buffer doesn't put any restriction on type of `T`
#[derive(Debug)]
pub struct Buffer<T> {
pub stream: Arc<String>,
pub topic: Arc<String>,
pub stream_name: Arc<String>,
pub stream_config: Arc<StreamConfig>,
pub buffer: Vec<T>,
pub anomalies: String,
pub anomaly_count: usize,
pub compression: Compression,
}

impl<T> Buffer<T> {
pub fn new(stream: Arc<String>, topic: Arc<String>, compression: Compression) -> Buffer<T> {
pub fn new(stream_name: Arc<String>, stream_config: Arc<StreamConfig>) -> Buffer<T> {
Buffer {
stream,
topic,
buffer: vec![],
buffer: Vec::with_capacity(stream_config.buf_size),
stream_name,
stream_config,
anomalies: String::with_capacity(100),
anomaly_count: 0,
compression,
}
}

Expand All @@ -236,7 +207,7 @@ impl<T> Buffer<T> {
return;
}

let error = String::from(self.stream.as_ref())
let error = self.stream_name.to_string()
+ ".sequence: "
+ &last.to_string()
+ ", "
Expand Down Expand Up @@ -268,12 +239,12 @@ where
T: Point,
Vec<T>: Serialize,
{
fn topic(&self) -> Arc<String> {
self.topic.clone()
fn stream_config(&self) -> Arc<StreamConfig> {
self.stream_config.clone()
}

fn stream(&self) -> Arc<String> {
self.stream.clone()
fn stream_name(&self) -> Arc<String> {
self.stream_name.clone()
}

fn serialize(&self) -> serde_json::Result<Vec<u8>> {
Expand All @@ -291,29 +262,18 @@ where
fn latency(&self) -> u64 {
0
}

fn compression(&self) -> Compression {
self.compression
}
}

impl<T> Clone for Stream<T> {
fn clone(&self) -> Self {
Stream {
name: self.name.clone(),
flush_period: self.flush_period,
max_buffer_size: self.max_buffer_size,
topic: self.topic.clone(),
config: self.config.clone(),
last_sequence: 0,
last_timestamp: 0,
buffer: Buffer::new(
self.buffer.stream.clone(),
self.buffer.topic.clone(),
self.compression,
),
metrics: StreamMetrics::new(&self.name, self.max_buffer_size),
buffer: Buffer::new(self.buffer.stream_name.clone(), self.buffer.stream_config.clone()),
metrics: StreamMetrics::new(&self.name, self.config.buf_size),
tx: self.tx.clone(),
compression: self.compression,
}
}
}
7 changes: 3 additions & 4 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use flume::Sender;
use log::{error, info, trace};

use super::stream::{self, StreamStatus, MAX_BUFFER_SIZE};
use super::stream::{self, StreamStatus};
use super::{Point, StreamMetrics};
use crate::base::StreamConfig;
use crate::{Config, Package, Stream};
Expand All @@ -30,7 +30,7 @@ impl<T: Point> Streams<T> {

pub fn config_streams(&mut self, streams_config: HashMap<String, StreamConfig>) {
for (name, stream) in streams_config {
let stream = Stream::with_config(&name, &stream, self.data_tx.clone());
let stream = Stream::new(&name, stream, self.data_tx.clone());
self.map.insert(name.to_owned(), stream);
}
}
Expand All @@ -50,15 +50,14 @@ impl<T: Point> Streams<T> {
&stream_name,
&self.config.project_id,
&self.config.device_id,
MAX_BUFFER_SIZE,
self.data_tx.clone(),
);

self.map.entry(stream_name.to_owned()).or_insert(stream)
}
};

let max_stream_size = stream.max_buffer_size;
let max_stream_size = stream.config.buf_size;
let state = match stream.fill(data).await {
Ok(s) => s,
Err(e) => {
Expand Down
19 changes: 16 additions & 3 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::env::current_dir;
use std::hash::Hash;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, fmt::Debug};
Expand Down Expand Up @@ -59,15 +60,15 @@ pub fn clock() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}

#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, Hash, PartialEq, Eq)]
pub enum Compression {
#[default]
Disabled,
Lz4,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, Eq)]
pub struct StreamConfig {
pub topic: String,
#[serde(default = "max_buf_size")]
Expand Down Expand Up @@ -95,7 +96,19 @@ impl Default for StreamConfig {
}
}

#[derive(Debug, Clone, Deserialize)]
impl Hash for StreamConfig {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.topic.hash(state)
}
}

impl PartialEq for StreamConfig {
fn eq(&self, other: &Self) -> bool {
self.topic == other.topic
}
}

#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
pub struct Persistence {
#[serde(default = "default_file_size")]
pub max_file_size: usize,
Expand Down
Loading

0 comments on commit f1dbca6

Please sign in to comment.