Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: schema detection for k8s events #1011

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ LABEL org.opencontainers.image.licenses="AGPL-3.0"

WORKDIR /parseable
COPY . .

RUN cargo build --release

# final stage
Expand All @@ -32,5 +33,6 @@ WORKDIR /parseable

# Copy the static shell into base image.
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable
COPY --from=builder /parseable/src/event/known-formats /parseable/src/event/known-formats

CMD ["/usr/bin/parseable"]
1 change: 1 addition & 0 deletions Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ FROM docker.io/debian:bookworm-slim
WORKDIR /parseable

COPY --from=builder /parseable/target/debug/parseable /usr/bin/parseable
COPY --from=builder /parseable/src/event/known-formats /parseable/src/event/known-formats

CMD ["/usr/bin/parseable"]
1 change: 1 addition & 0 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ WORKDIR /parseable

# Copy the static shell into base image.
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable
COPY --from=builder /parseable/src/event/known-formats /parseable/src/event/known-formats

CMD ["/usr/bin/parseable"]
92 changes: 92 additions & 0 deletions src/event/detect_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::Schema;
use once_cell::sync::OnceCell;
use serde::Deserialize;
use serde_json::Value;
use std::{collections::HashMap, fs, path::Path};

use crate::{event::format::update_data_type_to_datetime, utils::json::flatten_json_body};

/// Process-wide table of known schemas, keyed by schema type.
///
/// The cell starts out empty; `prepare_known_schema_list` stores the table in it and
/// `get_known_schema_list` reads it back.
pub static KNOWN_SCHEMA_LIST: OnceCell<HashMap<String, Schema>> = OnceCell::new();
// Text of `known-formats/formats.json`, embedded into the binary at compile time by
// `include_str!` (the path is relative to this source file).
const FORMATS_JSON: &str = include_str!("known-formats/formats.json");

/// One entry of the embedded `formats.json` list.
#[derive(Debug, Deserialize)]
struct Format {
    /// Name of the format as written in `formats.json`.
    name: String,
    /// Key under which the schema inferred for this format is stored in the known-schema table.
    schema_type: String,
    /// Path of the sample JSON document the schema is inferred from. It is opened with
    /// `fs::read_to_string` at run time, so a relative path is resolved against the
    /// working directory of the process.
    sample_json_path: String,
}

/// Builds the table of known schemas from the sample documents listed in `formats.json`.
///
/// For every entry of the embedded format list the sample file is read, parsed,
/// flattened with `flatten_json_body`, and a schema is inferred from the flattened
/// document and post-processed by `update_data_type_to_datetime`. The result is stored
/// under the entry's `schema_type`.
///
/// A sample that cannot be read, parsed, flattened, or turned into a schema is reported
/// on stderr and skipped, so one broken sample does not prevent the remaining formats
/// from being registered.
///
/// The finished table is also published through `prepare_known_schema_list`, and a copy
/// is returned to the caller.
///
/// # Panics
///
/// Panics if the embedded `formats.json` is not a valid list of formats, or if the
/// global table has already been set (i.e. this function is called more than once).
pub fn detect_schema() -> HashMap<String, Schema> {
    // FORMATS_JSON is embedded at compile time, so failing to parse it is a build defect
    // rather than a recoverable runtime condition.
    let formats: Vec<Format> =
        serde_json::from_str(FORMATS_JSON).expect("Failed to parse formats.json");

    let mut known_schema_list: HashMap<String, Schema> = HashMap::with_capacity(formats.len());

    for format in &formats {
        let sample_path = Path::new(&format.sample_json_path);

        let content = match fs::read_to_string(sample_path) {
            Ok(content) => content,
            Err(err) => {
                eprintln!("Failed to read {}: {}", sample_path.display(), err);
                continue;
            }
        };

        let json = match serde_json::from_str::<Value>(&content) {
            Ok(json) => json,
            Err(err) => {
                eprintln!("Invalid JSON in {}: {}", sample_path.display(), err);
                continue;
            }
        };

        let flattened_json = match flatten_json_body(json, None, None, None, false) {
            Ok(flattened) => flattened,
            Err(err) => {
                eprintln!(
                    "Failed to flatten sample for format {} ({}): {:?}",
                    format.name,
                    sample_path.display(),
                    err
                );
                continue;
            }
        };

        // The flattened document is the only record; borrowing it here lets it be moved
        // into `update_data_type_to_datetime` afterwards without a clone.
        let inferred =
            match infer_json_schema_from_iterator(std::iter::once(Ok(&flattened_json))) {
                Ok(schema) => schema,
                Err(err) => {
                    eprintln!(
                        "Failed to infer schema for format {} ({}): {:?}",
                        format.name,
                        sample_path.display(),
                        err
                    );
                    continue;
                }
            };

        let schema = update_data_type_to_datetime(inferred, flattened_json, Vec::new());
        known_schema_list.insert(format.schema_type.clone(), schema);
    }

    prepare_known_schema_list(known_schema_list.clone());
    known_schema_list
}

