Skip to content

Commit

Permalink
fix(hoghooks): drop payloads that are too large for kafka (#24000)
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Jul 26, 2024
1 parent cdca1a9 commit a422e7e
Showing 1 changed file with 98 additions and 1 deletion.
99 changes: 98 additions & 1 deletion rust/hook-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::channel::oneshot::Canceled;
use futures::future::join_all;
use health::HealthHandle;
use http::StatusCode;
use rdkafka::error::KafkaError;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{FutureProducer, FutureRecord};
use reqwest::{header, Client};
use serde_json::{json, Value};
Expand Down Expand Up @@ -278,6 +278,30 @@ async fn process_batch<'a>(
headers: None,
}) {
Ok(future) => kafka_ack_futures.push(future),
Err((
KafkaError::MessageProduction(
RDKafkaErrorCode::MessageSizeTooLarge,
),
_,
)) => {
// HACK: While under development, we are dropping messages that
// are too large. This is temporary, as we expect the webhook
// handler for Hog to change soon. In the meantime, nobody needs
// to be alerted about this.
let team_id = metadata
.get("teamId")
.and_then(|t| t.as_number())
.map(|t| t.to_string())
.unwrap_or_else(|| "?".to_string());

let hog_function_id = metadata
.get("hogFunctionId")
.and_then(|h| h.as_str())
.map(|h| h.to_string())
.unwrap_or_else(|| "?".to_string());

error!("dropping message due to size limit, team_id: {}, hog_function_id: {}", team_id, hog_function_id);
}
Err((error, _)) => {
// Return early to avoid committing the batch.
return log_kafka_error_and_sleep("send", Some(error)).await;
Expand Down Expand Up @@ -928,6 +952,79 @@ mod tests {
);
}

/// Runs a hog-mode batch containing one job whose metadata carries a 2 MiB
/// string field.
///
/// The test makes no explicit assertions: it passes as long as
/// `process_batch` runs to completion without panicking.
// NOTE(review): presumably the oversized metadata is meant to trigger the
// producer's "message too large" path — confirm against `process_batch`.
#[sqlx::test(migrations = "../migrations")]
async fn test_hoghook_drops_large_payloads(db: PgPool) {
    use httpmock::prelude::*;

    let id = worker_id();
    let queue_name = "test_hoghook_drops_large_payloads".to_string();
    let pg_queue = PgQueue::new_from_pool(&queue_name, db).await;
    let callbacks_topic = "cdp_function_callbacks";

    // Local HTTP endpoint that answers every POST to "/" with a small JSON body.
    let http_server = MockServer::start();
    http_server.mock(|when, then| {
        when.method(POST).path("/");
        then.status(200)
            .header("content-type", "application/json; charset=UTF-8")
            .body(r#"{"message": "hello, world"}"#);
    });

    // Metadata with a single 2 MiB string field.
    let huge_len = 2 * 1024 * 1024;
    let oversized_metadata = json!({"hugeField": "a".repeat(huge_len)});

    // The job parameters are only needed here, so they are built in place.
    enqueue_job(
        &pg_queue,
        1,
        WebhookJobParameters {
            body: "".to_owned(),
            headers: collections::HashMap::new(),
            method: HttpMethod::POST,
            url: http_server.url("/"),
        },
        serde_json::to_value(oversized_metadata).unwrap(),
    )
    .await
    .expect("failed to enqueue job");

    let health_registry = HealthRegistry::new("liveness");
    let liveness = health_registry
        .register("worker".to_string(), ::time::Duration::seconds(30))
        .await;

    let (_, kafka_producer) = create_mock_kafka().await;
    let hog_mode = true;
    let poll_interval = time::Duration::from_millis(100);
    let request_timeout = time::Duration::from_millis(5000);
    let worker = WebhookWorker::new(
        &id,
        &pg_queue,
        1,
        poll_interval,
        request_timeout,
        10,
        RetryPolicy::default(),
        false,
        kafka_producer,
        callbacks_topic.to_string(),
        hog_mode,
        liveness,
    );

    let dequeued = worker.wait_for_jobs_tx().await;

    // Completing without a panic is the only success criterion.
    process_batch(
        dequeued,
        worker.http_client,
        worker.retry_policy,
        worker.kafka_producer,
        worker.cdp_function_callbacks_topic,
        hog_mode,
    )
    .await;
}

#[tokio::test]
async fn test_send_webhook() {
let method = HttpMethod::POST;
Expand Down

0 comments on commit a422e7e

Please sign in to comment.