
get django_compat test closer to green (#28)
xvello authored Sep 15, 2023
1 parent af529b1 commit 6ed0609
Showing 6 changed files with 82 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -14,7 +14,7 @@ serde = { version = "1.0.160", features = ["derive"] }
 serde_json = "1.0.96"
 governor = "0.5.1"
 tower_governor = "0.0.4"
-time = { version = "0.3.20", features = ["formatting", "macros", "serde"] }
+time = { version = "0.3.20", features = ["formatting", "macros", "parsing", "serde"] }
 tower-http = { version = "0.4.0", features = ["trace"] }
 bytes = "1"
 anyhow = "1.0"
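
Note: the newly enabled `parsing` feature is what gates `OffsetDateTime::parse`, which the compatibility test below relies on. A minimal standalone sketch of that round trip (the timestamp literal is made up for illustration):

use time::format_description::well_known::{Iso8601, Rfc3339};
use time::OffsetDateTime;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parse a Django-style ISO 8601 timestamp (needs the "parsing" feature)...
    let sent_at = OffsetDateTime::parse("2023-09-15T10:00:00.123456+00:00", &Iso8601::DEFAULT)?;
    // ...and re-print it as RFC 3339 (needs the "formatting" feature).
    println!("{}", sent_at.format(&Rfc3339)?);
    Ok(())
}
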
4 changes: 4 additions & 0 deletions capture/src/capture.rs
@@ -69,6 +69,8 @@ pub async fn event(
         client_ip: ip.to_string(),
     };
 
+    println!("Got context {:?}", &context);
+
     process_events(state.sink.clone(), &events, &context).await?;
 
     Ok(Json(CaptureResponse {
@@ -131,6 +133,8 @@ pub async fn process_events<'a>(
         .map(|e| process_single_event(e, context))
         .collect::<Result<Vec<ProcessedEvent>, CaptureError>>()?;
 
+    println!("Processed events: {:?}", events);
+
     if events.len() == 1 {
         sink.send(events[0].clone()).await?;
     } else {
42 changes: 16 additions & 26 deletions capture/src/event.rs
@@ -13,8 +13,10 @@ use uuid::Uuid;
 #[derive(Deserialize, Default)]
 pub enum Compression {
     #[default]
-    #[serde(rename = "gzip-js")]
-    GzipJs,
+    Unsupported,
+
+    #[serde(rename = "gzip", alias = "gzip-js")]
+    Gzip,
 }
 
 #[derive(Deserialize, Default)]
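
Note: with the rename/alias pair above, both the canonical "gzip" value and posthog-js's historical "gzip-js" value deserialize into the same variant. A minimal standalone sketch (serde_json stands in here for whatever deserializer the service actually uses):

use serde::Deserialize;

#[derive(Deserialize, Default, Debug, PartialEq)]
enum Compression {
    #[default]
    Unsupported,
    #[serde(rename = "gzip", alias = "gzip-js")]
    Gzip,
}

fn main() {
    // Both spellings land on the same variant; unknown values fail to
    // deserialize rather than silently mapping to Unsupported.
    let canonical: Compression = serde_json::from_str("\"gzip\"").unwrap();
    let legacy: Compression = serde_json::from_str("\"gzip-js\"").unwrap();
    assert_eq!(canonical, Compression::Gzip);
    assert_eq!(legacy, Compression::Gzip);
}
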
@@ -26,15 +28,6 @@ pub struct EventQuery
 
     #[serde(alias = "_")]
     pub sent_at: Option<i64>,
-
-    #[serde(skip_serializing)]
-    pub token: Option<String>, // Filled by handler
-
-    #[serde(skip_serializing)]
-    pub now: Option<String>, // Filled by handler from timesource
-
-    #[serde(skip_serializing)]
-    pub client_ip: Option<String>, // Filled by handler
 }
 
 #[derive(Debug, Deserialize)]
@@ -78,7 +71,7 @@ impl RawEvent {
         tracing::debug!(len = bytes.len(), "decoding new event");
 
         let payload = match query.compression {
-            Some(Compression::GzipJs) => {
+            Some(Compression::Gzip) => {
                 let mut d = GzDecoder::new(bytes.reader());
                 let mut s = String::new();
                 d.read_to_string(&mut s).map_err(|e| {
@@ -87,6 +80,12 @@
                 })?;
                 s
             }
+            Some(_) => {
+                return Err(CaptureError::RequestDecodingError(String::from(
+                    "unsupported compression format",
+                )))
+            }
+
             None => String::from_utf8(bytes.into()).map_err(|e| {
                 tracing::error!("failed to decode body: {}", e);
                 CaptureError::RequestDecodingError(String::from("invalid body encoding"))
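
Note: the gzip branch leans on flate2's GzDecoder (already used by event.rs), which wraps any Read. A standalone sketch of just that decode step (the function name is ours, not from the crate):

use std::io::Read;

use bytes::{Buf, Bytes};
use flate2::read::GzDecoder;

// Decode a gzip-compressed request body into a UTF-8 string,
// mirroring what the Some(Compression::Gzip) arm above does.
fn decode_gzip_body(body: Bytes) -> std::io::Result<String> {
    let mut decoder = GzDecoder::new(body.reader());
    let mut payload = String::new();
    decoder.read_to_string(&mut payload)?;
    Ok(payload)
}
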
@@ -108,6 +107,7 @@
     }
 }
 
+#[derive(Debug)]
 pub struct ProcessingContext {
     pub lib_version: Option<String>,
     pub sent_at: Option<OffsetDateTime>,
@@ -116,22 +116,15 @@ pub struct ProcessingContext {
     pub client_ip: String,
 }
 
-time::serde::format_description!(
-    django_iso,
-    OffsetDateTime,
-    "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6][offset_hour \
-     sign:mandatory]:[offset_minute]"
-);
-
-#[derive(Clone, Default, Debug, Serialize)]
+#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
 pub struct ProcessedEvent {
     pub uuid: Uuid,
     pub distinct_id: String,
     pub ip: String,
     pub site_url: String,
     pub data: String,
     pub now: String,
-    #[serde(with = "django_iso::option")]
+    #[serde(with = "time::serde::rfc3339::option")]
     pub sent_at: Option<OffsetDateTime>,
     pub token: String,
 }
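
Note: dropping the hand-rolled django_iso format in favour of time's built-in RFC 3339 serde helper keeps sent_at in a standard shape. A rough sketch of how the with-attribute behaves, assuming time is built with its well-known serde helpers (the serde-well-known feature) enabled:

use serde::Serialize;
use time::OffsetDateTime;

#[derive(Serialize)]
struct Stamped {
    // Rendered as an RFC 3339 string, or null when the value is None.
    #[serde(with = "time::serde::rfc3339::option")]
    sent_at: Option<OffsetDateTime>,
}

fn main() {
    let stamped = Stamped {
        sent_at: Some(OffsetDateTime::UNIX_EPOCH),
    };
    println!("{}", serde_json::to_string(&stamped).unwrap());
}
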
@@ -160,16 +153,13 @@ mod tests {
         let bytes = Bytes::from(decoded_horrible_blob);
         let events = RawEvent::from_bytes(
             &EventQuery {
-                compression: Some(Compression::GzipJs),
+                compression: Some(Compression::Gzip),
                 lib_version: None,
                 sent_at: None,
-                token: None,
-                now: None,
-                client_ip: None,
             },
             bytes,
         );
 
-        assert_eq!(events.is_ok(), true);
+        assert!(events.is_ok());
     }
 }
4 changes: 2 additions & 2 deletions capture/src/time.rs
@@ -10,7 +10,7 @@ impl TimeSource for SystemTime {
     fn current_time(&self) -> String {
         let time = time::OffsetDateTime::now_utc();
 
-        time.format(&time::format_description::well_known::Iso8601::DEFAULT)
-            .expect("failed to iso8601 format timestamp")
+        time.format(&time::format_description::well_known::Rfc3339)
+            .expect("failed to format timestamp")
     }
 }
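
Note: Rfc3339 is a fixed, non-configurable well-known format, so the emitted timestamp no longer depends on an Iso8601 configuration. A standalone sketch of the same call outside the TimeSource trait (the function name is ours):

use time::format_description::well_known::Rfc3339;

fn current_time_rfc3339() -> String {
    // Same formatting call SystemTime::current_time now makes.
    time::OffsetDateTime::now_utc()
        .format(&Rfc3339)
        .expect("failed to format timestamp")
}

fn main() {
    println!("{}", current_time_rfc3339());
}
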
54 changes: 47 additions & 7 deletions capture/tests/django_compat.rs
@@ -1,4 +1,4 @@
-use assert_json_diff::assert_json_eq;
+use assert_json_diff::assert_json_matches_no_panic;
 use async_trait::async_trait;
 use axum::http::StatusCode;
 use axum_test_helper::TestClient;
@@ -14,6 +14,8 @@ use serde_json::{json, Value};
 use std::fs::File;
 use std::io::{BufRead, BufReader};
 use std::sync::{Arc, Mutex};
+use time::format_description::well_known::{Iso8601, Rfc3339};
+use time::OffsetDateTime;
 
 #[derive(Debug, Deserialize)]
 struct RequestDump {
@@ -74,8 +76,10 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
     let file = File::open(REQUESTS_DUMP_FILE_NAME)?;
     let reader = BufReader::new(file);
 
-    for line in reader.lines() {
-        let case: RequestDump = serde_json::from_str(&line?)?;
+    let mut mismatches = 0;
+
+    for (line_number, line_contents) in reader.lines().enumerate() {
+        let case: RequestDump = serde_json::from_str(&line_contents?)?;
         if !case.path.starts_with("/e/") {
             println!("Skipping {} test case", &case.path);
             continue;
@@ -93,7 +97,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
         let app = router(timesource, sink.clone(), false);
 
         let client = TestClient::new(app);
-        let mut req = client.post("/i/v0/e/").body(raw_body);
+        let mut req = client.post(&format!("/i/v0{}", case.path)).body(raw_body);
         if !case.content_encoding.is_empty() {
             req = req.header("Content-encoding", case.content_encoding);
         }
@@ -104,15 +108,51 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
             req = req.header("X-Forwarded-For", case.ip);
         }
         let res = req.send().await;
-        assert_eq!(res.status(), StatusCode::OK, "{}", res.text().await);
+        assert_eq!(
+            res.status(),
+            StatusCode::OK,
+            "line {} rejected: {}",
+            line_number,
+            res.text().await
+        );
         assert_eq!(
             Some(CaptureResponse {
                 status: CaptureResponseCode::Ok
             }),
             res.json().await
        );
-        assert_eq!(sink.len(), case.output.len());
-        assert_json_eq!(json!(case.output), json!(sink.events()))
+        assert_eq!(
+            sink.len(),
+            case.output.len(),
+            "event count mismatch on line {}",
+            line_number
+        );
+
+        for (event_number, (message, expected)) in
+            sink.events().iter().zip(case.output.iter()).enumerate()
+        {
+            // Normalizing the expected event to align with known django->rust inconsistencies
+            let mut expected = expected.clone();
+            if let Some(value) = expected.get_mut("sent_at") {
+                // Default ISO format is different between python and rust, both are valid
+                // Parse and re-print the value before comparison
+                let sent_at =
+                    OffsetDateTime::parse(value.as_str().expect("empty"), &Iso8601::DEFAULT)?;
+                *value = Value::String(sent_at.format(&Rfc3339)?)
+            }
+
+            let match_config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict);
+            if let Err(e) =
+                assert_json_matches_no_panic(&json!(expected), &json!(message), match_config)
+            {
+                println!(
+                    "mismatch at line {}, event {}: {}",
+                    line_number, event_number, e
+                );
+                mismatches += 1;
+            }
+        }
     }
+    assert_eq!(0, mismatches, "some events didn't match");
     Ok(())
 }
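
Note: replacing assert_json_eq! with assert_json_matches_no_panic is what lets the test log every mismatching dump line before failing once at the end. A minimal standalone sketch of that API (sample payloads are made up):

use assert_json_diff::{assert_json_matches_no_panic, CompareMode, Config};
use serde_json::json;

fn main() {
    let expected = json!({"distinct_id": "alice", "token": "t1"});
    let actual = json!({"distinct_id": "alice", "token": "t2"});

    // Returns Err(description) instead of panicking, so callers can count
    // or log mismatches and assert once at the end.
    let config = Config::new(CompareMode::Strict);
    if let Err(diff) = assert_json_matches_no_panic(&expected, &actual, config) {
        println!("mismatch: {}", diff);
    }
}
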
22 changes: 12 additions & 10 deletions capture/tests/requests_dump.jsonl

Large diffs are not rendered by default.
