Skip to content

Commit

Permalink
fix: remove custom flattening for otel logs (#1019)
Browse files Browse the repository at this point in the history
with generic flattening in place,
server doesn't need to perform custom flattening for the otel logs
client does not need to send additional header `X-P-Log-Source=OTEL`

for API `/v1/logs` which is specifically used for otel log ingestion
no custom logic is required to verify if header `X-P-Log-Source` is sent
no custom flattening is required either

server does the flattening if it finds the nested structure and ingests
  • Loading branch information
nikhilsinhaparseable authored Dec 6, 2024
1 parent 935ee79 commit 4af4e6c
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 399 deletions.
23 changes: 2 additions & 21 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::otel;
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
use crate::event::{
Expand All @@ -27,7 +26,7 @@ use crate::event::{
format::{self, EventFormat},
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::STREAM_INFO;
Expand Down Expand Up @@ -115,25 +114,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
{
let log_source: String = log_source.to_str().unwrap().to_owned();
if log_source == LOG_SOURCE_OTEL {
let mut json = otel::flatten_otel_logs(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
return Err(PostError::CustomError("Unknown log source".to_string()));
}
} else {
return Err(PostError::CustomError(
"log source key header is missing".to_string(),
));
}
push_logs(stream_name.to_string(), req.clone(), body).await?;
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
Expand Down
1 change: 0 additions & 1 deletion src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub mod logstream;
pub mod middleware;
pub mod modal;
pub mod oidc;
mod otel;
pub mod query;
pub mod rbac;
pub mod role;
Expand Down
36 changes: 13 additions & 23 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
*
*/

use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};

use actix_web::HttpRequest;
use arrow_schema::Field;
Expand All @@ -33,8 +30,8 @@ use crate::{
format::{self, EventFormat},
},
handlers::{
http::{ingest::PostError, kinesis, otel},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
http::{ingest::PostError, kinesis},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
},
metadata::STREAM_INFO,
storage::StreamType,
Expand All @@ -46,26 +43,19 @@ pub async fn flatten_and_push_logs(
body: Bytes,
stream_name: String,
) -> Result<(), PostError> {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => {
json = otel::flatten_otel_logs(&body);
}
_ => {
log::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
}
for record in json.iter_mut() {
let log_source = req
.headers()
.get(LOG_SOURCE_KEY)
.map(|header| header.to_str().unwrap_or_default())
.unwrap_or_default();
if log_source == LOG_SOURCE_KINESIS {
let json = kinesis::flatten_kinesis_logs(&body);
for record in json.iter() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
}
} else {
push_logs(stream_name.to_string(), req, body).await?;
push_logs(stream_name, req, body).await?;
}
Ok(())
}
Expand Down
Loading

0 comments on commit 4af4e6c

Please sign in to comment.