From 57901816d96a39804254178cd6eaef3b42bc3c14 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 7 Aug 2024 11:22:06 -0700 Subject: [PATCH 1/2] Fix cancel-before-start abandon activities --- .../machines/activity_state_machine.rs | 20 +++++++-- .../integ_tests/workflow_tests/activities.rs | 44 +++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/core/src/worker/workflow/machines/activity_state_machine.rs b/core/src/worker/workflow/machines/activity_state_machine.rs index 1f5694db4..0fae4bff3 100644 --- a/core/src/worker/workflow/machines/activity_state_machine.rs +++ b/core/src/worker/workflow/machines/activity_state_machine.rs @@ -407,7 +407,7 @@ impl ScheduleCommandCreated { pub(super) fn on_abandoned(self, dat: &mut SharedState) -> ActivityMachineTransition { dat.cancelled_before_sent = true; - ActivityMachineTransition::default() + notify_lang_activity_cancelled(None) } } @@ -806,7 +806,7 @@ mod test { internal_flags::InternalFlags, replay::TestHistoryBuilder, test_help::{build_fake_sdk, MockPollCfg, ResponseType}, - worker::workflow::machines::Machines, + worker::workflow::{machines::Machines, OutgoingJob}, }; use std::{cell::RefCell, mem::discriminant, rc::Rc}; use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext, WorkflowFunction}; @@ -905,7 +905,21 @@ mod test { panic!("Wrong machine type"); }; let cmds = s.cancel().unwrap(); - assert_eq!(cmds.len(), 0); + // We should always be notifying lang that the activity got cancelled, even if it's + // abandoned and we aren't telling server + // MachineResponse::PushWFJob() + assert_matches!( + cmds.as_slice(), + [MachineResponse::PushWFJob(OutgoingJob { + variant: workflow_activation_job::Variant::ResolveActivity(ResolveActivity { + result: Some(ActivityResolution { + status: Some(activity_resolution::Status::Cancelled(_)) + }), + .. + }), + .. + })] + ); let curstate = s.state(); assert!(matches!(curstate, &ActivityMachineState::Canceled(_))); } diff --git a/tests/integ_tests/workflow_tests/activities.rs b/tests/integ_tests/workflow_tests/activities.rs index fc7699f10..cf1b6db77 100644 --- a/tests/integ_tests/workflow_tests/activities.rs +++ b/tests/integ_tests/workflow_tests/activities.rs @@ -787,6 +787,50 @@ async fn activity_cancelled_after_heartbeat_times_out() { .unwrap(); } +#[tokio::test] +async fn one_activity_abandon_cancelled_before_started() { + let wf_name = "one_activity_abandon_cancelled_before_started"; + let mut starter = CoreWfStarter::new(wf_name); + let mut worker = starter.worker().await; + let client = starter.get_client().await; + worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + let act_fut = ctx.activity(ActivityOptions { + activity_type: "echo_activity".to_string(), + start_to_close_timeout: Some(Duration::from_secs(5)), + input: "hi!".as_json_payload().expect("serializes fine"), + cancellation_type: ActivityCancellationType::Abandon, + ..Default::default() + }); + act_fut.cancel(&ctx); + act_fut.await; + Ok(().into()) + }); + worker.register_activity( + "echo_activity", + |_ctx: ActContext, echo_me: String| async move { + sleep(Duration::from_secs(2)).await; + Ok(echo_me) + }, + ); + + let run_id = worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + let handle = client.get_untyped_workflow_handle(wf_name, run_id); + let res = handle + .get_workflow_result(Default::default()) + .await + .unwrap(); + assert_matches!(res, WorkflowExecutionResult::Succeeded(_)); +} + #[tokio::test] async fn one_activity_abandon_cancelled_after_complete() { let wf_name = "one_activity_abandon_cancelled_after_complete"; From 0ddc0e91dc8ba8c18ebe41ff8fe481b875e65bf1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 7 Aug 2024 11:24:13 -0700 Subject: [PATCH 2/2] Remove leftover comment line --- core/src/worker/workflow/machines/activity_state_machine.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/worker/workflow/machines/activity_state_machine.rs b/core/src/worker/workflow/machines/activity_state_machine.rs index 0fae4bff3..775c85516 100644 --- a/core/src/worker/workflow/machines/activity_state_machine.rs +++ b/core/src/worker/workflow/machines/activity_state_machine.rs @@ -907,7 +907,6 @@ mod test { let cmds = s.cancel().unwrap(); // We should always be notifying lang that the activity got cancelled, even if it's // abandoned and we aren't telling server - // MachineResponse::PushWFJob() assert_matches!( cmds.as_slice(), [MachineResponse::PushWFJob(OutgoingJob {