diff --git a/client/src/lib.rs b/client/src/lib.rs index 622955e24..ecdf19d9b 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -62,6 +62,10 @@ use temporal_sdk_core_protos::{ }, TaskToken, }; +use tokio::sync::{ + mpsc::{error::SendError, Sender}, + Mutex as TokioMutex, +}; use tonic::{ body::BoxBody, client::GrpcService, @@ -71,7 +75,7 @@ use tonic::{ transport::{Certificate, Channel, Endpoint, Identity}, Code, Status, }; -use tower::ServiceBuilder; +use tower::{discover::Change, ServiceBuilder}; use url::Url; use uuid::Uuid; @@ -87,6 +91,9 @@ static LONG_POLL_METHOD_NAMES: [&str; 3] = [ const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70); const OTHER_CALL_TIMEOUT: Duration = Duration::from_secs(30); +/// Buffer size for the channel that listens to change events +const DEFAULT_BUFFER_SIZE: usize = 1024; + type Result = std::result::Result; /// Options for the connection to the temporal server. Construct with [ClientOptionsBuilder] @@ -134,8 +141,94 @@ pub struct ClientOptions { pub keep_alive: Option, } +/// Connection options that can be dynamically updated. Construct with [ClientOptionsUpdateBuilder]. +/// All fields are optional, and will be ignored if not set. To disable optional fields in [ClientOptions], +/// set field to a tombstone specified in the field description. +#[derive(Clone, Debug, Default, derive_builder::Builder)] +#[non_exhaustive] +pub struct ClientOptionsUpdate { + /// The URL of the Temporal server to connect to. + #[builder(setter(into, strip_option), default)] + pub target_url: Option, + + /// If specified, use TLS as configured by the [TlsConfig] struct. If this is set, core will + /// attempt to use TLS when connecting to the Temporal server. Lang SDK is expected to pass any + /// certs or keys as bytes, loading them from disk itself if needed. + /// If set with all optional fields to `None` in [TlsConfig], it disables TLS. 
+ #[builder(setter(strip_option), default)] + pub tls_cfg: Option, + + /// If set, override the origin used when connecting. May be useful in rare situations where tls + /// verification needs to use a different name from what should be set as the `:authority` + /// header. + /// If set with the URI default value, i.e., "/", it disables the origin override. + #[builder(setter(strip_option), default)] + pub override_origin: Option, + + /// If set (which it is by default), HTTP2 gRPC keep alive will be enabled. + /// If specified with all fields in [ClientKeepAliveConfig] set to zero, it disables `keep_alive`. + #[builder(setter(strip_option), default)] + pub keep_alive: Option, +} + +impl ClientOptionsUpdate { + /// Override fields that are present in the proposed update. + pub fn merge(&mut self, update: ClientOptionsUpdate) { + if update.target_url.is_some() { + self.target_url = update.target_url; + } + if update.tls_cfg.is_some() { + self.tls_cfg = update.tls_cfg; + } + if update.override_origin.is_some() { + self.override_origin = update.override_origin; + } + if update.keep_alive.is_some() { + self.keep_alive = update.keep_alive; + } + } + + /// Transform client options using this update. 
+ pub fn apply(&self, options: &ClientOptions) -> ClientOptions { + let mut result = options.clone(); + if let Some(target_url) = self.target_url.as_ref() { + result.target_url = target_url.clone(); + } + + if let Some(tls_cfg) = self.tls_cfg.as_ref() { + result.tls_cfg = if *tls_cfg == Default::default() { + // Default, i.e., all fields `None`, is a tombstone for tls_cfg + None + } else { + Some(tls_cfg.clone()) + }; + } + + if let Some(override_origin) = self.override_origin.as_ref() { + // Tombstone is "/" + result.override_origin = if *override_origin == Uri::default() { + None + } else { + Some(override_origin.clone()) + } + } + + if let Some(keep_alive) = self.keep_alive.as_ref() { + // Tombstone sets all durations to zero + let zero_duration = Duration::new(0, 0); + result.keep_alive = + if keep_alive.interval == zero_duration && keep_alive.timeout == zero_duration { + None + } else { + Some(keep_alive.clone()) + } + } + + result + } +} /// Configuration options for TLS -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq)] pub struct TlsConfig { /// Bytes representing the root CA certificate used by the server. If not set, and the server's /// cert is issued by someone the operating system trusts, verification will still work (ex: @@ -149,7 +242,7 @@ pub struct TlsConfig { } /// If using mTLS, both the client cert and private key must be specified, this contains them. -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct ClientTlsConfig { /// The certificate for this client pub client_cert: Vec, @@ -272,6 +365,28 @@ pub enum ClientInitError { SystemInfoCallError(tonic::Status), } +/// Errors thrown while updating an existing client. +#[derive(thiserror::Error, Debug)] +pub enum ClientUpdateError { + /// Cannot change the channel configuration. + #[error("Cannot send the update: {0:?}")] + ControlChannelError(#[from] SendError>), + /// Invalid update, cannot create the new endpoint. 
+    #[error("Cannot create an endpoint: {0:?}")]
+    InvalidUpdateError(#[from] ClientInitError),
+}
+
+/// Metadata to enable the dynamic configuration of a client.
+#[derive(Clone, Debug)]
+pub struct DynamicUpdateInfo {
+    /// Version number of the current update, or zero if no updates have been applied.
+    version: i32,
+    /// Control channel sender
+    tx: Sender>,
+    /// Summary of contributions from all the previous updates.
+    changeset: ClientOptionsUpdate,
+}
+
 /// A client with [ClientOptions] attached, which can be passed to initialize workers,
 /// or can be used directly. Is cheap to clone.
 #[derive(Clone, Debug)]
@@ -282,6 +397,7 @@ pub struct ConfiguredClient {
     /// Capabilities as read from the `get_system_info` RPC call made on client connection
     capabilities: Option,
     workers: Arc,
+    update: Arc>,
 }
 
 impl ConfiguredClient {
@@ -306,6 +422,57 @@ impl ConfiguredClient {
     pub fn workers(&self) -> Arc {
         self.workers.clone()
     }
+
+    /// Patches the client options, and then dynamically updates the endpoint with them.
+    /// An update that cannot produce a valid endpoint is rejected up front and leaves the client
+    /// untouched. If swapping the endpoints fails, the client does not roll back to the previous
+    /// configuration, and future connections will fail.
+    /// Internally it maintains a mutable changeset so that we can apply a sequence of updates without a
+    /// mutable client reference. However, this means it cannot update the `options` field.
+    /// To keep `options` consistent use `update_options_mut` or, when needed, manually `apply()` the
+    /// changeset to the original options.
+    pub async fn update_options(
+        &self,
+        update: ClientOptionsUpdate,
+    ) -> Result {
+        // We take a "hard" approach for updating. First remove the previous endpoint, and if successful,
+        // add the updated one. If there is a failure, don't try to recover, and just let the upper layer
+        // handle the failed client.
+        // This approach may leave a short window in which the client cannot open new connections. Again,
+        // we let the upper layer handle this infrequent failure.
+        // The "soft" approach, i.e., remove-last or override-when-ready, cannot guarantee when the update
+        // happens, and also the asynchronous update could fail silently.
+
+        let mut update_meta = self.update.lock().await;
+        // Build the candidate endpoint before touching any state, so that an invalid update neither
+        // removes the working endpoint nor lingers in the changeset applied by later updates.
+        let mut changeset = update_meta.changeset.clone();
+        changeset.merge(update);
+        let new_options = changeset.apply(&self.options);
+        let endpoint = new_options.create_endpoint().await?;
+
+        let remove_change = Change::Remove(update_meta.version);
+        update_meta.tx.send(remove_change).await?;
+        update_meta.changeset = changeset;
+        update_meta.version += 1;
+        let add_change = Change::Insert(update_meta.version, endpoint);
+        update_meta.tx.send(add_change).await?;
+
+        Ok(new_options)
+    }
+
+    /// Similar to `update_options` but it also updates the `options` field in this `ConfiguredClient`
+    /// to reflect the changes.
+    /// Other clones of this client keep their previous `options` value.
+    pub async fn update_options_mut(
+        &mut self,
+        update: ClientOptionsUpdate,
+    ) -> Result {
+        let new_options = self.update_options(update).await?;
+        // Replace the `Arc` instead of using `Arc::get_mut`, which is `None` while clones share it.
+        self.options = Arc::new(new_options.clone());
+        Ok(new_options)
+    }
 }
 
 // The configured client is effectively a "smart" (dumb) pointer
@@ -340,6 +507,26 @@ impl ClientOptions {
         Ok(retry_client)
     }
 
+    /// Creates an endpoint for an HTTP/2 channel.
+ pub async fn create_endpoint(&self) -> Result { + let endpoint = Channel::from_shared(self.target_url.to_string())?; + let endpoint = self.add_tls_to_channel(endpoint).await?; + let endpoint = if let Some(keep_alive) = self.keep_alive.as_ref() { + endpoint + .keep_alive_while_idle(true) + .http2_keep_alive_interval(keep_alive.interval) + .keep_alive_timeout(keep_alive.timeout) + } else { + endpoint + }; + let endpoint = if let Some(origin) = self.override_origin.clone() { + endpoint.origin(origin) + } else { + endpoint + }; + Ok(endpoint) + } + /// Attempt to establish a connection to the Temporal server and return a gRPC client which is /// intercepted with retry, default headers functionality, and metrics if provided. /// @@ -350,22 +537,15 @@ impl ClientOptions { headers: Option>>>, ) -> Result>, ClientInitError> { - let channel = Channel::from_shared(self.target_url.to_string())?; - let channel = self.add_tls_to_channel(channel).await?; - let channel = if let Some(keep_alive) = self.keep_alive.as_ref() { - channel - .keep_alive_while_idle(true) - .http2_keep_alive_interval(keep_alive.interval) - .keep_alive_timeout(keep_alive.timeout) - } else { - channel - }; - let channel = if let Some(origin) = self.override_origin.clone() { - channel.origin(origin) - } else { - channel - }; - let channel = channel.connect().await?; + let endpoint = self.create_endpoint().await?; + let (channel, tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE); + let change = Change::Insert(0, endpoint); + if let Err(err) = tx.send(change).await { + return Err(ClientInitError::SystemInfoCallError(tonic::Status::new( + Code::Unavailable, + format!("Cannot initialize channel with endpoint: {:?}", err), + ))); + } let service = ServiceBuilder::new() .layer_fn(move |channel| GrpcMetricSvc { inner: channel, @@ -385,6 +565,11 @@ impl ClientOptions { options: Arc::new(self.clone()), capabilities: None, workers: Arc::new(SlotManager::new()), + update: Arc::new(TokioMutex::new(DynamicUpdateInfo 
{ + version: 0, + tx, + changeset: Default::default(), + })), }; match client .get_system_info(GetSystemInfoRequest::default()) @@ -1573,4 +1758,41 @@ mod tests { let opts = builder.keep_alive(None).build().unwrap(); assert!(opts.keep_alive.is_none()); } + + #[test] + fn tombstones_reset_options() { + let mut builder = ClientOptionsBuilder::default(); + builder + .target_url(Url::parse("https://smolkitty").unwrap()) + .client_name("cute-kitty".to_string()) + .client_version("0.1.0".to_string()) + .tls_cfg(TlsConfig { + client_tls_config: None, + domain: Some("something.cloud".to_string()), + server_root_ca_cert: None, + }) + .override_origin(Some("/foo/bar".parse::().unwrap())); + // If unset, defaults to Some + let opts = builder.build().unwrap(); + let mut builder_update = ClientOptionsUpdateBuilder::default(); + let mut first_update = ClientOptionsUpdate::default(); + let update = builder_update + .tls_cfg(Default::default()) + .override_origin(Default::default()) + .keep_alive(ClientKeepAliveConfig { + interval: Duration::new(0, 0), + timeout: Duration::new(0, 0), + }) + .build() + .unwrap(); + first_update.merge(update); + let new_opts = first_update.apply(&opts); + assert!(new_opts.keep_alive.is_none()); + assert!(new_opts.tls_cfg.is_none()); + assert!(new_opts.override_origin.is_none()); + assert_eq!( + new_opts.target_url, + "https://smolkitty".parse::().unwrap() + ); + } } diff --git a/tests/integ_tests/ephemeral_server_tests.rs b/tests/integ_tests/ephemeral_server_tests.rs index ab714b416..68cfdb809 100644 --- a/tests/integ_tests/ephemeral_server_tests.rs +++ b/tests/integ_tests/ephemeral_server_tests.rs @@ -1,5 +1,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; -use temporal_client::{ClientOptionsBuilder, TestService, WorkflowService}; +use temporal_client::{ + ClientOptionsBuilder, ClientOptionsUpdateBuilder, ConfiguredClient, RetryClient, + TemporalServiceClientWithMetrics, TestService, WorkflowService, +}; use temporal_sdk_core::ephemeral_server::{ 
EphemeralExe, EphemeralExeVersion, EphemeralServer, TemporalDevServerConfigBuilder, TemporaliteConfigBuilder, TestServerConfigBuilder, @@ -30,6 +33,103 @@ async fn temporal_cli_fixed() { server.shutdown().await.unwrap(); } +#[tokio::test] +async fn temporal_cli_update_client_options() { + // Start two servers in different ports, create a client for the first one. + // Shutdown the server, update the ClientConfig with the other port, and + // show the updated client connects to the second one. + // Revert the client changes, and try with a fresh server using the first port. + let config1 = TemporalDevServerConfigBuilder::default() + .exe(default_cached_download()) + .port(Some(10124)) + .build() + .unwrap(); + let mut server1 = config1.start_server().await.unwrap(); + let config2 = TemporalDevServerConfigBuilder::default() + .exe(default_cached_download()) + .port(Some(10125)) + .build() + .unwrap(); + let mut server2 = config2.start_server().await.unwrap(); + let client_options = ClientOptionsBuilder::default() + .identity("integ_tester".to_string()) + .target_url(Url::try_from(&*format!("http://{}", server1.target)).unwrap()) + .client_name("temporal-core".to_string()) + .client_version("0.1.0".to_string()) + .build() + .unwrap(); + let mut client = client_options + .connect_no_namespace(None, None) + .await + .unwrap(); + assert_ephemeral_server_with_client(&server1, &mut client).await; + server1.shutdown().await.unwrap(); + + let update = ClientOptionsUpdateBuilder::default() + .target_url(Url::try_from(&*format!("http://{}", server2.target)).unwrap()) + .build() + .unwrap(); + assert!(client.get_client().update_options(update).await.is_ok()); + assert_ephemeral_server_with_client(&server2, &mut client).await; + server2.shutdown().await.unwrap(); + + // Revert the port change + let mut server1 = config1.start_server().await.unwrap(); + let update = ClientOptionsUpdateBuilder::default() + .target_url(Url::try_from(&*format!("http://{}", server1.target)).unwrap()) 
+        .build()
+        .unwrap();
+    assert!(client.get_client().update_options(update).await.is_ok());
+    assert_ephemeral_server_with_client(&server1, &mut client).await;
+    server1.shutdown().await.unwrap();
+}
+
+#[tokio::test]
+async fn temporal_cli_update_client_options_mut() {
+    let config1 = TemporalDevServerConfigBuilder::default()
+        .exe(default_cached_download())
+        .port(Some(10126)) // not shared with temporal_cli_update_client_options
+        .build()
+        .unwrap();
+    let mut server1 = config1.start_server().await.unwrap();
+    let client_options = ClientOptionsBuilder::default()
+        .identity("integ_tester".to_string())
+        .target_url(Url::try_from(&*format!("http://{}", server1.target)).unwrap())
+        .client_name("temporal-core".to_string())
+        .client_version("0.1.0".to_string())
+        .build()
+        .unwrap();
+    let mut client = client_options
+        .connect_no_namespace(None, None)
+        .await
+        .unwrap();
+    server1.shutdown().await.unwrap();
+
+    let config2 = TemporalDevServerConfigBuilder::default()
+        .exe(default_cached_download())
+        .port(Some(10127))
+        .build()
+        .unwrap();
+    let mut server2 = config2.start_server().await.unwrap();
+    let update = ClientOptionsUpdateBuilder::default()
+        .target_url(Url::try_from(&*format!("http://{}", server2.target)).unwrap())
+        .build()
+        .unwrap();
+    assert!(client
+        .get_client_mut()
+        .update_options_mut(update)
+        .await
+        .is_ok());
+    assert_ephemeral_server_with_client(&server2, &mut client).await;
+
+    // Check that the options field was modified
+    assert_eq!(
+        client.get_client().options().target_url,
+        Url::try_from("http://127.0.0.1:10127").unwrap(),
+    );
+    server2.shutdown().await.unwrap();
+}
+
 #[tokio::test]
 async fn temporalite_default() {
     let config = TemporaliteConfigBuilder::default()
@@ -127,6 +227,13 @@ async fn assert_ephemeral_server(server: &EphemeralServer) {
         .connect_no_namespace(None, None)
         .await
         .unwrap();
+    assert_ephemeral_server_with_client(server, &mut client).await;
+}
+
+async fn assert_ephemeral_server_with_client(
+    server: &EphemeralServer,
+    client: &mut RetryClient>,
+) { let resp = client .describe_namespace(DescribeNamespaceRequest { namespace: NAMESPACE.to_string(),