/// Publishes `known_schema_list` as the process-wide table of known schemas.
///
/// # Panics
///
/// Panics with `only set once` when the global cell already holds a table.
pub fn prepare_known_schema_list(known_schema_list: HashMap<String, Schema>) {
    let stored = KNOWN_SCHEMA_LIST.set(known_schema_list);
    stored.expect("only set once");
}

/// Returns the process-wide table of known schemas.
///
/// # Panics
///
/// Panics with `fetch schema list from static variable` when the table has not been
/// stored in the global cell yet.
pub fn get_known_schema_list() -> &'static HashMap<String, Schema> {
    let registered = KNOWN_SCHEMA_LIST.get();
    registered.expect("fetch schema list from static variable")
}

/// Looks up the schema type whose known schema is equal to `schema`.
///
/// Returns the matching schema type, or an empty string when no known schema compares
/// equal to `schema`.
///
/// # Panics
///
/// Panics when the known-schema table has not been initialised (see
/// `get_known_schema_list`).
pub fn validate_schema_type(schema: &Schema) -> String {
    get_known_schema_list()
        .iter()
        .find(|(_, candidate)| *candidate == schema)
        .map(|(matched_type, _)| matched_type.to_string())
        .unwrap_or_default()
}
7 changes: 7 additions & 0 deletions src/event/known-formats/formats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{
"name": "kubernetes",
"schema_type": "kubernetes-events",
"sample_json_path": "src/event/known-formats/kubernetes-events-sample/kubernetes-events-sample.json"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
{
"apiVersion": "v1",
"items": [
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:18Z",
"involvedObject": {
"apiVersion": "v1",
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
"kind": "Pod",
"name": "vka-vantage-kubernetes-agent-0",
"namespace": "vantage",
"resourceVersion": "15629581",
"uid": "3fa579b0-0c6f-4f44-a320-69389c8f607a"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:18Z",
"message": "Stopping container vantage-kubernetes-agent",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:18Z",
"name": "vka-vantage-kubernetes-agent-0.1805f6d7de4bc710",
"namespace": "vantage",
"resourceVersion": "25741805",
"uid": "629a5864-06de-414d-8ad7-b7637b8cbfa0"
},
"reason": "Killing",
"reportingComponent": "kubelet",
"reportingInstance": "ip-10-0-2-170.ec2.internal",
"source": {
"component": "kubelet",
"host": "ip-10-0-2-170.ec2.internal"
},
"type": "Normal"
},
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:19Z",
"involvedObject": {
"apiVersion": "v1",
"kind": "Pod",
"name": "vka-vantage-kubernetes-agent-0",
"namespace": "vantage",
"resourceVersion": "25741822",
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:19Z",
"message": "Successfully assigned vantage/vka-vantage-kubernetes-agent-0 to ip-10-0-2-170.ec2.internal",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:19Z",
"name": "vka-vantage-kubernetes-agent-0.1805f6d80c652af1",
"namespace": "vantage",
"resourceVersion": "25741826",
"uid": "e1dab7eb-ab65-44be-9b75-2f400cd70275"
},
"reason": "Scheduled",
"reportingComponent": "default-scheduler",
"reportingInstance": "",
"source": {
"component": "default-scheduler"
},
"type": "Normal"
},
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:22Z",
"involvedObject": {
"apiVersion": "v1",
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
"kind": "Pod",
"name": "vka-vantage-kubernetes-agent-0",
"namespace": "vantage",
"resourceVersion": "25741823",
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:22Z",
"message": "Container image \"quay.io/vantage-sh/kubernetes-agent:1.0.26\" already present on machine",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:22Z",
"name": "vka-vantage-kubernetes-agent-0.1805f6d8d0c1d741",
"namespace": "vantage",
"resourceVersion": "25741846",
"uid": "6c9c24bb-4ff3-486f-8151-91d1dad159ee"
},
"reason": "Pulled",
"reportingComponent": "kubelet",
"reportingInstance": "ip-10-0-2-170.ec2.internal",
"source": {
"component": "kubelet",
"host": "ip-10-0-2-170.ec2.internal"
},
"type": "Normal"
},
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:22Z",
"involvedObject": {
"apiVersion": "v1",
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
"kind": "Pod",
"name": "vka-vantage-kubernetes-agent-0",
"namespace": "vantage",
"resourceVersion": "25741823",
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:22Z",
"message": "Created container vantage-kubernetes-agent",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:22Z",
"name": "vka-vantage-kubernetes-agent-0.1805f6d8d271c600",
"namespace": "vantage",
"resourceVersion": "25741847",
"uid": "d23e308a-b17e-42ba-a5ed-3a55c3d9e0d2"
},
"reason": "Created",
"reportingComponent": "kubelet",
"reportingInstance": "ip-10-0-2-170.ec2.internal",
"source": {
"component": "kubelet",
"host": "ip-10-0-2-170.ec2.internal"
},
"type": "Normal"
},
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:22Z",
"involvedObject": {
"apiVersion": "v1",
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
"kind": "Pod",
"name": "vka-vantage-kubernetes-agent-0",
"namespace": "vantage",
"resourceVersion": "25741823",
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:22Z",
"message": "Started container vantage-kubernetes-agent",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:23Z",
"name": "vka-vantage-kubernetes-agent-0.1805f6d8d87a3795",
"namespace": "vantage",
"resourceVersion": "25741848",
"uid": "e48c06da-3fbf-41a1-8685-6224854f0391"
},
"reason": "Started",
"reportingComponent": "kubelet",
"reportingInstance": "ip-10-0-2-170.ec2.internal",
"source": {
"component": "kubelet",
"host": "ip-10-0-2-170.ec2.internal"
},
"type": "Normal"
},
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:23Z",
"involvedObject": {
"apiVersion": "v1",
"fieldPath": "spec.containers{vantage-kubernetes-agent}",
"kind": "Pod",
"name": "vka-vantage-kubernetes-agent-0",
"namespace": "vantage",
"resourceVersion": "25741823",
"uid": "0118c8be-55df-40bf-96ed-41bb11b5a771"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:23Z",
"message": "Readiness probe failed: Get \"http://10.0.2.143:9010/healthz\": dial tcp 10.0.2.143:9010: connect: connection refused",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:23Z",
"name": "vka-vantage-kubernetes-agent-0.1805f6d8f61959d7",
"namespace": "vantage",
"resourceVersion": "25741851",
"uid": "6199c62b-9ca5-4c46-abcb-53137ed24c47"
},
"reason": "Unhealthy",
"reportingComponent": "kubelet",
"reportingInstance": "ip-10-0-2-170.ec2.internal",
"source": {
"component": "kubelet",
"host": "ip-10-0-2-170.ec2.internal"
},
"type": "Warning"
},
{
"apiVersion": "v1",
"count": 1,
"eventTime": null,
"firstTimestamp": "2024-11-08T10:17:19Z",
"involvedObject": {
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"name": "vka-vantage-kubernetes-agent",
"namespace": "vantage",
"resourceVersion": "25741814",
"uid": "3f91d728-f31f-4582-8639-df259d97ac55"
},
"kind": "Event",
"lastTimestamp": "2024-11-08T10:17:19Z",
"message": "create Pod vka-vantage-kubernetes-agent-0 in StatefulSet vka-vantage-kubernetes-agent successful",
"metadata": {
"creationTimestamp": "2024-11-08T10:17:19Z",
"name": "vka-vantage-kubernetes-agent.1805f6d80bd97994",
"namespace": "vantage",
"resourceVersion": "25741827",
"uid": "c5bf4dee-649f-48ba-b6da-c6ccf4e9262c"
},
"reason": "SuccessfulCreate",
"reportingComponent": "statefulset-controller",
"reportingInstance": "",
"source": {
"component": "statefulset-controller"
},
"type": "Normal"
}
],
"kind": "List",
"metadata": {
"resourceVersion": ""
}
}
Loading
Loading