Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

capture: add support for the /batch request shape #21

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
anyhow = "1.0"
assert-json-diff = "2.0.2"
async-trait = "0.1.74"
axum = { version = "0.7.5", features = ["http2", "macros"] }
axum = { version = "0.7.5", features = ["http2", "macros", "matched-path"] }
axum-client-ip = "0.6.0"
base64 = "0.22.0"
bytes = "1"
Expand Down
32 changes: 21 additions & 11 deletions capture/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
use crate::token::InvalidTokenReason;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use thiserror::Error;
use time::OffsetDateTime;
use uuid::Uuid;

#[derive(Debug, Deserialize, Serialize)]
pub struct CaptureRequest {
xvello marked this conversation as resolved.
Show resolved Hide resolved
#[serde(alias = "$token", alias = "api_key")]
pub token: String,

pub event: String,
pub properties: HashMap<String, Value>,
}
use crate::token::InvalidTokenReason;

#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum CaptureResponseCode {
Expand Down Expand Up @@ -84,3 +76,21 @@ impl IntoResponse for CaptureError {
.into_response()
}
}

#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
pub uuid: Uuid,
pub distinct_id: String,
pub ip: String,
pub data: String,
pub now: String,
#[serde(with = "time::serde::rfc3339::option")]
pub sent_at: Option<OffsetDateTime>,
pub token: String,
}

impl ProcessedEvent {
pub fn key(&self) -> String {
format!("{}:{}", self.token, self.distinct_id)
}
}
4 changes: 2 additions & 2 deletions capture/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
pub mod api;
pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod limiters;
pub mod prometheus;
Expand All @@ -12,3 +10,5 @@ pub mod sinks;
pub mod time;
pub mod token;
pub mod utils;
pub mod v0_endpoint;
pub mod v0_request;
40 changes: 33 additions & 7 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};
use crate::{
limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint,
};

use crate::prometheus::{setup_metrics_recorder, track_metrics};

Expand Down Expand Up @@ -58,17 +60,41 @@ pub fn router<
.route("/", get(index))
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())))
.route(
"/e",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/e/",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/batch",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/batch/",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/i/v0/e",
post(capture::event)
.get(capture::event)
.options(capture::options),
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/i/v0/e/",
post(capture::event)
.get(capture::event)
.options(capture::options),
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.layer(TraceLayer::new_for_http())
.layer(cors)
Expand Down
6 changes: 2 additions & 4 deletions capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::CaptureError;
use crate::api::{CaptureError, ProcessedEvent};
use crate::config::KafkaConfig;
use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;
Expand Down Expand Up @@ -259,9 +258,8 @@ impl Event for KafkaSink {

#[cfg(test)]
mod tests {
use crate::api::CaptureError;
use crate::api::{CaptureError, ProcessedEvent};
use crate::config;
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
Expand Down
3 changes: 1 addition & 2 deletions capture/src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::api::{CaptureError, ProcessedEvent};

pub mod kafka;
pub mod print;
Expand Down
3 changes: 1 addition & 2 deletions capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use async_trait::async_trait;
use metrics::{counter, histogram};
use tracing::log::info;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::api::{CaptureError, ProcessedEvent};
use crate::sinks::Event;

pub struct PrintSink {}
Expand Down
Loading
Loading