Skip to content

Commit

Permalink
update done to read the static files, clippy and deepsource fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilsinhaparseable committed Nov 28, 2024
1 parent b9f477b commit 3dd640b
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 363 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ LABEL org.opencontainers.image.licenses="AGPL-3.0"

WORKDIR /parseable
COPY . .

RUN cargo build --release

# final stage
Expand All @@ -32,5 +33,6 @@ WORKDIR /parseable

# Copy the static shell into base image.
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable
COPY --from=builder /parseable/src/event/known-formats /parseable/src/event/known-formats

CMD ["/usr/bin/parseable"]
53 changes: 35 additions & 18 deletions src/event/detect_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,49 @@
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::Schema;
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use serde::Deserialize;
use serde_json::Value;
use std::{collections::HashMap, fs, path::Path};

use crate::{event::format::update_data_type_to_datetime, utils::json::flatten_json_body};

// Expose some static variables for internal usage
pub static KNOWN_SCHEMA_LIST: OnceCell<HashMap<String, Schema>> = OnceCell::new();
const FORMATS_JSON: &str = include_str!("known-formats/formats.json");

#[derive(Debug, Deserialize)]
struct Format {
name: String,
schema_type: String,
sample_json_path: String,
}

pub fn detect_schema() -> HashMap<String, Schema> {
let mut known_schema_list: HashMap<String, Schema> = HashMap::new();
//read file formats.json
let formats_file = std::fs::File::open("src/event/known-formats/formats.json").unwrap();
let formats_reader = std::io::BufReader::new(formats_file);
let formats: serde_json::Value = serde_json::from_reader(formats_reader).unwrap();
//iterate over the formats
for format in formats.as_array().unwrap() {
let schema_type = format["schema_type"].as_str().unwrap();
let sample_json_path = format["sample_json_path"].as_str().unwrap();
let sample_file = std::fs::File::open(sample_json_path).unwrap();
let sample_reader = std::io::BufReader::new(sample_file);
let sample_json: serde_json::Value = serde_json::from_reader(sample_reader).unwrap();
let flattened_json = flatten_json_body(sample_json, None, None, None, false).unwrap();
let sample_json_records = [flattened_json.clone()];
let mut schema =
infer_json_schema_from_iterator(sample_json_records.iter().map(Ok)).unwrap();
schema = update_data_type_to_datetime(schema, flattened_json, Vec::new());
known_schema_list.insert(schema_type.to_string(), schema);
let json_data: serde_json::Value = serde_json::from_str(FORMATS_JSON).unwrap();

let formats: Vec<Format> =
serde_json::from_value(json_data).expect("Failed to parse formats.json");

for format in &formats {
let sample_path = Path::new(&format.sample_json_path);
let schema_type = &format.schema_type;
let _name = &format.name;
match fs::read_to_string(sample_path) {
Ok(content) => match serde_json::from_str::<Value>(&content) {
Ok(json) => {
let flattened_json = flatten_json_body(json, None, None, None, false).unwrap();
let sample_json_records = [flattened_json.clone()];
let mut schema =
infer_json_schema_from_iterator(sample_json_records.iter().map(Ok))
.unwrap();
schema = update_data_type_to_datetime(schema, flattened_json, Vec::new());
known_schema_list.insert(schema_type.to_string(), schema);
}
Err(err) => eprintln!("Invalid JSON in {}: {}", sample_path.display(), err),
},
Err(err) => eprintln!("Failed to read {}: {}", sample_path.display(), err),
}
}
prepare_known_schema_list(known_schema_list.clone());
known_schema_list
Expand Down
Loading

0 comments on commit 3dd640b

Please sign in to comment.