diff --git a/src/event/detect_schema.rs b/src/event/detect_schema.rs new file mode 100644 index 00000000..e8e0ff73 --- /dev/null +++ b/src/event/detect_schema.rs @@ -0,0 +1,75 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use arrow_json::reader::infer_json_schema_from_iterator; +use arrow_schema::Schema; +use once_cell::sync::OnceCell; +use std::collections::HashMap; + +use crate::{event::format::update_data_type_to_datetime, utils::json::flatten_json_body}; + +// Expose some static variables for internal usage +pub static KNOWN_SCHEMA_LIST: OnceCell> = OnceCell::new(); + +pub fn detect_schema() -> HashMap { + let mut known_schema_list: HashMap = HashMap::new(); + //read file formats.json + let formats_file = std::fs::File::open("src/event/known-formats/formats.json").unwrap(); + let formats_reader = std::io::BufReader::new(formats_file); + let formats: serde_json::Value = serde_json::from_reader(formats_reader).unwrap(); + //iterate over the formats + for format in formats.as_array().unwrap() { + let schema_type = format["schema_type"].as_str().unwrap(); + let sample_json_path = format["sample_json_path"].as_str().unwrap(); + let sample_file = std::fs::File::open(sample_json_path).unwrap(); + let sample_reader = std::io::BufReader::new(sample_file); + let sample_json: serde_json::Value = 
serde_json::from_reader(sample_reader).unwrap(); + let flattened_json = flatten_json_body(sample_json, None, None, None, false).unwrap(); + let sample_json_records = [flattened_json.clone()]; + let mut schema = + infer_json_schema_from_iterator(sample_json_records.iter().map(Ok)).unwrap(); + schema = update_data_type_to_datetime(schema, flattened_json, Vec::new()); + known_schema_list.insert(schema_type.to_string(), schema); + } + prepare_known_schema_list(known_schema_list.clone()); + known_schema_list +} + +pub fn prepare_known_schema_list(known_schema_list: HashMap) { + KNOWN_SCHEMA_LIST + .set(known_schema_list) + .expect("only set once") +} + +pub fn get_known_schema_list() -> &'static HashMap { + KNOWN_SCHEMA_LIST + .get() + .expect("fetch schema list from static variable") +} + +pub fn validate_schema_type(schema: &Schema) -> String { + let known_schema_list = get_known_schema_list(); + let mut schema_type = String::default(); + for (known_schema_type, known_schema) in known_schema_list.iter() { + if known_schema == schema { + schema_type = known_schema_type.to_string(); + break; + } + } + schema_type +} diff --git a/src/event/known-formats/formats.json b/src/event/known-formats/formats.json new file mode 100644 index 00000000..681b95e0 --- /dev/null +++ b/src/event/known-formats/formats.json @@ -0,0 +1,7 @@ +[ + { + "name": "kubernetes", + "schema_type": "kubernetes-events", + "sample_json_path": "src/event/known-formats/kubernetes-events-sample/kubernetes-events-sample.json" + } +] \ No newline at end of file diff --git a/src/event/known-formats/kubernetes-events-sample/kubernetes-events-sample.json b/src/event/known-formats/kubernetes-events-sample/kubernetes-events-sample.json new file mode 100644 index 00000000..cd85c2a1 --- /dev/null +++ b/src/event/known-formats/kubernetes-events-sample/kubernetes-events-sample.json @@ -0,0 +1,236 @@ +{ + "apiVersion": "v1", + "items": [ + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": 
"2024-11-08T10:17:18Z", + "involvedObject": { + "apiVersion": "v1", + "fieldPath": "spec.containers{vantage-kubernetes-agent}", + "kind": "Pod", + "name": "vka-vantage-kubernetes-agent-0", + "namespace": "vantage", + "resourceVersion": "15629581", + "uid": "3fa579b0-0c6f-4f44-a320-69389c8f607a" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:18Z", + "message": "Stopping container vantage-kubernetes-agent", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:18Z", + "name": "vka-vantage-kubernetes-agent-0.1805f6d7de4bc710", + "namespace": "vantage", + "resourceVersion": "25741805", + "uid": "629a5864-06de-414d-8ad7-b7637b8cbfa0" + }, + "reason": "Killing", + "reportingComponent": "kubelet", + "reportingInstance": "ip-10-0-2-170.ec2.internal", + "source": { + "component": "kubelet", + "host": "ip-10-0-2-170.ec2.internal" + }, + "type": "Normal" + }, + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": "2024-11-08T10:17:19Z", + "involvedObject": { + "apiVersion": "v1", + "kind": "Pod", + "name": "vka-vantage-kubernetes-agent-0", + "namespace": "vantage", + "resourceVersion": "25741822", + "uid": "0118c8be-55df-40bf-96ed-41bb11b5a771" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:19Z", + "message": "Successfully assigned vantage/vka-vantage-kubernetes-agent-0 to ip-10-0-2-170.ec2.internal", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:19Z", + "name": "vka-vantage-kubernetes-agent-0.1805f6d80c652af1", + "namespace": "vantage", + "resourceVersion": "25741826", + "uid": "e1dab7eb-ab65-44be-9b75-2f400cd70275" + }, + "reason": "Scheduled", + "reportingComponent": "default-scheduler", + "reportingInstance": "", + "source": { + "component": "default-scheduler" + }, + "type": "Normal" + }, + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": "2024-11-08T10:17:22Z", + "involvedObject": { + "apiVersion": "v1", + "fieldPath": "spec.containers{vantage-kubernetes-agent}", + "kind": 
"Pod", + "name": "vka-vantage-kubernetes-agent-0", + "namespace": "vantage", + "resourceVersion": "25741823", + "uid": "0118c8be-55df-40bf-96ed-41bb11b5a771" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:22Z", + "message": "Container image \"quay.io/vantage-sh/kubernetes-agent:1.0.26\" already present on machine", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:22Z", + "name": "vka-vantage-kubernetes-agent-0.1805f6d8d0c1d741", + "namespace": "vantage", + "resourceVersion": "25741846", + "uid": "6c9c24bb-4ff3-486f-8151-91d1dad159ee" + }, + "reason": "Pulled", + "reportingComponent": "kubelet", + "reportingInstance": "ip-10-0-2-170.ec2.internal", + "source": { + "component": "kubelet", + "host": "ip-10-0-2-170.ec2.internal" + }, + "type": "Normal" + }, + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": "2024-11-08T10:17:22Z", + "involvedObject": { + "apiVersion": "v1", + "fieldPath": "spec.containers{vantage-kubernetes-agent}", + "kind": "Pod", + "name": "vka-vantage-kubernetes-agent-0", + "namespace": "vantage", + "resourceVersion": "25741823", + "uid": "0118c8be-55df-40bf-96ed-41bb11b5a771" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:22Z", + "message": "Created container vantage-kubernetes-agent", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:22Z", + "name": "vka-vantage-kubernetes-agent-0.1805f6d8d271c600", + "namespace": "vantage", + "resourceVersion": "25741847", + "uid": "d23e308a-b17e-42ba-a5ed-3a55c3d9e0d2" + }, + "reason": "Created", + "reportingComponent": "kubelet", + "reportingInstance": "ip-10-0-2-170.ec2.internal", + "source": { + "component": "kubelet", + "host": "ip-10-0-2-170.ec2.internal" + }, + "type": "Normal" + }, + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": "2024-11-08T10:17:22Z", + "involvedObject": { + "apiVersion": "v1", + "fieldPath": "spec.containers{vantage-kubernetes-agent}", + "kind": "Pod", + "name": 
"vka-vantage-kubernetes-agent-0", + "namespace": "vantage", + "resourceVersion": "25741823", + "uid": "0118c8be-55df-40bf-96ed-41bb11b5a771" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:22Z", + "message": "Started container vantage-kubernetes-agent", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:23Z", + "name": "vka-vantage-kubernetes-agent-0.1805f6d8d87a3795", + "namespace": "vantage", + "resourceVersion": "25741848", + "uid": "e48c06da-3fbf-41a1-8685-6224854f0391" + }, + "reason": "Started", + "reportingComponent": "kubelet", + "reportingInstance": "ip-10-0-2-170.ec2.internal", + "source": { + "component": "kubelet", + "host": "ip-10-0-2-170.ec2.internal" + }, + "type": "Normal" + }, + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": "2024-11-08T10:17:23Z", + "involvedObject": { + "apiVersion": "v1", + "fieldPath": "spec.containers{vantage-kubernetes-agent}", + "kind": "Pod", + "name": "vka-vantage-kubernetes-agent-0", + "namespace": "vantage", + "resourceVersion": "25741823", + "uid": "0118c8be-55df-40bf-96ed-41bb11b5a771" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:23Z", + "message": "Readiness probe failed: Get \"http://10.0.2.143:9010/healthz\": dial tcp 10.0.2.143:9010: connect: connection refused", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:23Z", + "name": "vka-vantage-kubernetes-agent-0.1805f6d8f61959d7", + "namespace": "vantage", + "resourceVersion": "25741851", + "uid": "6199c62b-9ca5-4c46-abcb-53137ed24c47" + }, + "reason": "Unhealthy", + "reportingComponent": "kubelet", + "reportingInstance": "ip-10-0-2-170.ec2.internal", + "source": { + "component": "kubelet", + "host": "ip-10-0-2-170.ec2.internal" + }, + "type": "Warning" + }, + { + "apiVersion": "v1", + "count": 1, + "eventTime": null, + "firstTimestamp": "2024-11-08T10:17:19Z", + "involvedObject": { + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "name": "vka-vantage-kubernetes-agent", + "namespace": 
"vantage", + "resourceVersion": "25741814", + "uid": "3f91d728-f31f-4582-8639-df259d97ac55" + }, + "kind": "Event", + "lastTimestamp": "2024-11-08T10:17:19Z", + "message": "create Pod vka-vantage-kubernetes-agent-0 in StatefulSet vka-vantage-kubernetes-agent successful", + "metadata": { + "creationTimestamp": "2024-11-08T10:17:19Z", + "name": "vka-vantage-kubernetes-agent.1805f6d80bd97994", + "namespace": "vantage", + "resourceVersion": "25741827", + "uid": "c5bf4dee-649f-48ba-b6da-c6ccf4e9262c" + }, + "reason": "SuccessfulCreate", + "reportingComponent": "statefulset-controller", + "reportingInstance": "", + "source": { + "component": "statefulset-controller" + }, + "type": "Normal" + } + ], + "kind": "List", + "metadata": { + "resourceVersion": "" + } +} \ No newline at end of file diff --git a/src/event/kubernetes_events.rs b/src/event/kubernetes_events.rs new file mode 100644 index 00000000..67bdad12 --- /dev/null +++ b/src/event/kubernetes_events.rs @@ -0,0 +1,413 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ *
+ */
+
+use std::collections::BTreeMap;
+
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+/// A Kubernetes `kind: List` payload of `Event` objects, as produced by
+/// `kubectl get events -o json`. Every field is optional so partial
+/// payloads still deserialize.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KubernetesEvents {
+    api_version: Option<String>,
+    items: Vec<Items>,
+    kind: Option<String>,
+    metadata: Option<Metadata>,
+}
+
+/// A single Kubernetes `Event` entry from the list.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Items {
+    api_version: Option<String>,
+    count: Option<i64>,
+    event_time: Option<String>,
+    first_timestamp: Option<String>,
+    involved_object: Option<InvolvedObject>,
+    kind: Option<String>,
+    last_timestamp: Option<String>,
+    message: Option<String>,
+    metadata: Option<Metadata>,
+    reason: Option<String>,
+    reporting_component: Option<String>,
+    // Fixed from the misspelled `reporting_nstance`: with camelCase renaming,
+    // serde looked for the non-existent key `reportingNstance`, so the real
+    // `reportingInstance` value was silently dropped on every event.
+    reporting_instance: Option<String>,
+    source: Option<Source>,
+    // `type` is a Rust keyword; map the JSON key explicitly.
+    #[serde(rename = "type")]
+    type_: Option<String>,
+}
+
+/// The object the event refers to (`involvedObject`).
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InvolvedObject {
+    api_version: Option<String>,
+    field_path: Option<String>,
+    kind: Option<String>,
+    name: Option<String>,
+    namespace: Option<String>,
+    resource_version: Option<String>,
+    uid: Option<String>,
+}
+
+/// Object metadata, used both at list level and per event.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Metadata {
+    creation_timestamp: Option<String>,
+    name: Option<String>,
+    namespace: Option<String>,
+    resource_version: Option<String>,
+    uid: Option<String>,
+}
+
+/// The component that reported the event.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Source {
+    component: Option<String>,
+    host: Option<String>,
+}
+
+/// Flattens a Kubernetes event list into one flat JSON object per event,
+/// with nested field names joined by `_` (e.g. `items_involved_object_name`).
+/// List-level fields (`api_version`, `kind`, `metadata_resource_version`)
+/// are repeated on every flattened event.
+///
+/// # Errors
+/// Returns an error when `body` is not valid UTF-8 or does not parse as a
+/// Kubernetes event list.
+pub async fn flatten_kubernetes_events_log(body: &Bytes) -> Result<Vec<Value>, anyhow::Error> {
+    // Propagate invalid UTF-8 instead of panicking mid-ingest.
+    let body_str = std::str::from_utf8(body)?;
+    let message: KubernetesEvents = serde_json::from_str(body_str)
+        .map_err(|_| anyhow::anyhow!("Could not parse the kubernetes events log"))?;
+
+    let mut vec_kubernetes_events_json: Vec<BTreeMap<String, Value>> =
+        Vec::with_capacity(message.items.len());
+    for record in message.items.iter() {
+        let mut kubernetes_events_json: BTreeMap<String, Value> = BTreeMap::new();
+
+        // List-level fields, repeated for every event.
+        insert_opt_string(&mut kubernetes_events_json, "api_version", &message.api_version);
+        insert_opt_string(&mut kubernetes_events_json, "kind", &message.kind);
+        if let Some(metadata) = &message.metadata {
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "metadata_resource_version",
+                &metadata.resource_version,
+            );
+        }
+
+        insert_opt_string(&mut kubernetes_events_json, "items_api_version", &record.api_version);
+        if let Some(count) = record.count {
+            kubernetes_events_json.insert(
+                "items_count".to_owned(),
+                Value::Number(serde_json::Number::from(count)),
+            );
+        }
+        insert_opt_string(&mut kubernetes_events_json, "items_event_time", &record.event_time);
+        insert_opt_string(
+            &mut kubernetes_events_json,
+            "items_first_timestamp",
+            &record.first_timestamp,
+        );
+
+        if let Some(obj) = &record.involved_object {
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_involved_object_api_version",
+                &obj.api_version,
+            );
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_involved_object_field_path",
+                &obj.field_path,
+            );
+            insert_opt_string(&mut kubernetes_events_json, "items_involved_object_kind", &obj.kind);
+            insert_opt_string(&mut kubernetes_events_json, "items_involved_object_name", &obj.name);
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_involved_object_namespace",
+                &obj.namespace,
+            );
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_involved_object_resource_version",
+                &obj.resource_version,
+            );
+            insert_opt_string(&mut kubernetes_events_json, "items_involved_object_uid", &obj.uid);
+        }
+
+        insert_opt_string(&mut kubernetes_events_json, "items_kind", &record.kind);
+        insert_opt_string(
+            &mut kubernetes_events_json,
+            "items_last_timestamp",
+            &record.last_timestamp,
+        );
+        insert_opt_string(&mut kubernetes_events_json, "items_message", &record.message);
+
+        if let Some(metadata) = &record.metadata {
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_metadata_creation_timestamp",
+                &metadata.creation_timestamp,
+            );
+            insert_opt_string(&mut kubernetes_events_json, "items_metadata_name", &metadata.name);
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_metadata_namespace",
+                &metadata.namespace,
+            );
+            insert_opt_string(
+                &mut kubernetes_events_json,
+                "items_metadata_resource_version",
+                &metadata.resource_version,
+            );
+            insert_opt_string(&mut kubernetes_events_json, "items_metadata_uid", &metadata.uid);
+        }
+
+        insert_opt_string(&mut kubernetes_events_json, "items_reason", &record.reason);
+        insert_opt_string(
+            &mut kubernetes_events_json,
+            "items_reporting_component",
+            &record.reporting_component,
+        );
+        insert_opt_string(
+            &mut kubernetes_events_json,
+            "items_reporting_instance",
+            &record.reporting_instance,
+        );
+
+        if let Some(source) = &record.source {
+            insert_opt_string(&mut kubernetes_events_json, "items_source_component", &source.component);
+            insert_opt_string(&mut kubernetes_events_json, "items_source_host", &source.host);
+        }
+
+        insert_opt_string(&mut kubernetes_events_json, "items_type", &record.type_);
+
+        vec_kubernetes_events_json.push(kubernetes_events_json);
+    }
+
+    Ok(vec_kubernetes_events_json
+        .into_iter()
+        .map(|btree_map| Value::Object(btree_map.into_iter().collect()))
+        .collect())
+}
+
+/// Inserts `value` under `key` when present; absent fields are skipped so
+/// they do not appear as nulls in the flattened event.
+fn insert_opt_string(map: &mut BTreeMap<String, Value>, key: &str, value: &Option<String>) {
+    if let Some(value) = value {
+        map.insert(key.to_owned(), Value::String(value.clone()));
+    }
+}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index eda5dd88..93211030 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -17,7 +17,9 @@
  *
  */
 
