Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attach current Build ID to workflow activations #619

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use temporal_sdk_core_protos::{
default_act_sched, default_wes_attribs,
temporal::api::{
command::v1::command::Attributes,
common::v1::{Payload, RetryPolicy},
common::v1::{Payload, RetryPolicy, WorkerVersionStamp},
enums::v1::{EventType, WorkflowTaskFailedCause},
failure::v1::Failure,
history::v1::{
Expand Down Expand Up @@ -2807,3 +2807,54 @@ async fn use_compatible_version_flag(
.await
.unwrap();
}

#[tokio::test]
async fn sets_build_id_from_wft_complete() {
let wfid = "fake_wf_id";

let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
let timer_started_event_id = t.add_by_type(EventType::TimerStarted);
t.add_timer_fired(timer_started_event_id, "1".to_string());
t.add_full_wf_task();
t.modify_event(t.current_event_id(), |he| {
if let history_event::Attributes::WorkflowTaskCompletedEventAttributes(a) =
he.attributes.as_mut().unwrap()
{
a.worker_version = Some(WorkerVersionStamp {
build_id: "enchi-cat".to_string(),
..Default::default()
});
}
});
let timer_started_event_id = t.add_by_type(EventType::TimerStarted);
t.add_timer_fired(timer_started_event_id, "2".to_string());
t.add_full_wf_task();

let mock = mock_workflow_client();
let mut worker = mock_sdk_cfg(
MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock),
|cfg| {
cfg.worker_build_id = "fierce-predator".to_string();
cfg.max_cached_workflows = 1;
},
);

worker.register_wf(DEFAULT_WORKFLOW_TYPE, |ctx: WfContext| async move {
// First task, it should be empty, since replaying and nothing in first WFT completed
ctx.timer(Duration::from_secs(1)).await;
assert_eq!(ctx.current_build_id(), None);
ctx.timer(Duration::from_secs(1)).await;
assert_eq!(ctx.current_build_id(), Some("enchi-cat".to_string()));
ctx.timer(Duration::from_secs(1)).await;
// Not replaying at this point, so we should see the worker's build id
assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string()));
Ok(().into())
});
worker
.submit_wf(wfid, DEFAULT_WORKFLOW_TYPE, vec![], Default::default())
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
7 changes: 7 additions & 0 deletions core/src/protosext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ impl TryFrom<PollWorkflowTaskQueueResponse> for ValidPollWFTQResponse {
pub(crate) trait WorkflowActivationExt {
/// Returns true if this activation has one and only one job to perform a legacy query
fn is_legacy_query(&self) -> bool;
/// Augment the activation with the worker's Build ID if appropriate
fn attach_build_id_if_needed(&mut self, build_id: &str);
}

impl WorkflowActivationExt for WorkflowActivation {
Expand All @@ -147,6 +149,11 @@ impl WorkflowActivationExt for WorkflowActivation {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(qr))
}] if qr.query_id == LEGACY_QUERY_ID)
}
fn attach_build_id_if_needed(&mut self, build_id: &str) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing, why would this logic be on a proto struct, it doesn't know when attaching is needed.
I would put it in the worker logic before it hands over the activation or better yet when the activation is created to avoid mutating unnecessarily.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't really be when the activation is created without passing the build id down a few layers where it's not currently needed (and would get cloned a bunch of times). I can just inline the method where it's used though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlined it w/ comment

if !self.is_replaying {
self.build_id_for_current_task = build_id.to_string();
}
}
}

