From 9da92027ef82f96140ee6663733c19ac927e3775 Mon Sep 17 00:00:00 2001 From: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com> Date: Thu, 6 Jun 2024 23:15:45 +0530 Subject: [PATCH] refactor(outgoing_webhooks): raise errors in the analytics pipeline in case of API client errors or non-2xx responses (#4894) --- crates/router/src/core/webhooks/outgoing.rs | 481 +++++++++++--------- 1 file changed, 258 insertions(+), 223 deletions(-) diff --git a/crates/router/src/core/webhooks/outgoing.rs b/crates/router/src/core/webhooks/outgoing.rs index 94f96c452166..0be4e9272f06 100644 --- a/crates/router/src/core/webhooks/outgoing.rs +++ b/crates/router/src/core/webhooks/outgoing.rs @@ -274,199 +274,17 @@ async fn trigger_webhook_to_merchant( ); logger::debug!(outgoing_webhook_response=?response); - let update_event_if_client_error = - |state: SessionState, - merchant_key_store: domain::MerchantKeyStore, - merchant_id: String, - event_id: String, - error_message: String| async move { - let is_webhook_notified = false; - - let response_to_store = OutgoingWebhookResponseContent { - body: None, - headers: None, - status_code: None, - error_message: Some(error_message), - }; - - let event_update = domain::EventUpdate::UpdateResponse { - is_webhook_notified, - response: Some( - domain_types::encrypt( - response_to_store - .encode_to_string_of_json() - .change_context( - errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, - ) - .map(Secret::new)?, - merchant_key_store.key.get_inner().peek(), - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - .attach_printable("Failed to encrypt outgoing webhook response content")?, - ), - }; - - state - .store - .update_event_by_merchant_id_event_id( - &merchant_id, - &event_id, - event_update, - &merchant_key_store, - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - }; - - let api_client_error_handler = - |state: SessionState, - merchant_key_store: domain::MerchantKeyStore, - merchant_id: String, - event_id: String, - client_error: error_stack::Report, - delivery_attempt: enums::WebhookDeliveryAttempt| async move { - // Not including detailed error message in response information since it contains too - // much of diagnostic information to be exposed to the merchant. - update_event_if_client_error( - state, - merchant_key_store, - merchant_id, - event_id, - "Unable to send request to merchant server".to_string(), - ) - .await?; - - let error = - client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed); - logger::error!( - ?error, - ?delivery_attempt, - "An error occurred when sending webhook to merchant" - ); - - Ok::<_, error_stack::Report>(()) - }; - let update_event_in_storage = |state: SessionState, - merchant_key_store: domain::MerchantKeyStore, - merchant_id: String, - event_id: String, - response: reqwest::Response| async move { - let status_code = response.status(); - let is_webhook_notified = status_code.is_success(); - - let response_headers = response - .headers() - .iter() - .map(|(name, value)| { - ( - name.as_str().to_owned(), - value - .to_str() - .map(|s| Secret::from(String::from(s))) - .unwrap_or_else(|error| { - logger::warn!( - "Response header {} contains non-UTF-8 characters: {error:?}", - name.as_str() - ); - Secret::from(String::from("Non-UTF-8 header value")) - }), - ) - }) - .collect::>(); - let response_body = response - .text() - .await - .map(Secret::from) - .unwrap_or_else(|error| { - logger::warn!("Response contains non-UTF-8 characters: {error:?}"); - Secret::from(String::from("Non-UTF-8 response body")) - }); - let response_to_store = OutgoingWebhookResponseContent { - body: Some(response_body), - headers: Some(response_headers), - status_code: Some(status_code.as_u16()), - error_message: None, - }; - - let event_update = domain::EventUpdate::UpdateResponse { - is_webhook_notified, - response: Some( - domain_types::encrypt( - response_to_store - .encode_to_string_of_json() - .change_context( - errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, - ) - .map(Secret::new)?, - merchant_key_store.key.get_inner().peek(), - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - .attach_printable("Failed to encrypt outgoing webhook response content")?, - ), - }; - state - .store - .update_event_by_merchant_id_event_id( - &merchant_id, - &event_id, - event_update, - &merchant_key_store, - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - }; - let increment_webhook_outgoing_received_count = |merchant_id: String| { - metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new(MERCHANT_ID, merchant_id)], - ) - }; - let success_response_handler = - |state: SessionState, - merchant_id: String, - process_tracker: Option, - business_status: &'static str| async move { - increment_webhook_outgoing_received_count(merchant_id); - - match process_tracker { - Some(process_tracker) => state - .store - .as_scheduler() - .finish_process_with_business_status(process_tracker, business_status.into()) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, - ), - None => Ok(()), - } - }; - let error_response_handler = |merchant_id: String, - delivery_attempt: enums::WebhookDeliveryAttempt, - status_code: u16, - log_message: &'static str| { - metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new(MERCHANT_ID, merchant_id)], - ); - - let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant); - logger::warn!(?error, ?delivery_attempt, ?status_code, %log_message); - }; - match delivery_attempt { enums::WebhookDeliveryAttempt::InitialAttempt => match response { Err(client_error) => { api_client_error_handler( state.clone(), merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), + &business_profile.merchant_id, + &event_id, client_error, delivery_attempt, + ScheduleWebhookRetry::NoSchedule, ) .await? } @@ -475,8 +293,8 @@ async fn trigger_webhook_to_merchant( let _updated_event = update_event_in_storage( state.clone(), merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), + &business_profile.merchant_id, + &event_id, response, ) .await?; @@ -484,18 +302,21 @@ async fn trigger_webhook_to_merchant( if status_code.is_success() { success_response_handler( state.clone(), - business_profile.merchant_id, + &business_profile.merchant_id, process_tracker, "INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL", ) .await?; } else { error_response_handler( - business_profile.merchant_id, + state.clone(), + &business_profile.merchant_id, delivery_attempt, status_code.as_u16(), "Ignoring error when sending webhook to merchant", - ); + ScheduleWebhookRetry::NoSchedule, + ) + .await?; } } }, @@ -509,30 +330,21 @@ async fn trigger_webhook_to_merchant( api_client_error_handler( state.clone(), merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), + &business_profile.merchant_id, + &event_id, client_error, delivery_attempt, + ScheduleWebhookRetry::WithProcessTracker(process_tracker), ) .await?; - // Schedule a retry attempt for webhook delivery - outgoing_webhook_retry::retry_webhook_delivery_task( - &*state.store, - &business_profile.merchant_id, - process_tracker, - ) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed, - )?; } Ok(response) => { let status_code = response.status(); let _updated_event = update_event_in_storage( state.clone(), merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), + &business_profile.merchant_id, + &event_id, response, ) .await?; @@ -540,28 +352,21 @@ async fn trigger_webhook_to_merchant( if status_code.is_success() { success_response_handler( state.clone(), - business_profile.merchant_id, + &business_profile.merchant_id, Some(process_tracker), "COMPLETED_BY_PT", ) .await?; } else { error_response_handler( - business_profile.merchant_id.clone(), + state.clone(), + &business_profile.merchant_id, delivery_attempt, status_code.as_u16(), "An error occurred when sending webhook to merchant", - ); - // Schedule a retry attempt for webhook delivery - outgoing_webhook_retry::retry_webhook_delivery_task( - &*state.store, - &business_profile.merchant_id, - process_tracker, + ScheduleWebhookRetry::WithProcessTracker(process_tracker), ) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed, - )?; + .await?; } } } @@ -571,10 +376,11 @@ async fn trigger_webhook_to_merchant( api_client_error_handler( state.clone(), merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), + &business_profile.merchant_id, + &event_id, client_error, delivery_attempt, + ScheduleWebhookRetry::NoSchedule, ) .await? } @@ -583,21 +389,24 @@ async fn trigger_webhook_to_merchant( let _updated_event = update_event_in_storage( state.clone(), merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), + &business_profile.merchant_id, + &event_id, response, ) .await?; if status_code.is_success() { - increment_webhook_outgoing_received_count(business_profile.merchant_id.clone()); + increment_webhook_outgoing_received_count(&business_profile.merchant_id); } else { error_response_handler( - business_profile.merchant_id, + state, + &business_profile.merchant_id, delivery_attempt, status_code.as_u16(), "Ignoring error when sending webhook to merchant", - ); + ScheduleWebhookRetry::NoSchedule, + ) + .await?; } } }, @@ -773,3 +582,229 @@ pub(crate) fn get_outgoing_webhook_request( ), } } + +#[derive(Debug)] +enum ScheduleWebhookRetry { + WithProcessTracker(storage::ProcessTracker), + NoSchedule, +} + +async fn update_event_if_client_error( + state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: &str, + event_id: &str, + error_message: String, +) -> CustomResult { + let is_webhook_notified = false; + + let response_to_store = OutgoingWebhookResponseContent { + body: None, + headers: None, + status_code: None, + error_message: Some(error_message), + }; + + let event_update = domain::EventUpdate::UpdateResponse { + is_webhook_notified, + response: Some( + domain_types::encrypt( + response_to_store + .encode_to_string_of_json() + .change_context( + errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, + ) + .map(Secret::new)?, + merchant_key_store.key.get_inner().peek(), + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to encrypt outgoing webhook response content")?, + ), + }; + + state + .store + .update_event_by_merchant_id_event_id( + merchant_id, + event_id, + event_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) +} + +async fn api_client_error_handler( + state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: &str, + event_id: &str, + client_error: error_stack::Report, + delivery_attempt: enums::WebhookDeliveryAttempt, + schedule_webhook_retry: ScheduleWebhookRetry, +) -> CustomResult<(), errors::WebhooksFlowError> { + // Not including detailed error message in response information since it contains too + // much of diagnostic information to be exposed to the merchant. + update_event_if_client_error( + state.clone(), + merchant_key_store, + merchant_id, + event_id, + "Unable to send request to merchant server".to_string(), + ) + .await?; + + let error = client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed); + logger::error!( + ?error, + ?delivery_attempt, + "An error occurred when sending webhook to merchant" + ); + + if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry { + // Schedule a retry attempt for webhook delivery + outgoing_webhook_retry::retry_webhook_delivery_task( + &*state.store, + merchant_id, + process_tracker, + ) + .await + .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?; + } + + Err(error) +} + +async fn update_event_in_storage( + state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: &str, + event_id: &str, + response: reqwest::Response, +) -> CustomResult { + let status_code = response.status(); + let is_webhook_notified = status_code.is_success(); + + let response_headers = response + .headers() + .iter() + .map(|(name, value)| { + ( + name.as_str().to_owned(), + value + .to_str() + .map(|s| Secret::from(String::from(s))) + .unwrap_or_else(|error| { + logger::warn!( + "Response header {} contains non-UTF-8 characters: {error:?}", + name.as_str() + ); + Secret::from(String::from("Non-UTF-8 header value")) + }), + ) + }) + .collect::>(); + let response_body = response + .text() + .await + .map(Secret::from) + .unwrap_or_else(|error| { + logger::warn!("Response contains non-UTF-8 characters: {error:?}"); + Secret::from(String::from("Non-UTF-8 response body")) + }); + let response_to_store = OutgoingWebhookResponseContent { + body: Some(response_body), + headers: Some(response_headers), + status_code: Some(status_code.as_u16()), + error_message: None, + }; + + let event_update = domain::EventUpdate::UpdateResponse { + is_webhook_notified, + response: Some( + domain_types::encrypt( + response_to_store + .encode_to_string_of_json() + .change_context( + errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, + ) + .map(Secret::new)?, + merchant_key_store.key.get_inner().peek(), + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to encrypt outgoing webhook response content")?, + ), + }; + state + .store + .update_event_by_merchant_id_event_id( + merchant_id, + event_id, + event_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) +} + +fn increment_webhook_outgoing_received_count(merchant_id: &str) { + metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new(MERCHANT_ID, merchant_id.to_owned())], + ) +} + +async fn success_response_handler( + state: SessionState, + merchant_id: &str, + process_tracker: Option, + business_status: &'static str, +) -> CustomResult<(), errors::WebhooksFlowError> { + increment_webhook_outgoing_received_count(merchant_id); + + match process_tracker { + Some(process_tracker) => state + .store + .as_scheduler() + .finish_process_with_business_status(process_tracker, business_status.into()) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, + ), + None => Ok(()), + } +} + +async fn error_response_handler( + state: SessionState, + merchant_id: &str, + delivery_attempt: enums::WebhookDeliveryAttempt, + status_code: u16, + log_message: &'static str, + schedule_webhook_retry: ScheduleWebhookRetry, +) -> CustomResult<(), errors::WebhooksFlowError> { + metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new(MERCHANT_ID, merchant_id.to_owned())], + ); + + let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant); + logger::warn!(?error, ?delivery_attempt, ?status_code, %log_message); + + if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry { + // Schedule a retry attempt for webhook delivery + outgoing_webhook_retry::retry_webhook_delivery_task( + &*state.store, + merchant_id, + process_tracker, + ) + .await + .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?; + } + + Err(error) +}