+pub mod detect_schema;
 pub mod format;
+pub mod kubernetes_events;
 mod writer;
 
 use arrow_array::RecordBatch;
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index d15f6660..4f69df22 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -17,10 +17,13 @@
  */
 
 use super::logstream::error::{CreateStreamError, StreamError};
+use super::logstream::fetch_schema_from_event;
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
 use super::otel;
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
+use crate::event::detect_schema::validate_schema_type;
+use crate::event::kubernetes_events::flatten_kubernetes_events_log;
 use crate::event::{
     self,
     error::EventError,
@@ -32,6 +35,7 @@ use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::STREAM_INFO;
 use crate::option::{Mode, CONFIG};
+use crate::static_schema::{add_parseable_fields_to_static_schema, ParsedSchema};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use actix_web::{http::header::ContentType, HttpRequest,
HttpResponse}; @@ -61,7 +65,8 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result { let mut stream_exists = false; if STREAM_INFO.stream_exists(stream_name) { @@ -201,6 +208,27 @@ pub async fn create_stream_if_not_exists( return Ok(stream_exists); } + let mut static_schema_flag = "false"; + + let (schema_type, mut schema) = if *body != Bytes::default() { + check_known_schema(body.clone())? + } else { + Ok::<(String, Schema), StreamError>((String::default(), Schema::empty()))? + }; + + let mut parseable_schema = Arc::new(Schema::empty()); + if schema_type != String::default() { + static_schema_flag = "true"; + if schema_type == *"kubernetes-events" { + let flattened_logs = flatten_kubernetes_events_log(body).await.unwrap(); + let kubernetes_json = serde_json::to_vec(&flattened_logs)?; + let body = Bytes::from(kubernetes_json); + schema = fetch_schema_from_event(body)?; + } + let parsed_schema = ParsedSchema::from(schema.clone()); + parseable_schema = add_parseable_fields_to_static_schema(parsed_schema)?; + } + // For distributed deployments, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -215,15 +243,25 @@ pub async fn create_stream_if_not_exists( "", "", "", - "", - Arc::new(Schema::empty()), + static_schema_flag, + parseable_schema, stream_type, + &schema_type, ) .await?; Ok(stream_exists) } +pub fn check_known_schema(body: Bytes) -> Result<(String, Schema), StreamError> { + let schema = fetch_schema_from_event(body)?; + let known_schema_type = validate_schema_type(&schema); + if !known_schema_type.is_empty() { + return Ok((known_schema_type, schema)); + } + Ok((String::default(), Schema::empty())) +} + #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("Stream {0} not found")] diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 8ac283cb..5ae10f65 100644 --- a/src/handlers/http/logstream.rs +++ 
b/src/handlers/http/logstream.rs @@ -26,7 +26,7 @@ use super::modal::utils::logstream_utils::{ use super::query::update_schema_when_distributed; use crate::alerts::Alerts; use crate::event::format::update_data_type_to_datetime; -use crate::handlers::STREAM_TYPE_KEY; +use crate::handlers::{SCHEMA_TYPE_KEY, STREAM_TYPE_KEY}; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; @@ -34,12 +34,13 @@ use crate::option::{Mode, CONFIG}; use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; use crate::storage::StreamType; use crate::storage::{retention::Retention, StorageDir, StreamInfo}; +use crate::utils::json::flatten_json_body; use crate::{catalog, event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; use actix_web::http::StatusCode; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{web, HttpRequest, HttpResponse, Responder}; use arrow_json::reader::infer_json_schema_from_iterator; use arrow_schema::{Field, Schema}; use bytes::Bytes; @@ -97,8 +98,18 @@ pub async fn list(_: HttpRequest) -> impl Responder { } pub async fn detect_schema(body: Bytes) -> Result { + let schema = fetch_schema_from_event(body)?; + let known_schema_type = event::detect_schema::validate_schema_type(&schema); + Ok(HttpResponse::Ok() + .insert_header((header::CONTENT_TYPE, "application/json")) + .insert_header((SCHEMA_TYPE_KEY, known_schema_type)) + .json(schema)) +} + +pub fn fetch_schema_from_event(body: Bytes) -> Result { let body_val: Value = serde_json::from_slice(&body)?; - let log_records: Vec = match body_val { + let flattened_body = flatten_json_body(body_val, None, None, None, false)?; + let log_records: Vec = match flattened_body { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], _ => { @@ -113,7 +124,8 @@ pub async fn detect_schema(body: 
Bytes) -> Result { for log_record in log_records { schema = update_data_type_to_datetime(schema, log_record, Vec::new()); } - Ok((web::Json(schema), StatusCode::OK)) + + Ok(schema) } pub async fn schema(req: HttpRequest) -> Result { @@ -505,6 +517,7 @@ fn remove_id_from_alerts(value: &mut Value) { } } +#[allow(clippy::too_many_arguments)] pub async fn create_stream( stream_name: String, time_partition: &str, @@ -513,6 +526,7 @@ pub async fn create_stream( static_schema_flag: &str, schema: Arc, stream_type: &str, + schema_type: &str, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal.to_string() { @@ -530,6 +544,7 @@ pub async fn create_stream( static_schema_flag, schema.clone(), stream_type, + schema_type, ) .await { @@ -553,6 +568,7 @@ pub async fn create_stream( static_schema_flag.to_string(), static_schema, stream_type, + schema_type, ); } Err(err) => { @@ -602,6 +618,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result Result Result<(), StreamError> { - if let Ok(stream_exists) = - create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await + if let Ok(stream_exists) = create_stream_if_not_exists( + INTERNAL_STREAM_NAME, + &StreamType::Internal.to_string(), + &Bytes::new(), + ) + .await { if stream_exists { return Ok(()); diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 0e1226b8..9c52589e 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -24,6 +24,7 @@ use super::IngestorMetadata; use super::OpenIdClient; use super::ParseableServer; use crate::analytics; +use crate::event::detect_schema::detect_schema; use crate::handlers::airplane; use crate::handlers::http::ingest; use crate::handlers::http::logstream; @@ -121,7 +122,7 @@ impl ParseableServer for IngestServer { // set the ingestor metadata self.set_ingestor_metadata().await?; - + detect_schema(); 
// Ingestors shouldn't have to deal with OpenId auth flow let app = self.start(prometheus, None); diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index ffde5fb1..fbeaaa10 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -16,6 +16,7 @@ * */ +use crate::event::detect_schema::detect_schema; use crate::handlers::airplane; use crate::handlers::http::base_path; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; @@ -95,6 +96,7 @@ impl ParseableServer for QueryServer { FILTERS.load().await?; DASHBOARDS.load().await?; + detect_schema(); // track all parquet files already in the data directory storage::retention::load_retention_from_global(); diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 48e93161..e6437f1b 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -17,6 +17,8 @@ */ use crate::analytics; +use crate::event::detect_schema::detect_schema; + use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; @@ -110,7 +112,7 @@ impl ParseableServer for Server { FILTERS.load().await?; DASHBOARDS.load().await?; - + detect_schema(); storage::retention::load_retention_from_global(); if let Some(hot_tier_manager) = HotTierManager::global() { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 81ccfbd4..0e46ceb9 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -31,10 +31,12 @@ use crate::{ event::{ self, format::{self, EventFormat}, + kubernetes_events, }, handlers::{ http::{ingest::PostError, kinesis, otel}, - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, + LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_KUBERNETES_EVENTS, LOG_SOURCE_OTEL, + PREFIX_META, PREFIX_TAGS, SEPARATOR, }, 
metadata::STREAM_INFO, storage::StreamType, @@ -46,26 +48,36 @@ pub async fn flatten_and_push_logs( body: Bytes, stream_name: String, ) -> Result<(), PostError> { - //flatten logs - if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) { - let mut json: Vec> = Vec::new(); - let log_source: String = log_source.to_str().unwrap().to_owned(); - match log_source.as_str() { - LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), - LOG_SOURCE_OTEL => { - json = otel::flatten_otel_logs(&body); - } - _ => { - log::warn!("Unknown log source: {}", log_source); - push_logs(stream_name.to_string(), req.clone(), body).await?; - } + let log_source = if let Some((_, header_log_source)) = + req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) + { + header_log_source.to_str().unwrap().to_owned() + } else if let Some(schema_type) = STREAM_INFO.get_schema_type(&stream_name)? { + schema_type + } else { + String::default() + }; + let mut json: Vec> = Vec::new(); + match log_source.as_str() { + LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), + LOG_SOURCE_OTEL => { + json = otel::flatten_otel_logs(&body); } - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); + LOG_SOURCE_KUBERNETES_EVENTS => { + let kubernetes_log_records = + kubernetes_events::flatten_kubernetes_events_log(&body).await?; + let body: Bytes = serde_json::to_vec(&kubernetes_log_records).unwrap().into(); push_logs(stream_name.to_string(), req.clone(), body).await?; } - } else { - push_logs(stream_name.to_string(), req, body).await?; + + _ => { + log::warn!("Unknown log source: {}", log_source); + push_logs(stream_name.to_string(), req.clone(), body).await?; + } + } + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name.to_string(), req.clone(), body).await?; } Ok(()) } diff --git a/src/handlers/http/modal/utils/logstream_utils.rs 
b/src/handlers/http/modal/utils/logstream_utils.rs index 0081a258..e69e7848 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -26,8 +26,8 @@ use http::StatusCode; use crate::{ handlers::{ http::logstream::error::{CreateStreamError, StreamError}, - CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, - TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, + CUSTOM_PARTITION_KEY, SCHEMA_TYPE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, + TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, }, metadata::{self, STREAM_INFO}, option::{Mode, CONFIG}, @@ -48,6 +48,7 @@ pub async fn create_update_stream( static_schema_flag, update_stream_flag, stream_type, + schema_type, ) = fetch_headers_from_put_stream_request(req); if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" { @@ -113,6 +114,7 @@ pub async fn create_update_stream( &static_schema_flag, schema, &stream_type, + &schema_type, ) .await?; @@ -166,13 +168,14 @@ async fn validate_and_update_custom_partition( pub fn fetch_headers_from_put_stream_request( req: &HttpRequest, -) -> (String, String, String, String, String, String) { +) -> (String, String, String, String, String, String, String) { let mut time_partition = String::default(); let mut time_partition_limit = String::default(); let mut custom_partition = String::default(); let mut static_schema_flag = String::default(); let mut update_stream = String::default(); let mut stream_type = StreamType::UserDefined.to_string(); + let mut schema_type = String::default(); req.headers().iter().for_each(|(key, value)| { if key == TIME_PARTITION_KEY { time_partition = value.to_str().unwrap().to_string(); @@ -192,6 +195,10 @@ pub fn fetch_headers_from_put_stream_request( if key == STREAM_TYPE_KEY { stream_type = value.to_str().unwrap().to_string(); } + + if key == SCHEMA_TYPE_KEY { + schema_type = value.to_str().unwrap().to_string(); + } }); ( @@ 
-201,6 +208,7 @@ pub fn fetch_headers_from_put_stream_request( static_schema_flag, update_stream, stream_type, + schema_type, ) } @@ -377,6 +385,7 @@ pub async fn update_custom_partition_in_stream( Ok(()) } +#[allow(clippy::too_many_arguments)] pub async fn create_stream( stream_name: String, time_partition: &str, @@ -385,6 +394,7 @@ pub async fn create_stream( static_schema_flag: &str, schema: Arc, stream_type: &str, + schema_type: &str, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal.to_string() { @@ -402,6 +412,7 @@ pub async fn create_stream( static_schema_flag, schema.clone(), stream_type, + schema_type, ) .await { @@ -425,6 +436,7 @@ pub async fn create_stream( static_schema_flag.to_string(), static_schema, stream_type, + schema_type, ); } Err(err) => { @@ -474,7 +486,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or(""); let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); - + let schema_type = stream_metadata.schema_type.as_deref().unwrap_or(""); metadata::STREAM_INFO.add_stream( stream_name.to_string(), stream_metadata.created_at, @@ -484,6 +496,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< static_schema_flag.to_string(), static_schema, stream_type, + schema_type, ); } else { return Ok(false); diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 2232ce53..21e47528 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -31,6 +31,7 @@ const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; +const SCHEMA_TYPE_KEY: &str = "x-p-schema-type"; 
const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; @@ -52,5 +53,6 @@ const LOG_SOURCE_KINESIS: &str = "kinesis"; // specification as explained here https://opentelemetry.io/docs/specs/otel/logs/data-model/ const LOG_SOURCE_OTEL: &str = "otel"; +const LOG_SOURCE_KUBERNETES_EVENTS: &str = "kubernetes-events"; // AWS Kinesis constants const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; diff --git a/src/metadata.rs b/src/metadata.rs index f768a4e8..095fb283 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -58,6 +58,7 @@ pub struct LogStreamMetadata { pub static_schema_flag: Option, pub hot_tier_enabled: Option, pub stream_type: Option, + pub schema_type: Option, } // It is very unlikely that panic will occur when dealing with metadata. @@ -137,6 +138,13 @@ impl StreamInfo { .map(|metadata| metadata.static_schema_flag.clone()) } + pub fn get_schema_type(&self, stream_name: &str) -> Result, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.schema_type.clone()) + } + pub fn get_retention(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) @@ -265,6 +273,7 @@ impl StreamInfo { static_schema_flag: String, static_schema: HashMap>, stream_type: &str, + schema_type: &str, ) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { @@ -299,6 +308,11 @@ impl StreamInfo { static_schema }, stream_type: Some(stream_type.to_string()), + schema_type: if schema_type.is_empty() { + None + } else { + Some(schema_type.to_string()) + }, ..Default::default() }; map.insert(stream_name, metadata); @@ -340,6 +354,7 @@ impl StreamInfo { static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, stream_type: meta.stream_type, + 
schema_type: meta.schema_type, }; let mut map = self.write().expect(LOCK_EXPECT); @@ -493,6 +508,7 @@ pub async fn load_stream_metadata_on_server_start( static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, stream_type: meta.stream_type, + schema_type: meta.schema_type, }; let mut map = STREAM_INFO.write().expect(LOCK_EXPECT); diff --git a/src/static_schema.rs b/src/static_schema.rs index 62ea258b..ff2e1b8f 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -56,6 +56,28 @@ pub struct Fields { #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Metadata {} + +impl ParsedSchema { + pub fn from(schema: Schema) -> Self { + let fields = schema + .fields() + .iter() + .map(|field| Fields { + name: field.name().clone(), + data_type: field.data_type().clone(), + nullable: default_nullable(), + dict_id: default_dict_id(), + dict_is_ordered: default_dict_is_ordered(), + metadata: HashMap::new(), + }) + .collect(); + ParsedSchema { + fields, + metadata: HashMap::new(), + } + } +} + pub fn convert_static_schema_to_arrow_schema( static_schema: StaticSchema, time_partition: &str, @@ -136,7 +158,7 @@ pub fn convert_static_schema_to_arrow_schema( add_parseable_fields_to_static_schema(parsed_schema) } -fn add_parseable_fields_to_static_schema( +pub fn add_parseable_fields_to_static_schema( parsed_schema: ParsedSchema, ) -> Result, AnyError> { let mut schema: Vec> = Vec::new(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a018c2b1..e1dc5b04 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -104,7 +104,10 @@ pub struct ObjectStoreFormat { pub static_schema_flag: Option, #[serde(skip_serializing_if = "Option::is_none")] pub hot_tier_enabled: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub stream_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_type: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -124,7 
+127,10 @@ pub struct StreamInfo { pub custom_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub stream_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_type: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] @@ -191,6 +197,7 @@ impl Default for ObjectStoreFormat { custom_partition: None, static_schema_flag: None, hot_tier_enabled: None, + schema_type: None, } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 78f51685..8c0aea28 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -145,6 +145,7 @@ pub trait ObjectStorage: Send + Sync + 'static { static_schema_flag: &str, schema: Arc, stream_type: &str, + schema_type: &str, ) -> Result { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); @@ -172,6 +173,11 @@ pub trait ObjectStorage: Send + Sync + 'static { } else { format.static_schema_flag = Some(static_schema_flag.to_string()); } + if schema_type.is_empty() { + format.schema_type = None; + } else { + format.schema_type = Some(schema_type.to_string()); + } let format_json = to_bytes(&format); self.put_object(&schema_path(stream_name), to_bytes(&schema)) .await?;