/// Create a legacy query failure result
Expand Down
7 changes: 5 additions & 2 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use temporal_sdk_core_protos::{
use tokio::sync::mpsc::unbounded_channel;
use tokio_util::sync::CancellationToken;

use crate::{abstractions::dbg_panic, pollers::BoxedActPoller};
use crate::{abstractions::dbg_panic, pollers::BoxedActPoller, protosext::WorkflowActivationExt};
#[cfg(test)]
use {
crate::{
Expand Down Expand Up @@ -95,7 +95,10 @@ pub struct Worker {
#[async_trait::async_trait]
impl WorkerTrait for Worker {
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError> {
self.next_workflow_activation().await
self.next_workflow_activation().await.map(|mut a| {
a.attach_build_id_if_needed(&self.config.worker_build_id);
a
})
}

#[instrument(skip(self))]
Expand Down
40 changes: 21 additions & 19 deletions core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ pub(crate) struct WorkflowMachines {
history_size_bytes: u64,
/// Set on each WFT started event
continue_as_new_suggested: bool,
/// Set if the current WFT is already complete and that completion event had a build id in it.
current_wft_build_id: Option<String>,

all_machines: SlotMap<MachineKey, Machines>,
/// If a machine key is in this map, that machine was created internally by core, not as a
Expand Down Expand Up @@ -263,6 +265,7 @@ impl WorkflowMachines {
observed_internal_flags: Rc::new(RefCell::new(observed_internal_flags)),
history_size_bytes: 0,
continue_as_new_suggested: false,
current_wft_build_id: None,
all_machines: Default::default(),
machine_is_core_created: Default::default(),
machines_by_event_id: Default::default(),
Expand Down Expand Up @@ -425,6 +428,7 @@ impl WorkflowMachines {
.collect(),
history_size_bytes: self.history_size_bytes,
continue_as_new_suggested: self.continue_as_new_suggested,
build_id_for_current_task: self.current_wft_build_id.clone().unwrap_or_default(),
}
}

Expand Down Expand Up @@ -508,18 +512,6 @@ impl WorkflowMachines {
return Ok(0);
}

fn update_internal_flags(me: &mut WorkflowMachines) {
// Update observed patches with any that were used in the task
if let Some(next_complete) = me
.last_history_from_server
.peek_next_wft_completed(me.last_processed_event)
{
(*me.observed_internal_flags)
.borrow_mut()
.add_from_complete(next_complete);
}
}

fn get_processable_messages(
me: &mut WorkflowMachines,
for_event_id: i64,
Expand All @@ -542,10 +534,23 @@ impl WorkflowMachines {
ret
}

// We update the internal flags before applying the current task (peeking to the completion
// of this task), and also at the end (peeking to the completion of the task that lang is
// about to generate commands for, and for which we will want those flags active).
update_internal_flags(self);
// Peek to the next WFT complete and update ourselves with data we might need in it.
if let Some(next_complete) = self
.last_history_from_server
.peek_next_wft_completed(self.last_processed_event)
{
// We update the internal flags before applying the current task
(*self.observed_internal_flags)
.borrow_mut()
.add_from_complete(next_complete);
// Save this tasks' Build ID if it had one
if let Some(bid) = next_complete.worker_version.as_ref().map(|wv| &wv.build_id) {
self.current_wft_build_id = Some(bid.to_string());
} else {
// Otherwise we do not want to keep anything previously stored
self.current_wft_build_id = None;
}
}

let last_handled_wft_started_id = self.current_started_event_id;
let (events, has_final_event) = match self
Expand Down Expand Up @@ -724,9 +729,6 @@ impl WorkflowMachines {
}
}

// TODO: All tests pass with this removed.
update_internal_flags(self);

if !self.replaying {
self.metrics.wf_task_replay_latency(replay_start.elapsed());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ message WorkflowActivation {
uint64 history_size_bytes = 7;
// Set true if the most recent WFT started event had this suggestion
bool continue_as_new_suggested = 8;
// Set to the Build ID of the worker that processed this task, which may be empty. During replay
// this id may not equal the id of the replaying worker. If not replaying and this worker has
// a defined Build ID, it will equal that ID. It will also be empty for evict-only activations.
string build_id_for_current_task = 9;
}

message WorkflowActivationJob {
Expand Down
5 changes: 5 additions & 0 deletions sdk-core-protos/src/history_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ impl TestHistoryBuilder {
Self::set_flags(self.events.iter_mut().rev(), core, lang)
}

/// Get the event ID of the most recently added event
pub fn current_event_id(&self) -> i64 {
self.current_event_id
}

fn set_flags<'a>(
mut events: impl Iterator<Item = &'a mut HistoryEvent>,
core: &[u32],
Expand Down
1 change: 1 addition & 0 deletions sdk-core-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ pub mod coresdk {
available_internal_flags: vec![],
history_size_bytes: 0,
continue_as_new_suggested: false,
build_id_for_current_task: "".to_string(),
}
}

Expand Down
9 changes: 8 additions & 1 deletion sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ impl WfCtxProtectedDat {
}

#[derive(Clone, Debug, Default)]
pub struct WfContextSharedData {
pub(crate) struct WfContextSharedData {
/// Maps change ids -> resolved status
pub changes: HashMap<String, bool>,
pub is_replaying: bool,
pub wf_time: Option<SystemTime>,
pub history_length: u32,
pub current_build_id: Option<String>,
}

// TODO: Dataconverter type interface to replace Payloads here. Possibly just use serde
Expand Down Expand Up @@ -157,6 +158,12 @@ impl WfContext {
self.shared.read().history_length
}

/// Return the Build ID as it was when this point in the workflow was first reached. If this
/// code is being executed for the first time, return this Worker's Build ID if it has one.
pub fn current_build_id(&self) -> Option<String> {
self.shared.read().current_build_id.clone()
}

/// A future that resolves if/when the workflow is cancelled
pub async fn cancelled(&mut self) {
if *self.am_cancelled.borrow() {
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/workflow_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ impl Future for WorkflowFuture {
wlock.is_replaying = activation.is_replaying;
wlock.wf_time = activation.timestamp.try_into_or_none();
wlock.history_length = activation.history_length;
wlock.current_build_id = if activation.build_id_for_current_task.is_empty() {
None
} else {
Some(activation.build_id_for_current_task)
};
}

let mut die_of_eviction_when_done = false;
Expand Down
Loading