diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 66cdd3cdd..0c688c51a 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -60,9 +60,9 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result Result { let mut stream_exists = false; if STREAM_INFO.stream_exists(stream_name) { @@ -199,6 +201,7 @@ pub async fn create_stream_if_not_exists( "", Arc::new(Schema::empty()), stream_type, + schema_type, ) .await?; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 8ac283cb2..dfdf5b8f6 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -505,6 +505,7 @@ fn remove_id_from_alerts(value: &mut Value) { } } +#[allow(clippy::too_many_arguments)] pub async fn create_stream( stream_name: String, time_partition: &str, @@ -513,6 +514,7 @@ pub async fn create_stream( static_schema_flag: &str, schema: Arc, stream_type: &str, + schema_type: &str, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal.to_string() { @@ -530,6 +532,7 @@ pub async fn create_stream( static_schema_flag, schema.clone(), stream_type, + schema_type, ) .await { @@ -553,6 +556,7 @@ pub async fn create_stream( static_schema_flag.to_string(), static_schema, stream_type, + schema_type, ); } Err(err) => { @@ -602,6 +606,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result Result Result<(), StreamError> { - if let Ok(stream_exists) = - create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await + if let Ok(stream_exists) = create_stream_if_not_exists( + INTERNAL_STREAM_NAME, + &StreamType::Internal.to_string(), + INTERNAL_STREAM_NAME, + ) + .await { if stream_exists { return Ok(()); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index c21f22a10..84f23c3ef 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -31,7 +31,7 @@ use crate::{ }, handlers::{ http::{ingest::PostError, kinesis}, - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR, + LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, }, metadata::STREAM_INFO, storage::StreamType, @@ -41,34 +41,36 @@ use crate::{ pub async fn flatten_and_push_logs( req: HttpRequest, body: Bytes, - stream_name: String, + stream_name: &str, ) -> Result<(), PostError> { let log_source = req .headers() .get(LOG_SOURCE_KEY) .map(|header| header.to_str().unwrap_or_default()) .unwrap_or_default(); - if log_source == LOG_SOURCE_KINESIS { - let json = kinesis::flatten_kinesis_logs(&body); - for record in json.iter() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name.clone(), req.clone(), body.clone()).await?; + match log_source { + LOG_SOURCE_KINESIS => { + let json = kinesis::flatten_kinesis_logs(&body); + for record in json.iter() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name, req.clone(), body.clone()).await?; + } + return Ok(()); } - } else { - push_logs(stream_name, req, body).await?; + LOG_SOURCE_OTEL => { + STREAM_INFO.set_schema_type(stream_name, Some(LOG_SOURCE_OTEL.to_string()))? + } + _ => {} } + push_logs(stream_name, req.clone(), body.clone()).await?; Ok(()) } -pub async fn push_logs( - stream_name: String, - req: HttpRequest, - body: Bytes, -) -> Result<(), PostError> { - let time_partition = STREAM_INFO.get_time_partition(&stream_name)?; - let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?; - let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?; - let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?; +pub async fn push_logs(stream_name: &str, req: HttpRequest, body: Bytes) -> Result<(), PostError> { + let time_partition = STREAM_INFO.get_time_partition(stream_name)?; + let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?; + let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; + let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?; let body_val: Value = serde_json::from_slice(&body)?; let size: usize = body.len(); let mut parsed_timestamp = Utc::now().naive_utc(); @@ -76,7 +78,7 @@ pub async fn push_logs( if custom_partition.is_none() { let size = size as u64; create_process_record_batch( - stream_name.clone(), + stream_name, req.clone(), body_val.clone(), static_schema_flag.clone(), @@ -98,7 +100,7 @@ pub async fn push_logs( let size = value.to_string().into_bytes().len() as u64; create_process_record_batch( - stream_name.clone(), + stream_name, req.clone(), value.clone(), static_schema_flag.clone(), @@ -121,7 +123,7 @@ pub async fn push_logs( parsed_timestamp = get_parsed_timestamp(&value, &time_partition); let size = value.to_string().into_bytes().len() as u64; create_process_record_batch( - stream_name.clone(), + stream_name, req.clone(), value.clone(), static_schema_flag.clone(), @@ -149,7 +151,7 @@ pub async fn push_logs( parsed_timestamp = get_parsed_timestamp(&value, &time_partition); let size = value.to_string().into_bytes().len() as u64; create_process_record_batch( - stream_name.clone(), + stream_name, req.clone(), value.clone(), static_schema_flag.clone(), @@ -167,7 +169,7 @@ pub async fn push_logs( #[allow(clippy::too_many_arguments)] pub async fn create_process_record_batch( - stream_name: String, + stream_name: &str, req: HttpRequest, value: Value, static_schema_flag: Option, @@ -177,7 +179,7 @@ pub async fn create_process_record_batch( origin_size: u64, ) -> Result<(), PostError> { let (rb, is_first_event) = get_stream_schema( - stream_name.clone(), + stream_name, req.clone(), value.clone(), static_schema_flag.clone(), @@ -185,7 +187,7 @@ pub async fn create_process_record_batch( )?; event::Event { rb, - stream_name: stream_name.clone(), + stream_name: stream_name.to_string(), origin_format: "json", origin_size, is_first_event, @@ -201,7 +203,7 @@ pub async fn create_process_record_batch( } pub fn get_stream_schema( - stream_name: String, + stream_name: &str, req: HttpRequest, body: Value, static_schema_flag: Option, @@ -209,8 +211,8 @@ pub fn get_stream_schema( ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let hash_map = STREAM_INFO.read().unwrap(); let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name))? + .get(stream_name) + .ok_or(PostError::StreamNotFound(stream_name.to_string()))? .schema .clone(); into_event_batch(req, body, schema, static_schema_flag, time_partition) diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index caa111e6a..702d4c168 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -26,8 +26,8 @@ use http::StatusCode; use crate::{ handlers::{ http::logstream::error::{CreateStreamError, StreamError}, - CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, - TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, + CUSTOM_PARTITION_KEY, SCHEMA_TYPE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, + TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, }, metadata::{self, STREAM_INFO}, option::{Mode, CONFIG}, @@ -48,6 +48,7 @@ pub async fn create_update_stream( static_schema_flag, update_stream_flag, stream_type, + schema_type, ) = fetch_headers_from_put_stream_request(req); if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" { @@ -113,6 +114,7 @@ pub async fn create_update_stream( &static_schema_flag, schema, &stream_type, + &schema_type, ) .await?; @@ -167,13 +169,14 @@ async fn validate_and_update_custom_partition( pub fn fetch_headers_from_put_stream_request( req: &HttpRequest, -) -> (String, String, String, String, String, String) { +) -> (String, String, String, String, String, String, String) { let mut time_partition = String::default(); let mut time_partition_limit = String::default(); let mut custom_partition = String::default(); let mut static_schema_flag = String::default(); let mut update_stream = String::default(); let mut stream_type = StreamType::UserDefined.to_string(); + let mut schema_type = String::default(); req.headers().iter().for_each(|(key, value)| { if key == TIME_PARTITION_KEY { time_partition = value.to_str().unwrap().to_string(); @@ -193,6 +196,9 @@ pub fn fetch_headers_from_put_stream_request( if key == STREAM_TYPE_KEY { stream_type = value.to_str().unwrap().to_string(); } + if key == SCHEMA_TYPE_KEY { + schema_type = value.to_str().unwrap().to_string(); + } }); ( @@ -202,6 +208,7 @@ pub fn fetch_headers_from_put_stream_request( static_schema_flag, update_stream, stream_type, + schema_type, ) } @@ -378,6 +385,8 @@ pub async fn update_custom_partition_in_stream( Ok(()) } +#[allow(clippy::too_many_arguments)] + pub async fn create_stream( stream_name: String, time_partition: &str, @@ -386,6 +395,7 @@ pub async fn create_stream( static_schema_flag: &str, schema: Arc, stream_type: &str, + schema_type: &str, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal.to_string() { @@ -403,6 +413,7 @@ pub async fn create_stream( static_schema_flag, schema.clone(), stream_type, + schema_type, ) .await { @@ -426,6 +437,7 @@ pub async fn create_stream( static_schema_flag.to_string(), static_schema, stream_type, + schema_type, ); } Err(err) => { @@ -475,7 +487,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or(""); let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); - + let schema_type = stream_metadata.schema_type.as_deref().unwrap_or(""); metadata::STREAM_INFO.add_stream( stream_name.to_string(), stream_metadata.created_at, @@ -485,6 +497,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< static_schema_flag.to_string(), static_schema, stream_type, + schema_type, ); } else { return Ok(false); diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 4a4259354..14ece9918 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -35,6 +35,7 @@ const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; const STREAM_TYPE_KEY: &str = "x-p-stream-type"; +const SCHEMA_TYPE_KEY: &str = "x-p-schema-type"; const OIDC_SCOPE: &str = "openid profile email"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; @@ -47,6 +48,6 @@ const TRINO_USER: &str = "x-trino-user"; // constants for log Source values for known sources and formats const LOG_SOURCE_KINESIS: &str = "kinesis"; - +const LOG_SOURCE_OTEL: &str = "otel"; // AWS Kinesis constants const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; diff --git a/src/metadata.rs b/src/metadata.rs index f768a4e88..b71e479b2 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -58,6 +58,7 @@ pub struct LogStreamMetadata { pub static_schema_flag: Option, pub hot_tier_enabled: Option, pub stream_type: Option, + pub schema_type: Option, } // It is very unlikely that panic will occur when dealing with metadata. @@ -137,6 +138,14 @@ impl StreamInfo { .map(|metadata| metadata.static_schema_flag.clone()) } + #[allow(dead_code)] + pub fn get_schema_type(&self, stream_name: &str) -> Result, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.schema_type.clone()) + } + pub fn get_retention(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) @@ -199,6 +208,20 @@ impl StreamInfo { }) } + #[allow(dead_code)] + pub fn set_schema_type( + &self, + stream_name: &str, + schema_type: Option, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.schema_type = schema_type; + }) + } + pub fn update_time_partition_limit( &self, stream_name: &str, @@ -265,40 +288,24 @@ impl StreamInfo { static_schema_flag: String, static_schema: HashMap>, stream_type: &str, + schema_type: &str, ) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { - created_at: if created_at.is_empty() { - Local::now().to_rfc3339() - } else { - created_at - }, - time_partition: if time_partition.is_empty() { - None - } else { - Some(time_partition) - }, - time_partition_limit: if time_partition_limit.is_empty() { - None - } else { - Some(time_partition_limit) - }, - custom_partition: if custom_partition.is_empty() { - None - } else { - Some(custom_partition) - }, - static_schema_flag: if static_schema_flag != "true" { - None - } else { - Some(static_schema_flag) - }, - schema: if static_schema.is_empty() { - HashMap::new() - } else { - static_schema - }, + created_at: created_at + .is_empty() + .then(|| Local::now().to_rfc3339()) + .unwrap_or(created_at), + time_partition: (!time_partition.is_empty()).then_some(time_partition), + time_partition_limit: (!time_partition_limit.is_empty()) + .then_some(time_partition_limit), + custom_partition: (!custom_partition.is_empty()).then_some(custom_partition), + static_schema_flag: (static_schema_flag == "true").then_some(static_schema_flag), + schema: (!static_schema.is_empty()) + .then_some(static_schema) + .unwrap_or_default(), stream_type: Some(stream_type.to_string()), + schema_type: (!schema_type.is_empty()).then(|| schema_type.to_string()), ..Default::default() }; map.insert(stream_name, metadata); @@ -340,6 +347,7 @@ impl StreamInfo { static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, stream_type: meta.stream_type, + schema_type: meta.schema_type, }; let mut map = self.write().expect(LOCK_EXPECT); @@ -493,6 +501,7 @@ pub async fn load_stream_metadata_on_server_start( static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, stream_type: meta.stream_type, + schema_type: meta.schema_type, }; let mut map = STREAM_INFO.write().expect(LOCK_EXPECT); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a018c2b1c..e1dc5b042 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -104,7 +104,10 @@ pub struct ObjectStoreFormat { pub static_schema_flag: Option, #[serde(skip_serializing_if = "Option::is_none")] pub hot_tier_enabled: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub stream_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_type: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -124,7 +127,10 @@ pub struct StreamInfo { pub custom_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub stream_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_type: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] @@ -191,6 +197,7 @@ impl Default for ObjectStoreFormat { custom_partition: None, static_schema_flag: None, hot_tier_enabled: None, + schema_type: None, } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 78f51685d..8c0aea289 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -145,6 +145,7 @@ pub trait ObjectStorage: Send + Sync + 'static { static_schema_flag: &str, schema: Arc, stream_type: &str, + schema_type: &str, ) -> Result { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); @@ -172,6 +173,11 @@ pub trait ObjectStorage: Send + Sync + 'static { } else { format.static_schema_flag = Some(static_schema_flag.to_string()); } + if schema_type.is_empty() { + format.schema_type = None; + } else { + format.schema_type = Some(schema_type.to_string()); + } let format_json = to_bytes(&format); self.put_object(&schema_path(stream_name), to_bytes(&schema)) .await?;