Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus Tasks Core->Lang #860

Merged
merged 14 commits into from
Jan 13, 2025
8 changes: 4 additions & 4 deletions .github/workflows/per-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
submodules: recursive
- uses: dtolnay/rust-toolchain@stable
with:
toolchain: 1.80.0
toolchain: 1.84.0
- name: Install protoc
uses: arduino/setup-protoc@v3
with:
Expand All @@ -43,7 +43,7 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
toolchain: 1.80.0
toolchain: 1.84.0
- name: Install protoc
uses: arduino/setup-protoc@v3
with:
Expand Down Expand Up @@ -74,7 +74,7 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
toolchain: 1.80.0
toolchain: 1.84.0
- name: Install protoc
uses: arduino/setup-protoc@v3
with:
Expand All @@ -99,7 +99,7 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
toolchain: 1.80.0
toolchain: 1.84.0
- name: Install protoc
uses: arduino/setup-protoc@v3
with:
Expand Down
5 changes: 5 additions & 0 deletions client/src/workflow_handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ where
&self.info
}

/// Get the client attached to this handle.
///
/// Returns a shared reference to the handle's `client` field; nothing is cloned.
pub fn client(&self) -> &CT {
&self.client
}

/// Await the result of the workflow execution
pub async fn get_workflow_result(
&self,
Expand Down
43 changes: 23 additions & 20 deletions core-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,18 @@ pub enum WorkerValidationError {
},
}

