diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 37ce51c9e..06622c2f2 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -45,7 +45,7 @@ use temporal_sdk_core_protos::{ default_act_sched, default_wes_attribs, temporal::api::{ command::v1::command::Attributes, - common::v1::{Payload, RetryPolicy}, + common::v1::{Payload, RetryPolicy, WorkerVersionStamp}, enums::v1::{EventType, WorkflowTaskFailedCause}, failure::v1::Failure, history::v1::{ @@ -2807,3 +2807,54 @@ async fn use_compatible_version_flag( .await .unwrap(); } + +#[tokio::test] +async fn sets_build_id_from_wft_complete() { + let wfid = "fake_wf_id"; + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let timer_started_event_id = t.add_by_type(EventType::TimerStarted); + t.add_timer_fired(timer_started_event_id, "1".to_string()); + t.add_full_wf_task(); + t.modify_event(t.current_event_id(), |he| { + if let history_event::Attributes::WorkflowTaskCompletedEventAttributes(a) = + he.attributes.as_mut().unwrap() + { + a.worker_version = Some(WorkerVersionStamp { + build_id: "enchi-cat".to_string(), + ..Default::default() + }); + } + }); + let timer_started_event_id = t.add_by_type(EventType::TimerStarted); + t.add_timer_fired(timer_started_event_id, "2".to_string()); + t.add_full_wf_task(); + + let mock = mock_workflow_client(); + let mut worker = mock_sdk_cfg( + MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock), + |cfg| { + cfg.worker_build_id = "fierce-predator".to_string(); + cfg.max_cached_workflows = 1; + }, + ); + + worker.register_wf(DEFAULT_WORKFLOW_TYPE, |ctx: WfContext| async move { + // First task, it should be empty, since replaying and nothing in first WFT completed + ctx.timer(Duration::from_secs(1)).await; + assert_eq!(ctx.current_build_id(), None); + ctx.timer(Duration::from_secs(1)).await; + assert_eq!(ctx.current_build_id(), Some("enchi-cat".to_string())); + ctx.timer(Duration::from_secs(1)).await; + // Not replaying at this point, so we should see the worker's build id + assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string())); + Ok(().into()) + }); + worker + .submit_wf(wfid, DEFAULT_WORKFLOW_TYPE, vec![], Default::default()) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); +} diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 1edca65f9..7b7a5a7ed 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -13,9 +13,11 @@ pub(crate) use activities::{ pub(crate) use workflow::{wft_poller::new_wft_poller, LEGACY_QUERY_ID}; use crate::{ - abstractions::MeteredSemaphore, + abstractions::{dbg_panic, MeteredSemaphore}, errors::CompleteWfError, - pollers::{new_activity_task_buffer, new_workflow_task_buffer, WorkflowTaskPoller}, + pollers::{ + new_activity_task_buffer, new_workflow_task_buffer, BoxedActPoller, WorkflowTaskPoller, + }, protosext::validate_activity_completion, telemetry::{ metrics::{ @@ -58,7 +60,6 @@ use temporal_sdk_core_protos::{ use tokio::sync::mpsc::unbounded_channel; use tokio_util::sync::CancellationToken; -use crate::{abstractions::dbg_panic, pollers::BoxedActPoller}; #[cfg(test)] use { crate::{ @@ -95,7 +96,15 @@ pub struct Worker { #[async_trait::async_trait] impl WorkerTrait for Worker { async fn poll_workflow_activation(&self) -> Result { - self.next_workflow_activation().await + self.next_workflow_activation().await.map(|mut a| { + // Attach this worker's Build ID to the activation if appropriate. This is done here + // to avoid cloning the ID for every workflow instance. Can be lowered when + // https://github.com/temporalio/sdk-core/issues/567 is done + if !a.is_replaying { + a.build_id_for_current_task = self.config.worker_build_id.clone(); + } + a + }) } #[instrument(skip(self))] diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 4e13ca4ce..73bceeb0e 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -123,6 +123,8 @@ pub(crate) struct WorkflowMachines { history_size_bytes: u64, /// Set on each WFT started event continue_as_new_suggested: bool, + /// Set if the current WFT is already complete and that completion event had a build id in it. + current_wft_build_id: Option, all_machines: SlotMap, /// If a machine key is in this map, that machine was created internally by core, not as a @@ -263,6 +265,7 @@ impl WorkflowMachines { observed_internal_flags: Rc::new(RefCell::new(observed_internal_flags)), history_size_bytes: 0, continue_as_new_suggested: false, + current_wft_build_id: None, all_machines: Default::default(), machine_is_core_created: Default::default(), machines_by_event_id: Default::default(), @@ -425,6 +428,7 @@ impl WorkflowMachines { .collect(), history_size_bytes: self.history_size_bytes, continue_as_new_suggested: self.continue_as_new_suggested, + build_id_for_current_task: self.current_wft_build_id.clone().unwrap_or_default(), } } @@ -508,18 +512,6 @@ impl WorkflowMachines { return Ok(0); } - fn update_internal_flags(me: &mut WorkflowMachines) { - // Update observed patches with any that were used in the task - if let Some(next_complete) = me - .last_history_from_server - .peek_next_wft_completed(me.last_processed_event) - { - (*me.observed_internal_flags) - .borrow_mut() - .add_from_complete(next_complete); - } - } - fn get_processable_messages( me: &mut WorkflowMachines, for_event_id: i64, @@ -542,10 +534,23 @@ impl WorkflowMachines { ret } - // We update the internal flags before applying the current task (peeking to the completion - // of this task), and also at the end (peeking to the completion of the task that lang is - // about to generate commands for, and for which we will want those flags active). - update_internal_flags(self); + // Peek to the next WFT complete and update ourselves with data we might need in it. + if let Some(next_complete) = self + .last_history_from_server + .peek_next_wft_completed(self.last_processed_event) + { + // We update the internal flags before applying the current task + (*self.observed_internal_flags) + .borrow_mut() + .add_from_complete(next_complete); + // Save this tasks' Build ID if it had one + if let Some(bid) = next_complete.worker_version.as_ref().map(|wv| &wv.build_id) { + self.current_wft_build_id = Some(bid.to_string()); + } else { + // Otherwise we do not want to keep anything previously stored + self.current_wft_build_id = None; + } + } let last_handled_wft_started_id = self.current_started_event_id; let (events, has_final_event) = match self @@ -724,9 +729,6 @@ impl WorkflowMachines { } } - // TODO: All tests pass with this removed. - update_internal_flags(self); - if !self.replaying { self.metrics.wf_task_replay_latency(replay_start.elapsed()); } diff --git a/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index 605c679e2..70718729f 100644 --- a/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -40,6 +40,10 @@ message WorkflowActivation { uint64 history_size_bytes = 7; // Set true if the most recent WFT started event had this suggestion bool continue_as_new_suggested = 8; + // Set to the Build ID of the worker that processed this task, which may be empty. During replay + // this id may not equal the id of the replaying worker. If not replaying and this worker has + // a defined Build ID, it will equal that ID. It will also be empty for evict-only activations. + string build_id_for_current_task = 9; } message WorkflowActivationJob { diff --git a/sdk-core-protos/src/history_builder.rs b/sdk-core-protos/src/history_builder.rs index 6f5cfe7ef..9d4edc195 100644 --- a/sdk-core-protos/src/history_builder.rs +++ b/sdk-core-protos/src/history_builder.rs @@ -556,6 +556,11 @@ impl TestHistoryBuilder { Self::set_flags(self.events.iter_mut().rev(), core, lang) } + /// Get the event ID of the most recently added event + pub fn current_event_id(&self) -> i64 { + self.current_event_id + } + fn set_flags<'a>( mut events: impl Iterator, core: &[u32], diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index 5cf23dfc3..b650e0b66 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -456,6 +456,7 @@ pub mod coresdk { available_internal_flags: vec![], history_size_bytes: 0, continue_as_new_suggested: false, + build_id_for_current_task: "".to_string(), } } diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index b2bd877ca..e14d7b58c 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -96,12 +96,13 @@ impl WfCtxProtectedDat { } #[derive(Clone, Debug, Default)] -pub struct WfContextSharedData { +pub(crate) struct WfContextSharedData { /// Maps change ids -> resolved status pub changes: HashMap, pub is_replaying: bool, pub wf_time: Option, pub history_length: u32, + pub current_build_id: Option, } // TODO: Dataconverter type interface to replace Payloads here. Possibly just use serde @@ -157,6 +158,12 @@ impl WfContext { self.shared.read().history_length } + /// Return the Build ID as it was when this point in the workflow was first reached. If this + /// code is being executed for the first time, return this Worker's Build ID if it has one. + pub fn current_build_id(&self) -> Option { + self.shared.read().current_build_id.clone() + } + /// A future that resolves if/when the workflow is cancelled pub async fn cancelled(&mut self) { if *self.am_cancelled.borrow() { diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index f4665eae7..06e91e0da 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -320,6 +320,11 @@ impl Future for WorkflowFuture { wlock.is_replaying = activation.is_replaying; wlock.wf_time = activation.timestamp.try_into_or_none(); wlock.history_length = activation.history_length; + wlock.current_build_id = if activation.build_id_for_current_task.is_empty() { + None + } else { + Some(activation.build_id_for_current_task) + }; } let mut die_of_eviction_when_done = false;