History ooo bug (#858)

* Adds repro test & corrects another error that should be nondeterminism

* Fix
Sushisource authored Dec 20, 2024
1 parent 7b3776c commit bba9fe1
Showing 4 changed files with 120 additions and 15 deletions.
2 changes: 1 addition & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
@@ -1482,7 +1482,7 @@ impl WorkflowMachines {

fn get_machine_key(&self, id: CommandID) -> Result<MachineKey> {
Ok(*self.id_to_machine.get(&id).ok_or_else(|| {
-            WFMachinesError::Fatal(format!("Missing associated machine for {id:?}"))
+            WFMachinesError::Nondeterminism(format!("Missing associated machine for {id:?}"))
})?)
}

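Note on this change: reclassifying a missing state-machine lookup from `Fatal` to `Nondeterminism` matters because workers can be configured (via `workflow_failure_errors`, as the new test below does) to turn nondeterminism errors into workflow failures rather than endlessly retried workflow task failures. A minimal sketch of the distinction, using simplified stand-in types rather than the crate's real definitions:

```rust
// Illustrative sketch only; the real WFMachinesError lives in temporal-sdk-core
// and has more variants. A missing machine means replayed history no longer
// lines up with the commands the workflow code produced, i.e. nondeterminism.
#[derive(Debug)]
enum WFMachinesError {
    Nondeterminism(String),
    Fatal(String),
}

// A worker configured to fail workflows on nondeterminism would route on this.
fn can_fail_workflow(e: &WFMachinesError) -> bool {
    matches!(e, WFMachinesError::Nondeterminism(_))
}

fn main() {
    let e = WFMachinesError::Nondeterminism("Missing associated machine".into());
    assert!(can_fail_workflow(&e));
    assert!(!can_fail_workflow(&WFMachinesError::Fatal("oops".into())));
}
```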
23 changes: 14 additions & 9 deletions core/src/worker/workflow/managed_run.rs
@@ -373,6 +373,7 @@ impl ManagedRun {
mut commands: Vec<WFCommand>,
used_flags: Vec<u32>,
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
+        is_forced_failure: bool,
) -> Result<RunUpdateAct, Box<NextPageReq>> {
let activation_was_only_eviction = self.activation_is_eviction();
let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() {
@@ -446,6 +447,7 @@ impl ManagedRun {
query_responses,
used_flags,
resp_chan,
+            is_forced_failure,
};

// Verify we can actually apply the next workflow task, which will happen as part of
@@ -617,6 +619,7 @@ impl ManagedRun {
}],
vec![],
resp_chan,
+            true,
)
.unwrap_or_else(|e| {
dbg_panic!("Got next page request when auto-failing workflow: {e:?}");
@@ -686,6 +689,7 @@ impl ManagedRun {
query_responses: completion.query_responses,
has_pending_query: completion.has_pending_query,
activation_was_eviction: completion.activation_was_eviction,
+            is_forced_failure: completion.is_forced_failure,
};

self.wfm.machines.add_lang_used_flags(completion.used_flags);
@@ -708,7 +712,8 @@ impl ManagedRun {
self.wfm.feed_history_from_new_page(update)?;
}
// Don't bother applying the next task if we're evicting at the end of this activation
-        if !completion.activation_was_eviction {
+        // or are otherwise broken.
+        if !completion.activation_was_eviction && !self.am_broken {
self.wfm.apply_next_task_if_ready()?;
}
let new_local_acts = self.wfm.drain_queued_local_activities();
@@ -1083,7 +1088,7 @@ impl ManagedRun {
// fulfilling a query. If the activation we sent was *only* an eviction, don't send that
// either.
let should_respond = !(machines_wft_response.has_pending_jobs
-            || machines_wft_response.replaying
+            || (machines_wft_response.replaying && !data.is_forced_failure)
|| is_query_playback
|| data.activation_was_eviction
|| machines_wft_response.have_seen_terminal_event);
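Note on this hunk: the key behavioral fix is that a forced failure must be reported to the server even while the machines still consider themselves replaying; previously `replaying` alone suppressed the response, so a core-initiated failure could be swallowed. A self-contained model of the predicate (field names mirror the diff; the types are simplified stand-ins, not the crate's):

```rust
// Simplified model of `should_respond` after this change.
struct WftResponseInfo {
    has_pending_jobs: bool,
    replaying: bool,
    have_seen_terminal_event: bool,
}

fn should_respond(
    r: &WftResponseInfo,
    is_query_playback: bool,
    activation_was_eviction: bool,
    is_forced_failure: bool,
) -> bool {
    !(r.has_pending_jobs
        // Replay alone no longer suppresses the response: a forced failure
        // (core auto-failing the workflow) must still reach the server.
        || (r.replaying && !is_forced_failure)
        || is_query_playback
        || activation_was_eviction
        || r.have_seen_terminal_event)
}

fn main() {
    let replaying = WftResponseInfo {
        has_pending_jobs: false,
        replaying: true,
        have_seen_terminal_event: false,
    };
    // Before the fix the first case evaluated to false, and the forced
    // failure was never sent to the server.
    assert!(should_respond(&replaying, false, false, true));
    assert!(!should_respond(&replaying, false, false, false));
}
```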
@@ -1331,6 +1336,7 @@ struct CompletionDataForWFT {
query_responses: Vec<QueryResult>,
has_pending_query: bool,
activation_was_eviction: bool,
+    is_forced_failure: bool,
}

/// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data
@@ -1405,13 +1411,11 @@ impl WorkflowManager {
self.machines.ready_to_apply_next_wft()
}

-    /// If there are no pending jobs for the workflow, apply the next workflow task and check
-    /// again if there are any jobs. Importantly, does not *drain* jobs.
-    ///
-    /// Returns true if there are jobs (before or after applying the next WFT).
-    fn apply_next_task_if_ready(&mut self) -> Result<bool> {
+    /// If there are no pending jobs for the workflow apply the next workflow task and check again
+    /// if there are any jobs. Importantly, does not *drain* jobs.
+    fn apply_next_task_if_ready(&mut self) -> Result<()> {
if self.machines.has_pending_jobs() {
-            return Ok(true);
+            return Ok(());
}
loop {
let consumed_events = self.machines.apply_next_wft_from_history()?;
@@ -1423,7 +1427,7 @@ impl WorkflowManager {
break;
}
}
-        Ok(self.machines.has_pending_jobs())
+        Ok(())
}
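Note: the boolean previously returned by `apply_next_task_if_ready` was already discarded by its only caller shown above (`self.wfm.apply_next_task_if_ready()?;`), so the signature is simplified to `Result<()>` and the stale doc line about the return value is dropped along with it.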

/// Must be called when we're ready to respond to a WFT after handling catching up on replay
@@ -1473,6 +1477,7 @@ struct RunActivationCompletion {
has_pending_query: bool,
query_responses: Vec<QueryResult>,
used_flags: Vec<u32>,
+    is_forced_failure: bool,
/// Used to notify the worker when the completion is done processing and the completion can
/// unblock. Must always be `Some` when initialized.
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
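Taken together, the managed_run.rs changes thread the new `is_forced_failure` flag from `successful_completion` through `RunActivationCompletion` into `CompletionDataForWFT`, and gate history application on `!self.am_broken` (a flag this file sets once the run has already hit an error) so a broken run does not keep applying workflow tasks out of order. Only core's internal auto-fail path passes `true` for the flag; the lang-facing call site below passes `false`.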
7 changes: 6 additions & 1 deletion core/src/worker/workflow/workflow_stream.rs
@@ -264,7 +264,12 @@ impl WFStream {
commands,
used_flags,
..
-            } => match rh.successful_completion(commands, used_flags, complete.response_tx) {
+            } => match rh.successful_completion(
+                commands,
+                used_flags,
+                complete.response_tx,
+                false,
+            ) {
Ok(acts) => acts,
Err(npr) => {
self.runs_needing_fetching
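Note: this is the only lang-facing call site of `successful_completion`, and it always passes `false`; a completion is marked as a forced failure exclusively by core's auto-fail path in managed_run.rs (the added `true` argument above).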
103 changes: 99 additions & 4 deletions tests/integ_tests/workflow_tests.rs
@@ -23,8 +23,11 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
-use temporal_client::{WorkflowClientTrait, WorkflowOptions};
-use temporal_sdk::{interceptors::WorkerInterceptor, ActivityOptions, WfContext, WorkflowResult};
+use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions};
+use temporal_sdk::{
+    interceptors::WorkerInterceptor, ActivityOptions, LocalActivityOptions, WfContext,
+    WorkflowResult,
+};
use temporal_sdk_core::{replay::HistoryForReplay, CoreRuntime};
use temporal_sdk_core_api::errors::{PollWfError, WorkflowErrorType};
use temporal_sdk_core_protos::{
@@ -46,9 +49,8 @@ use temporal_sdk_core_test_utils::{
drain_pollers_and_shutdown, history_from_proto_binary, init_core_and_create_wf,
init_core_replay_preloaded, schedule_activity_cmd, CoreWfStarter, WorkerTestHelpers,
};
-use tokio::{join, time::sleep};
+use tokio::{join, sync::Notify, time::sleep};
use uuid::Uuid;

// TODO: We should get expected histories for these tests and confirm that the history at the end
// matches.

@@ -769,3 +771,96 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to(
);
assert!(body.contains(&match_this));
}

#[tokio::test]
async fn history_out_of_order_on_restart() {
let wf_name = "history_out_of_order_on_restart";
let mut starter = CoreWfStarter::new(wf_name);
starter
.worker_config
.workflow_failure_errors([WorkflowErrorType::Nondeterminism]);
let mut worker = starter.worker().await;
let mut starter2 = starter.clone_no_worker();
let mut worker2 = starter2.worker().await;

static HIT_SLEEP: Notify = Notify::const_new();

worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_owned(),
input: "hi".as_json_payload().unwrap(),
start_to_close_timeout: Some(Duration::from_secs(5)),
..Default::default()
})
.await;
ctx.activity(ActivityOptions {
activity_type: "echo".to_owned(),
input: "hi".as_json_payload().unwrap(),
start_to_close_timeout: Some(Duration::from_secs(5)),
..Default::default()
})
.await;
// Interrupt this sleep on first go
HIT_SLEEP.notify_one();
ctx.timer(Duration::from_secs(5)).await;
Ok(().into())
});
worker.register_activity("echo", echo);

worker2.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_owned(),
input: "hi".as_json_payload().unwrap(),
start_to_close_timeout: Some(Duration::from_secs(5)),
..Default::default()
})
.await;
// Timer is added after restarting workflow
ctx.timer(Duration::from_secs(1)).await;
ctx.activity(ActivityOptions {
activity_type: "echo".to_owned(),
input: "hi".as_json_payload().unwrap(),
start_to_close_timeout: Some(Duration::from_secs(5)),
..Default::default()
})
.await;
ctx.timer(Duration::from_secs(2)).await;
Ok(().into())
});
worker2.register_activity("echo", echo);
worker
.submit_wf(
wf_name.to_owned(),
wf_name.to_owned(),
vec![],
WorkflowOptions {
execution_timeout: Some(Duration::from_secs(20)),
..Default::default()
},
)
.await
.unwrap();

let w1 = async {
worker.run_until_done().await.unwrap();
};
let w2 = async {
// wait to hit sleep
HIT_SLEEP.notified().await;
starter.shutdown().await;
// start new worker
worker2.expect_workflow_completion(wf_name, None);
worker2.run_until_done().await.unwrap();
};
join!(w1, w2);
// The workflow should fail with the nondeterminism error
let handle = starter
.get_client()
.await
.get_untyped_workflow_handle(wf_name, "");
let res = handle
.get_workflow_result(Default::default())
.await
.unwrap();
assert_matches!(res, WorkflowExecutionResult::Failed(_));
}
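How the repro works: the first worker runs the workflow until it parks on the 5-second timer and signals the static `Notify`; the test then shuts that worker down and resumes the same execution on a second worker whose workflow definition inserts a timer between the local activity and the activity. Replay on the restarted worker therefore sees history events out of order relative to the new code, and since the worker is configured with `WorkflowErrorType::Nondeterminism` in `workflow_failure_errors`, the mismatch fails the workflow, which the final assertion checks. The `echo` activity the test registers is defined elsewhere in this test module; a compatible implementation would be a one-liner (sketch under that assumption):

```rust
use temporal_sdk::ActContext;

// Hypothetical stand-in for the test suite's shared `echo` activity:
// it simply returns its input string.
async fn echo(_ctx: ActContext, e: String) -> anyhow::Result<String> {
    Ok(e)
}
```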