/// Errors thrown by [crate::Worker::poll_workflow_activation]
/// Errors thrown by [crate::Worker] polling methods
#[derive(thiserror::Error, Debug)]
pub enum PollWfError {
/// [crate::Worker::shutdown] was called, and there are no more replay tasks to be handled. Lang
/// must call [crate::Worker::complete_workflow_activation] for any remaining tasks, and then
/// may exit.
#[error("Core is shut down and there are no more workflow replay tasks")]
pub enum PollError {
/// [crate::Worker::shutdown] was called, and there are no more tasks to be handled from this
/// poll function. Lang must call [crate::Worker::complete_workflow_activation],
/// [crate::Worker::complete_activity_task], or
/// [crate::Worker::complete_nexus_task] for any remaining tasks, and then may exit.
#[error("Core is shut down and there are no more tasks of this kind")]
ShutDown,
/// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
/// errors, so lang should consider this fatal.
#[error("Unhandled grpc error when workflow polling: {0:?}")]
TonicError(#[from] tonic::Status),
}

/// Errors thrown by [crate::Worker::poll_activity_task]
#[derive(thiserror::Error, Debug)]
pub enum PollActivityError {
/// [crate::Worker::shutdown] was called, we will no longer fetch new activity tasks. Lang must
/// ensure it is finished with any workflow replay, see [PollWfError::ShutDown]
#[error("Core is shut down")]
ShutDown,
/// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
/// errors, so lang should consider this fatal.
#[error("Unhandled grpc error when activity polling: {0:?}")]
#[error("Unhandled grpc error when polling: {0:?}")]
TonicError(#[from] tonic::Status),
}

Expand Down Expand Up @@ -67,6 +55,21 @@ pub enum CompleteActivityError {
},
}

/// Errors thrown by [crate::Worker::complete_nexus_task]
#[derive(thiserror::Error, Debug)]
pub enum CompleteNexusError {
/// Lang SDK sent us a malformed nexus completion. This likely means a bug in the lang sdk.
#[error("Lang SDK sent us a malformed nexus completion: {reason}")]
MalformedNexusCompletion {
/// Reason the completion was malformed. Interpolated into this variant's error message.
reason: String,
},
/// Nexus has not been enabled on this worker. If a user registers any Nexus handlers, the
/// option that enables Nexus must be set to true.
///
/// TODO: Name the specific option here.
#[error("Nexus is not enabled on this worker")]
NexusNotEnabled,
}

/// Errors we can encounter during workflow processing which we may treat as either WFT failures
/// or whole-workflow failures depending on user preference.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
Expand Down
29 changes: 24 additions & 5 deletions core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ pub mod worker;

use crate::{
errors::{
CompleteActivityError, CompleteWfError, PollActivityError, PollWfError,
CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
WorkerValidationError,
},
worker::WorkerConfig,
};
use temporal_sdk_core_protos::coresdk::{
activity_task::ActivityTask, workflow_activation::WorkflowActivation,
workflow_completion::WorkflowActivationCompletion, ActivityHeartbeat, ActivityTaskCompletion,
activity_task::ActivityTask,
nexus::{NexusTask, NexusTaskCompletion},
workflow_activation::WorkflowActivation,
workflow_completion::WorkflowActivationCompletion,
ActivityHeartbeat, ActivityTaskCompletion,
};

/// This trait is the primary way by which language specific SDKs interact with the core SDK.
Expand All @@ -36,14 +39,23 @@ pub trait Worker: Send + Sync {
/// & job processing.
///
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError>;
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError>;

/// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
/// responsibility to call the appropriate activity code with the provided inputs. Blocks
/// indefinitely until such work is available or [Worker::shutdown] is called.
///
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError>;
async fn poll_activity_task(&self) -> Result<ActivityTask, PollError>;

/// Ask the worker for some nexus-related work, returning a [NexusTask]. It is then the language
/// SDK's responsibility to call the appropriate nexus operation handler code with the provided
/// inputs. Blocks indefinitely until such work is available or [Worker::shutdown] is called.
///
/// All tasks must be responded to (see [Worker::complete_nexus_task]) for shutdown to complete.
///
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_nexus_task(&self) -> Result<NexusTask, PollError>;

/// Tell the worker that a workflow activation has completed. May (and should) be freely called
/// concurrently. The future may take some time to resolve, as fetching more events might be
Expand All @@ -61,6 +73,13 @@ pub trait Worker: Send + Sync {
completion: ActivityTaskCompletion,
) -> Result<(), CompleteActivityError>;

/// Tell the worker that a nexus task has completed. May (and should) be freely called
/// concurrently.
///
/// On failure, the error is a [CompleteNexusError].
async fn complete_nexus_task(
&self,
completion: NexusTaskCompletion,
) -> Result<(), CompleteNexusError>;

/// Notify the Temporal service that an activity is still alive. Long running activities that
/// take longer than `activity_heartbeat_timeout` to finish must call this function in order to
/// report progress, otherwise the activity will timeout and a new attempt will be scheduled.
Expand Down
37 changes: 34 additions & 3 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
time::Duration,
};
use temporal_sdk_core_protos::coresdk::{
ActivitySlotInfo, LocalActivitySlotInfo, WorkflowSlotInfo,
ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo,
};

const MAX_CONCURRENT_WFT_POLLS_DEFAULT: usize = 5;
Expand Down Expand Up @@ -56,6 +56,10 @@ pub struct WorkerConfig {
/// worker's task queue
#[builder(default = "5")]
pub max_concurrent_at_polls: usize,
/// Maximum number of concurrent poll nexus task requests we will perform at a time on this
/// worker's task queue
#[builder(default = "5")]
pub max_concurrent_nexus_polls: usize,
/// If set to true this worker will only handle workflow tasks and local activities, it will not
/// poll for activity tasks.
#[builder(default = "false")]
Expand Down Expand Up @@ -114,8 +118,8 @@ pub struct WorkerConfig {
#[builder(default = "5")]
pub fetching_concurrency: usize,

/// If set, core will issue cancels for all outstanding activities after shutdown has been
/// initiated and this amount of time has elapsed.
/// If set, core will issue cancels for all outstanding activities and nexus operations after
/// shutdown has been initiated and this amount of time has elapsed.
#[builder(default)]
pub graceful_shutdown_period: Option<Duration>,

Expand Down Expand Up @@ -153,6 +157,12 @@ pub struct WorkerConfig {
/// Mutually exclusive with `tuner`
#[builder(setter(into, strip_option), default)]
pub max_outstanding_local_activities: Option<usize>,
/// The maximum number of nexus tasks that will ever be given to this worker
/// concurrently
///
/// Mutually exclusive with `tuner`
#[builder(setter(into, strip_option), default)]
pub max_outstanding_nexus_tasks: Option<usize>,
}

impl WorkerConfig {
Expand Down Expand Up @@ -263,6 +273,11 @@ pub trait WorkerTuner {
&self,
) -> Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>;

/// Return a [SlotSupplier] for nexus tasks. The supplier's slot kind is [NexusSlotKind].
fn nexus_task_slot_supplier(
&self,
) -> Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync>;

/// Core will call this at worker initialization time, allowing the implementation to hook up to
/// metrics if any are configured. If not, it will not be called.
fn attach_metrics(&self, metrics: TemporalMeter);
Expand Down Expand Up @@ -364,6 +379,7 @@ pub enum SlotKindType {
Workflow,
Activity,
LocalActivity,
Nexus,
}

#[derive(Debug, Copy, Clone)]
Expand All @@ -372,11 +388,14 @@ pub struct WorkflowSlotKind {}
pub struct ActivitySlotKind {}
#[derive(Debug, Copy, Clone)]
pub struct LocalActivitySlotKind {}
/// Field-less type used to identify the nexus slot kind; see its [SlotKind] implementation.
#[derive(Debug, Copy, Clone)]
pub struct NexusSlotKind {}

/// A borrowed reference to the slot info for one kind of slot. Produced by
/// [SlotInfoTrait::downcast].
pub enum SlotInfo<'a> {
/// Info about a workflow slot
Workflow(&'a WorkflowSlotInfo),
/// Info about an activity slot
Activity(&'a ActivitySlotInfo),
/// Info about a local activity slot
LocalActivity(&'a LocalActivitySlotInfo),
/// Info about a nexus slot
Nexus(&'a NexusSlotInfo),
}

pub trait SlotInfoTrait: prost::Message {
Expand All @@ -397,6 +416,11 @@ impl SlotInfoTrait for LocalActivitySlotInfo {
SlotInfo::LocalActivity(self)
}
}
// Lets a `NexusSlotInfo` be viewed as the `SlotInfo::Nexus` variant.
impl SlotInfoTrait for NexusSlotInfo {
fn downcast(&self) -> SlotInfo {
// Wrap the borrow of `self` in the matching enum variant.
SlotInfo::Nexus(self)
}
}

pub trait SlotKind {
type Info: SlotInfoTrait;
Expand Down Expand Up @@ -424,3 +448,10 @@ impl SlotKind for LocalActivitySlotKind {
SlotKindType::LocalActivity
}
}
// Associates the nexus slot kind with its info type (`NexusSlotInfo`) and its
// `SlotKindType` tag.
impl SlotKind for NexusSlotKind {
type Info = NexusSlotInfo;

fn kind() -> SlotKindType {
SlotKindType::Nexus
}
}
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ lru = "0.12"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client",], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", ], optional = true }
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
Expand Down
4 changes: 2 additions & 2 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
use temporal_client::WorkflowOptions;
use temporal_sdk::{ActivityOptions, WfContext};
use temporal_sdk_core_api::{
errors::{CompleteActivityError, PollActivityError},
errors::{CompleteActivityError, PollError},
Worker as WorkerTrait,
};
use temporal_sdk_core_protos::{
Expand Down Expand Up @@ -984,7 +984,7 @@ async fn activity_tasks_from_completion_reserve_slots() {
core.initiate_shutdown();
// Even though this test requests eager activity tasks, none are returned in poll responses.
let err = core.poll_activity_task().await.unwrap_err();
assert_matches!(err, PollActivityError::ShutDown);
assert_matches!(err, PollError::ShutDown);
};
// This wf poll should *not* set the flag that it wants tasks back since both slots are
// occupied
Expand Down
9 changes: 3 additions & 6 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use temporal_client::WorkflowOptions;
use temporal_sdk::{
ActContext, ActivityError, LocalActivityOptions, WfContext, WorkflowFunction, WorkflowResult,
};
use temporal_sdk_core_api::{
errors::{PollActivityError, PollWfError},
Worker,
};
use temporal_sdk_core_api::{errors::PollError, Worker};
use temporal_sdk_core_protos::{
coresdk::{
activity_result::ActivityExecutionResult,
Expand Down Expand Up @@ -1179,8 +1176,8 @@ async fn local_activities_can_be_delivered_during_shutdown() {
};

let (wf_r, act_r) = join!(wf_poller, at_poller);
assert_matches!(wf_r.unwrap_err(), PollWfError::ShutDown);
assert_matches!(act_r.unwrap_err(), PollActivityError::ShutDown);
assert_matches!(wf_r.unwrap_err(), PollError::ShutDown);
assert_matches!(act_r.unwrap_err(), PollError::ShutDown);
}

#[tokio::test]
Expand Down
11 changes: 5 additions & 6 deletions core/src/core_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ mod workflow_cancels;
mod workflow_tasks;

use crate::{
errors::{PollActivityError, PollWfError},
errors::PollError,
test_help::{build_mock_pollers, canned_histories, mock_worker, test_worker_cfg, MockPollCfg},
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
Worker,
};
use futures_util::FutureExt;
use std::sync::LazyLock;
use std::time::Duration;
use std::{sync::LazyLock, time::Duration};
use temporal_sdk_core_api::Worker as WorkerTrait;
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use tokio::{sync::Barrier, time::sleep};
Expand All @@ -40,7 +39,7 @@ async fn after_shutdown_server_is_not_polled() {
worker.shutdown().await;
assert_matches!(
worker.poll_workflow_activation().await.unwrap_err(),
PollWfError::ShutDown
PollError::ShutDown
);
worker.finalize_shutdown().await;
}
Expand Down Expand Up @@ -86,11 +85,11 @@ async fn shutdown_interrupts_both_polls() {
tokio::join! {
async {
assert_matches!(worker.poll_activity_task().await.unwrap_err(),
PollActivityError::ShutDown);
PollError::ShutDown);
},
async {
assert_matches!(worker.poll_workflow_activation().await.unwrap_err(),
PollWfError::ShutDown);
PollError::ShutDown);
},
async {
// Give polling a bit to get stuck, then shutdown
Expand Down
Loading
Loading