This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

implement error handling with a CaptureError enum (#24)
xvello authored Sep 14, 2023
1 parent 3390610 commit 76905ec
Showing 8 changed files with 132 additions and 94 deletions.
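
Background on the mechanism this commit builds on (an illustrative sketch with made-up names, not code from the diff below): thiserror's #[derive(Error)] generates the Display impl from each #[error("...")] attribute, and every field marked #[from] additionally gets a From impl. That From impl is what lets the rewritten handlers below replace manual match/map_err blocks with the ? operator.

use thiserror::Error;

// Illustrative names only; the real enum added by this commit is CaptureError in capture/src/api.rs.
#[derive(Error, Debug)]
enum DemoError {
    #[error("failed to parse payload: {0}")]
    Parse(#[from] serde_json::Error), // #[from] generates: impl From<serde_json::Error> for DemoError
    #[error("payload was empty")]
    Empty,
}

fn parse(payload: &str) -> Result<serde_json::Value, DemoError> {
    if payload.is_empty() {
        return Err(DemoError::Empty);
    }
    Ok(serde_json::from_str(payload)?) // serde_json::Error converts into DemoError::Parse via From
}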
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -27,3 +27,4 @@ rand = "0.8.5"
rdkafka = { version = "0.34", features = ["cmake-build"] }
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
thiserror = "1.0.48"
1 change: 1 addition & 0 deletions capture/Cargo.toml
@@ -28,6 +28,7 @@ rand = { workspace = true }
rdkafka = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
assert-json-diff = "2.0.2"
49 changes: 49 additions & 0 deletions capture/src/api.rs
@@ -1,6 +1,10 @@
use crate::token::InvalidTokenReason;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use thiserror::Error;

#[derive(Debug, Deserialize, Serialize)]
pub struct CaptureRequest {
@@ -20,3 +24,48 @@ pub enum CaptureResponseCode {
pub struct CaptureResponse {
pub status: CaptureResponseCode,
}

#[derive(Error, Debug)]
pub enum CaptureError {
#[error("failed to decode request: {0}")]
RequestDecodingError(String),
#[error("failed to decode request: {0}")]
RequestParsingError(#[from] serde_json::Error),

#[error("request holds no event")]
EmptyBatch,
#[error("event submitted without a distinct_id")]
MissingDistinctId,

#[error("event submitted without an api_key")]
NoTokenError,
#[error("batch submitted with inconsistent api_key values")]
MultipleTokensError,
#[error("API key is not valid: {0}")]
TokenValidationError(#[from] InvalidTokenReason),

#[error("transient error, please retry")]
RetryableSinkError,
#[error("maximum event size exceeded")]
EventTooBig,
#[error("invalid event could not be processed")]
NonRetryableSinkError,
}

impl IntoResponse for CaptureError {
fn into_response(self) -> Response {
match self {
CaptureError::RequestDecodingError(_)
| CaptureError::RequestParsingError(_)
| CaptureError::EmptyBatch
| CaptureError::MissingDistinctId
| CaptureError::EventTooBig
| CaptureError::NonRetryableSinkError => (StatusCode::BAD_REQUEST, self.to_string()),
CaptureError::NoTokenError
| CaptureError::MultipleTokensError
| CaptureError::TokenValidationError(_) => (StatusCode::UNAUTHORIZED, self.to_string()),
CaptureError::RetryableSinkError => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
}
.into_response()
}
}
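
Because CaptureError implements IntoResponse, an axum handler can return Result<_, CaptureError>, use ? freely, and let axum turn the Err branch into the (status, message) pair built above. A minimal usage sketch follows; the handler is hypothetical and not part of this diff, and it assumes the CaptureResponse, CaptureResponseCode, and CaptureError items defined in this file.

use axum::Json;

// Hypothetical handler for illustration only; any CaptureError returned here is
// rendered through the IntoResponse impl above.
async fn demo_handler(body: String) -> Result<Json<CaptureResponse>, CaptureError> {
    // A serde_json::Error converts into CaptureError::RequestParsingError via #[from] -> 400
    let events: Vec<serde_json::Value> = serde_json::from_str(&body)?;
    if events.is_empty() {
        return Err(CaptureError::EmptyBatch); // also rendered as 400 Bad Request
    }
    Ok(Json(CaptureResponse {
        status: CaptureResponseCode::Ok,
    }))
}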
74 changes: 19 additions & 55 deletions capture/src/capture.rs
@@ -2,10 +2,9 @@ use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use bytes::Bytes;

use axum::{http::StatusCode, Json};
use axum::Json;
// TODO: stream this instead
use axum::extract::{Query, State};
use axum::http::HeaderMap;
@@ -16,7 +15,7 @@ use time::OffsetDateTime;
use crate::event::ProcessingContext;
use crate::token::validate_token;
use crate::{
api::{CaptureResponse, CaptureResponseCode},
api::{CaptureError, CaptureResponse, CaptureResponseCode},
event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
router, sink,
utils::uuid_v7,
@@ -28,7 +27,7 @@ pub async fn event(
meta: Query<EventQuery>,
headers: HeaderMap,
body: Bytes,
) -> Result<Json<CaptureResponse>, (StatusCode, String)> {
) -> Result<Json<CaptureResponse>, CaptureError> {
tracing::debug!(len = body.len(), "new event request");

let events = match headers
@@ -43,28 +42,14 @@ pub async fn event(
RawEvent::from_bytes(&meta, payload.into())
}
_ => RawEvent::from_bytes(&meta, body),
};

let events = match events {
Ok(events) => events,
Err(e) => {
tracing::error!("failed to decode event: {:?}", e);
return Err((
StatusCode::BAD_REQUEST,
String::from("Failed to decode event"),
));
}
};
}?;

println!("Got events {:?}", &events);

if events.is_empty() {
return Err((StatusCode::BAD_REQUEST, String::from("No events in batch")));
return Err(CaptureError::EmptyBatch);
}
let token = match extract_and_verify_token(&events) {
Ok(token) => token,
Err(msg) => return Err((StatusCode::UNAUTHORIZED, msg)),
};
let token = extract_and_verify_token(&events)?;

let sent_at = meta.sent_at.and_then(|value| {
let value_nanos: i128 = i128::from(value) * 1_000_000; // Assuming the value is in milliseconds, latest posthog-js releases
@@ -84,11 +69,7 @@ pub async fn event(
client_ip: ip.to_string(),
};

let processed = process_events(state.sink.clone(), &events, &context).await;

if let Err(msg) = processed {
return Err((StatusCode::BAD_REQUEST, msg));
}
process_events(state.sink.clone(), &events, &context).await?;

Ok(Json(CaptureResponse {
status: CaptureResponseCode::Ok,
@@ -98,12 +79,12 @@ pub async fn event(
pub fn process_single_event(
event: &RawEvent,
context: &ProcessingContext,
) -> Result<ProcessedEvent> {
) -> Result<ProcessedEvent, CaptureError> {
let distinct_id = match &event.distinct_id {
Some(id) => id,
None => match event.properties.get("distinct_id").map(|v| v.as_str()) {
Some(Some(id)) => id,
_ => return Err(anyhow!("missing distinct_id")),
_ => return Err(CaptureError::MissingDistinctId),
},
};

@@ -119,7 +100,7 @@ pub fn process_single_event(
})
}

pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, String> {
pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> {
let distinct_tokens: HashSet<Option<String>> = HashSet::from_iter(
events
.iter()
@@ -128,50 +109,33 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, String> {
);

return match distinct_tokens.len() {
0 => Err(String::from("no token found in request")),
0 => Err(CaptureError::NoTokenError),
1 => match distinct_tokens.iter().last() {
Some(Some(token)) => {
validate_token(token).map_err(|err| String::from(err.reason()))?;
validate_token(token)?;
Ok(token.clone())
}
_ => Err(String::from("no token found in request")),
_ => Err(CaptureError::NoTokenError),
},
_ => Err(String::from("number of distinct tokens in batch > 1")),
_ => Err(CaptureError::MultipleTokensError),
};
}

pub async fn process_events<'a>(
sink: Arc<dyn sink::EventSink + Send + Sync>,
events: &'a [RawEvent],
context: &'a ProcessingContext,
) -> Result<(), String> {
let events: Vec<ProcessedEvent> = match events
) -> Result<(), CaptureError> {
let events: Vec<ProcessedEvent> = events
.iter()
.map(|e| process_single_event(e, context))
.collect()
{
Err(_) => return Err(String::from("Failed to process all events")),
Ok(events) => events,
};
.collect::<Result<Vec<ProcessedEvent>, CaptureError>>()?;

if events.len() == 1 {
let sent = sink.send(events[0].clone()).await;

if let Err(e) = sent {
tracing::error!("Failed to send event to sink: {:?}", e);

return Err(String::from("Failed to send event to sink"));
}
sink.send(events[0].clone()).await?;
} else {
let sent = sink.send_batch(events).await;

if let Err(e) = sent {
tracing::error!("Failed to send batch events to sink: {:?}", e);

return Err(String::from("Failed to send batch events to sink"));
}
sink.send_batch(events).await?;
}

Ok(())
}
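
The bare sink.send(...).await? and sink.send_batch(events).await? calls above compile only if the sink's methods return CaptureError (or an error type that converts into it). That trait change lives in capture/src/sink.rs, which is not expanded on this page, so the sketch below is a guess at its shape rather than the committed definition.

// Hedged sketch only: an async_trait-style EventSink whose methods return
// CaptureError, which is what the ? operators in process_events require.
use crate::api::CaptureError;
use crate::event::ProcessedEvent;

#[async_trait::async_trait]
pub trait EventSink {
    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}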

41 changes: 29 additions & 12 deletions capture/src/event.rs
@@ -4,7 +4,7 @@ use std::io::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;

use anyhow::{anyhow, Result};
use crate::api::CaptureError;
use bytes::{Buf, Bytes};
use flate2::read::GzDecoder;
use time::OffsetDateTime;
@@ -52,31 +52,48 @@ pub struct RawEvent {
pub properties: HashMap<String, Value>,
}

#[derive(Deserialize)]
#[serde(untagged)]
enum RawRequest {
/// Batch of events
Batch(Vec<RawEvent>),
/// Single event
One(RawEvent),
}

impl RawRequest {
pub fn events(self) -> Vec<RawEvent> {
match self {
RawRequest::Batch(events) => events,
RawRequest::One(event) => vec![event],
}
}
}

impl RawEvent {
/// We post up _at least one_ event, so when decompressing and deserializing there
/// could be more than one. Hence this function has to return a Vec.
/// TODO: Use an axum extractor for this
pub fn from_bytes(query: &EventQuery, bytes: Bytes) -> Result<Vec<RawEvent>> {
pub fn from_bytes(query: &EventQuery, bytes: Bytes) -> Result<Vec<RawEvent>, CaptureError> {
tracing::debug!(len = bytes.len(), "decoding new event");

let payload = match query.compression {
Some(Compression::GzipJs) => {
let mut d = GzDecoder::new(bytes.reader());
let mut s = String::new();
d.read_to_string(&mut s)?;
d.read_to_string(&mut s).map_err(|e| {
tracing::error!("failed to decode gzip: {}", e);
CaptureError::RequestDecodingError(String::from("invalid gzip data"))
})?;
s
}
None => String::from_utf8(bytes.into())?,
None => String::from_utf8(bytes.into()).map_err(|e| {
tracing::error!("failed to decode body: {}", e);
CaptureError::RequestDecodingError(String::from("invalid body encoding"))
})?,
};

tracing::debug!(json = payload, "decoded event data");
if let Ok(events) = serde_json::from_str::<Vec<RawEvent>>(&payload) {
return Ok(events);
}
if let Ok(events) = serde_json::from_str::<RawEvent>(&payload) {
return Ok(vec![events]);
}
Err(anyhow!("unknown input shape"))
Ok(serde_json::from_str::<RawRequest>(&payload)?.events())
}

pub fn extract_token(&self) -> Option<String> {
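
As a closing illustration of the new RawRequest type (illustrative code, not part of the commit): #[serde(untagged)] makes serde try each variant in turn, so both a single JSON object and a JSON array of objects deserialize, which is what lets from_bytes replace the two earlier serde_json::from_str attempts with a single call.

use serde::Deserialize;

// Trimmed-down mirror of the RawRequest pattern above, with a hypothetical event type.
#[derive(Deserialize, Debug)]
struct DemoEvent {
    event: String,
}

#[derive(Deserialize)]
#[serde(untagged)]
enum DemoRequest {
    Batch(Vec<DemoEvent>),
    One(DemoEvent),
}

fn main() {
    let one: DemoRequest = serde_json::from_str(r#"{"event":"pageview"}"#).unwrap();
    let many: DemoRequest = serde_json::from_str(r#"[{"event":"a"},{"event":"b"}]"#).unwrap();
    // Both shapes are accepted; a helper like events() then normalizes them to a Vec.
    assert!(matches!(one, DemoRequest::One(_)));
    assert!(matches!(many, DemoRequest::Batch(_)));
}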