enhancement: add schema type to stream info
add an optional field `schema_type` when creating a stream
add this field to storage and to STREAM_INFO

use this field for known log sources such as otel, k8s-events, etc.

for the API `/v1/logs`, which ingests otel logs, set `schema_type`=`otel`
for the API `/api/v1/ingest`, when the header `X-P-Log-Source`=`otel` is present,
also set `schema_type`=`otel`

this field will be used for schema detection
nikhilsinhaparseable committed Dec 8, 2024
1 parent a08e096 commit a51d8c3
Showing 8 changed files with 119 additions and 70 deletions.
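Before the per-file diffs, a minimal client-side sketch of the two paths described in the commit message. This is an illustration, not code from the commit: the endpoint paths, port, credentials, and the `x-p-stream` header name are assumptions about Parseable's HTTP API, while `x-p-schema-type` and `x-p-log-source` correspond to the header-key constants added in this diff.

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();

    // Path 1: create the stream up front, declaring its schema type
    // via the new x-p-schema-type header (SCHEMA_TYPE_KEY below).
    client
        .put("http://localhost:8000/api/v1/logstream/otel_stream")
        .basic_auth("admin", Some("admin"))
        .header("x-p-schema-type", "otel")
        .send()
        .await?
        .error_for_status()?;

    // Path 2: ingest with X-P-Log-Source: otel; per this commit the server
    // then records schema_type = "otel" on the stream (see ingest_utils.rs).
    client
        .post("http://localhost:8000/api/v1/ingest")
        .basic_auth("admin", Some("admin"))
        .header("x-p-stream", "otel_stream")
        .header("x-p-log-source", "otel")
        .body(r#"[{"message": "hello"}]"#)
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}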
13 changes: 8 additions & 5 deletions src/handlers/http/ingest.rs
@@ -60,9 +60,9 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
                 stream_name
             )));
         }
-        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string(), "").await?;
 
-        flatten_and_push_logs(req, body, stream_name).await?;
+        flatten_and_push_logs(req, body, &stream_name).await?;
         Ok(HttpResponse::Ok().finish())
     } else {
         Err(PostError::Header(ParseHeaderError::MissingStreamName))
@@ -116,8 +116,9 @@ pub async fn handle_otel_ingestion(
         .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
     {
         let stream_name = stream_name.to_str().unwrap().to_owned();
-        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
-        push_logs(stream_name.to_string(), req.clone(), body).await?;
+        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string(), "otel")
+            .await?;
+        push_logs(&stream_name, req.clone(), body).await?;
     } else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
     }
@@ -150,7 +151,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
         }
     }
 
-    flatten_and_push_logs(req, body, stream_name).await?;
+    flatten_and_push_logs(req, body, &stream_name).await?;
     Ok(HttpResponse::Ok().finish())
 }
 
@@ -178,6 +179,7 @@ pub async fn push_logs_unchecked(
 pub async fn create_stream_if_not_exists(
     stream_name: &str,
     stream_type: &str,
+    schema_type: &str,
 ) -> Result<bool, PostError> {
     let mut stream_exists = false;
     if STREAM_INFO.stream_exists(stream_name) {
@@ -202,6 +204,7 @@ pub async fn create_stream_if_not_exists(
         "",
         Arc::new(Schema::empty()),
         stream_type,
+        schema_type,
     )
     .await?;
 
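The new third argument threads a schema type from each entry point down to stream creation: the generic ingest path passes "" (unknown source), while the OTEL handler passes "otel". Below is a self-contained toy model of that contract, with a plain HashMap standing in for the real STREAM_INFO and storage types; the names and the empty-string-means-unknown defaulting are illustrative only.

use std::collections::HashMap;

#[derive(Debug, Default)]
struct StreamMeta {
    stream_type: String,
    schema_type: Option<String>,
}

// Returns true when the stream already existed, mirroring the real
// create_stream_if_not_exists's Result<bool, _> contract.
fn create_stream_if_not_exists(
    registry: &mut HashMap<String, StreamMeta>,
    stream_name: &str,
    stream_type: &str,
    schema_type: &str, // "" = unknown log source
) -> bool {
    if registry.contains_key(stream_name) {
        return true;
    }
    registry.insert(
        stream_name.to_string(),
        StreamMeta {
            stream_type: stream_type.to_string(),
            schema_type: (!schema_type.is_empty()).then(|| schema_type.to_string()),
        },
    );
    false
}

fn main() {
    let mut registry = HashMap::new();
    assert!(!create_stream_if_not_exists(&mut registry, "app_logs", "UserDefined", ""));
    assert!(!create_stream_if_not_exists(&mut registry, "otel_stream", "UserDefined", "otel"));
    assert!(create_stream_if_not_exists(&mut registry, "app_logs", "UserDefined", ""));
    println!("{:?}", registry.get("otel_stream")); // schema_type: Some("otel")
}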
13 changes: 11 additions & 2 deletions src/handlers/http/logstream.rs
@@ -505,6 +505,7 @@ fn remove_id_from_alerts(value: &mut Value) {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn create_stream(
     stream_name: String,
     time_partition: &str,
@@ -513,6 +514,7 @@ pub async fn create_stream(
     static_schema_flag: &str,
     schema: Arc<Schema>,
     stream_type: &str,
+    schema_type: &str,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
     if stream_type != StreamType::Internal.to_string() {
@@ -530,6 +532,7 @@ pub async fn create_stream(
         static_schema_flag,
         schema.clone(),
         stream_type,
+        schema_type,
     )
     .await
     {
@@ -553,6 +556,7 @@ pub async fn create_stream(
                 static_schema_flag.to_string(),
                 static_schema,
                 stream_type,
+                schema_type,
             );
         }
         Err(err) => {
@@ -602,6 +606,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
         custom_partition: stream_meta.custom_partition.clone(),
         cache_enabled: stream_meta.cache_enabled,
         static_schema_flag: stream_meta.static_schema_flag.clone(),
+        schema_type: stream_meta.schema_type.clone(),
     };
 
     // get the other info from
@@ -744,8 +749,12 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
 }
 
 pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
-    if let Ok(stream_exists) =
-        create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
+    if let Ok(stream_exists) = create_stream_if_not_exists(
+        INTERNAL_STREAM_NAME,
+        &StreamType::Internal.to_string(),
+        INTERNAL_STREAM_NAME,
+    )
+    .await
     {
         if stream_exists {
             return Ok(());
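With `schema_type` added to the struct returned by get_stream_info, stream-info consumers see one extra field. A sketch of the payload a client might now receive; only `schema_type` comes from this diff, while the endpoint path and the sibling fields shown are assumptions.

use serde_json::json;

fn main() {
    // Hypothetical response of GET /api/v1/logstream/{name}/info
    let stream_info = json!({
        "created-at": "2024-12-08T00:00:00Z",
        "time_partition": null,
        "cache_enabled": false,
        "static_schema_flag": null,
        "schema_type": "otel" // new field surfaced by this commit
    });
    println!("{}", serde_json::to_string_pretty(&stream_info).unwrap());
}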
58 changes: 30 additions & 28 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -31,7 +31,7 @@ use crate::{
     },
     handlers::{
         http::{ingest::PostError, kinesis},
-        LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
+        LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
     },
     metadata::STREAM_INFO,
     storage::StreamType,
@@ -41,42 +41,44 @@ use crate::{
 pub async fn flatten_and_push_logs(
     req: HttpRequest,
     body: Bytes,
-    stream_name: String,
+    stream_name: &str,
 ) -> Result<(), PostError> {
     let log_source = req
         .headers()
         .get(LOG_SOURCE_KEY)
         .map(|header| header.to_str().unwrap_or_default())
         .unwrap_or_default();
-    if log_source == LOG_SOURCE_KINESIS {
-        let json = kinesis::flatten_kinesis_logs(&body);
-        for record in json.iter() {
-            let body: Bytes = serde_json::to_vec(record).unwrap().into();
-            push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
+    match log_source {
+        LOG_SOURCE_KINESIS => {
+            let json = kinesis::flatten_kinesis_logs(&body);
+            for record in json.iter() {
+                let body: Bytes = serde_json::to_vec(record).unwrap().into();
+                push_logs(stream_name, req.clone(), body.clone()).await?;
+            }
+            return Ok(());
         }
-    } else {
-        push_logs(stream_name, req, body).await?;
+        LOG_SOURCE_OTEL => {
+            STREAM_INFO.set_schema_type(stream_name, Some(LOG_SOURCE_OTEL.to_string()))?
+        }
+        _ => {}
     }
+    push_logs(stream_name, req.clone(), body.clone()).await?;
     Ok(())
 }
 
-pub async fn push_logs(
-    stream_name: String,
-    req: HttpRequest,
-    body: Bytes,
-) -> Result<(), PostError> {
-    let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
-    let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?;
-    let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
-    let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?;
+pub async fn push_logs(stream_name: &str, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
+    let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
+    let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?;
+    let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
+    let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
     let body_val: Value = serde_json::from_slice(&body)?;
     let size: usize = body.len();
     let mut parsed_timestamp = Utc::now().naive_utc();
     if time_partition.is_none() {
         if custom_partition.is_none() {
             let size = size as u64;
             create_process_record_batch(
-                stream_name.clone(),
+                stream_name,
                 req.clone(),
                 body_val.clone(),
                 static_schema_flag.clone(),
@@ -98,7 +100,7 @@ pub async fn push_logs(
 
             let size = value.to_string().into_bytes().len() as u64;
             create_process_record_batch(
-                stream_name.clone(),
+                stream_name,
                 req.clone(),
                 value.clone(),
                 static_schema_flag.clone(),
@@ -121,7 +123,7 @@ pub async fn push_logs(
         parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
         let size = value.to_string().into_bytes().len() as u64;
         create_process_record_batch(
-            stream_name.clone(),
+            stream_name,
             req.clone(),
             value.clone(),
             static_schema_flag.clone(),
@@ -149,7 +151,7 @@ pub async fn push_logs(
             parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
             let size = value.to_string().into_bytes().len() as u64;
             create_process_record_batch(
-                stream_name.clone(),
+                stream_name,
                 req.clone(),
                 value.clone(),
                 static_schema_flag.clone(),
@@ -167,7 +169,7 @@ pub async fn push_logs(
 
 #[allow(clippy::too_many_arguments)]
 pub async fn create_process_record_batch(
-    stream_name: String,
+    stream_name: &str,
     req: HttpRequest,
     value: Value,
     static_schema_flag: Option<String>,
@@ -177,15 +179,15 @@ pub async fn create_process_record_batch(
     origin_size: u64,
 ) -> Result<(), PostError> {
     let (rb, is_first_event) = get_stream_schema(
-        stream_name.clone(),
+        stream_name,
         req.clone(),
         value.clone(),
         static_schema_flag.clone(),
         time_partition.clone(),
     )?;
     event::Event {
         rb,
-        stream_name: stream_name.clone(),
+        stream_name: stream_name.to_string(),
         origin_format: "json",
         origin_size,
         is_first_event,
@@ -201,16 +203,16 @@ pub async fn create_process_record_batch(
 }
 
 pub fn get_stream_schema(
-    stream_name: String,
+    stream_name: &str,
     req: HttpRequest,
     body: Value,
     static_schema_flag: Option<String>,
     time_partition: Option<String>,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let hash_map = STREAM_INFO.read().unwrap();
     let schema = hash_map
-        .get(&stream_name)
-        .ok_or(PostError::StreamNotFound(stream_name))?
+        .get(stream_name)
+        .ok_or(PostError::StreamNotFound(stream_name.to_string()))?
         .schema
         .clone();
     into_event_batch(req, body, schema, static_schema_flag, time_partition)
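The LOG_SOURCE_OTEL arm above calls STREAM_INFO.set_schema_type, whose implementation is in one of the changed files not shown on this page (presumably the metadata module). The following is a hypothetical sketch of such a helper, modeled on the common RwLock<HashMap<..>> registry pattern; the struct, field, and error types here are assumptions, not the commit's code.

use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Default)]
struct LogStreamMetadata {
    schema_type: Option<String>,
}

struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);

impl StreamInfo {
    // Record the detected schema type on an existing stream.
    fn set_schema_type(&self, stream_name: &str, schema_type: Option<String>) -> Result<(), String> {
        let mut map = self.0.write().expect("lock poisoned");
        map.get_mut(stream_name)
            .map(|meta| meta.schema_type = schema_type)
            .ok_or_else(|| format!("stream {stream_name} not found"))
    }
}

fn main() {
    let info = StreamInfo(RwLock::new(HashMap::from([(
        "otel_stream".to_string(),
        LogStreamMetadata::default(),
    )])));
    info.set_schema_type("otel_stream", Some("otel".to_string())).unwrap();
}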
20 changes: 16 additions & 4 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -26,8 +26,8 @@ use http::StatusCode;
 use crate::{
     handlers::{
         http::logstream::error::{CreateStreamError, StreamError},
-        CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
-        TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
+        CUSTOM_PARTITION_KEY, SCHEMA_TYPE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
+        TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
     },
     metadata::{self, STREAM_INFO},
     option::{Mode, CONFIG},
@@ -48,6 +48,7 @@ pub async fn create_update_stream(
         static_schema_flag,
         update_stream_flag,
         stream_type,
+        schema_type,
     ) = fetch_headers_from_put_stream_request(req);
 
     if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" {
@@ -113,6 +114,7 @@ pub async fn create_update_stream(
         &static_schema_flag,
         schema,
         &stream_type,
+        &schema_type,
     )
     .await?;
 
@@ -167,13 +169,14 @@ async fn validate_and_update_custom_partition(
 
 pub fn fetch_headers_from_put_stream_request(
     req: &HttpRequest,
-) -> (String, String, String, String, String, String) {
+) -> (String, String, String, String, String, String, String) {
     let mut time_partition = String::default();
     let mut time_partition_limit = String::default();
     let mut custom_partition = String::default();
     let mut static_schema_flag = String::default();
     let mut update_stream = String::default();
     let mut stream_type = StreamType::UserDefined.to_string();
+    let mut schema_type = String::default();
     req.headers().iter().for_each(|(key, value)| {
         if key == TIME_PARTITION_KEY {
             time_partition = value.to_str().unwrap().to_string();
@@ -193,6 +196,9 @@ pub fn fetch_headers_from_put_stream_request(
         if key == STREAM_TYPE_KEY {
             stream_type = value.to_str().unwrap().to_string();
         }
+        if key == SCHEMA_TYPE_KEY {
+            schema_type = value.to_str().unwrap().to_string();
+        }
     });
 
     (
@@ -202,6 +208,7 @@ pub fn fetch_headers_from_put_stream_request(
         static_schema_flag,
         update_stream,
         stream_type,
+        schema_type,
     )
 }
 
@@ -378,6 +385,7 @@ pub async fn update_custom_partition_in_stream(
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn create_stream(
     stream_name: String,
     time_partition: &str,
@@ -386,6 +394,7 @@ pub async fn create_stream(
     static_schema_flag: &str,
     schema: Arc<Schema>,
     stream_type: &str,
+    schema_type: &str,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
     if stream_type != StreamType::Internal.to_string() {
@@ -403,6 +412,7 @@ pub async fn create_stream(
         static_schema_flag,
         schema.clone(),
         stream_type,
+        schema_type,
     )
     .await
     {
@@ -426,6 +436,7 @@ pub async fn create_stream(
             static_schema_flag.to_string(),
             static_schema,
             stream_type,
+            schema_type,
         );
     }
     Err(err) => {
@@ -475,7 +486,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
         let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
         let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
         let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
-
+        let schema_type = stream_metadata.schema_type.as_deref().unwrap_or("");
         metadata::STREAM_INFO.add_stream(
             stream_name.to_string(),
             stream_metadata.created_at,
@@ -485,6 +496,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
             static_schema_flag.to_string(),
             static_schema,
             stream_type,
+            schema_type,
         );
     } else {
         return Ok(false);
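fetch_headers_from_put_stream_request now returns a 7-tuple, with schema_type in the last slot. A self-contained toy version over a plain HashMap shows the shape callers destructure; the real function reads actix-web headers and defaults stream_type to StreamType::UserDefined, and apart from x-p-update-stream, x-p-stream-type, and x-p-schema-type (all visible in this commit) the x-p-* key strings here are assumptions.

use std::collections::HashMap;

fn fetch_put_stream_headers(
    headers: &HashMap<&str, &str>,
) -> (String, String, String, String, String, String, String) {
    // Missing headers default to "" in this toy version.
    let get = |key: &str| headers.get(key).unwrap_or(&"").to_string();
    (
        get("x-p-time-partition"),
        get("x-p-time-partition-limit"),
        get("x-p-custom-partition"),
        get("x-p-static-schema-flag"),
        get("x-p-update-stream"),
        get("x-p-stream-type"),
        get("x-p-schema-type"), // new in this commit (SCHEMA_TYPE_KEY)
    )
}

fn main() {
    let headers = HashMap::from([("x-p-schema-type", "otel")]);
    let (.., schema_type) = fetch_put_stream_headers(&headers);
    assert_eq!(schema_type, "otel");
}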
3 changes: 2 additions & 1 deletion src/handlers/mod.rs
@@ -35,6 +35,7 @@ const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
 const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
 const STREAM_TYPE_KEY: &str = "x-p-stream-type";
+const SCHEMA_TYPE_KEY: &str = "x-p-schema-type";
 const OIDC_SCOPE: &str = "openid profile email";
 const COOKIE_AGE_DAYS: usize = 7;
 const SESSION_COOKIE_NAME: &str = "session";
@@ -47,6 +48,6 @@ const TRINO_USER: &str = "x-trino-user";
 
 // constants for log Source values for known sources and formats
 const LOG_SOURCE_KINESIS: &str = "kinesis";
-
+const LOG_SOURCE_OTEL: &str = "otel";
 // AWS Kinesis constants
 const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";