Dynamic client endpoints #676

Open · wants to merge 2 commits into base: master
260 changes: 241 additions & 19 deletions client/src/lib.rs
@@ -62,6 +62,10 @@ use temporal_sdk_core_protos::{
    },
    TaskToken,
};
use tokio::sync::{
    mpsc::{error::SendError, Sender},
    Mutex as TokioMutex,
};
use tonic::{
    body::BoxBody,
    client::GrpcService,
@@ -71,7 +75,7 @@ use tonic::{
    transport::{Certificate, Channel, Endpoint, Identity},
    Code, Status,
};
use tower::ServiceBuilder;
use tower::{discover::Change, ServiceBuilder};
use url::Url;
use uuid::Uuid;

@@ -87,6 +91,9 @@ static LONG_POLL_METHOD_NAMES: [&str; 3] = [
const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70);
const OTHER_CALL_TIMEOUT: Duration = Duration::from_secs(30);

/// Buffer size for the channel that listens to change events
const DEFAULT_BUFFER_SIZE: usize = 1024;
Member:

I suspect this can probably be a lot smaller (though I doubt it matters a whole lot either way).

Contributor (Author):

I just copied the constant used internally by tonic; frankly, I have no idea about the right size... But yes, for a control channel, that does look large...
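For scale: tonic's balance_channel takes the capacity directly, so shrinking it is a one-line change. A minimal sketch, with 16 as a purely illustrative size:

    // Hypothetical smaller buffer: the control channel only carries occasional
    // endpoint add/remove events, never request traffic, so a small capacity
    // should be plenty.
    const CONTROL_BUFFER_SIZE: usize = 16;
    let (channel, tx): (Channel, Sender<Change<i32, Endpoint>>) =
        Channel::balance_channel(CONTROL_BUFFER_SIZE);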


type Result<T, E = tonic::Status> = std::result::Result<T, E>;

/// Options for the connection to the temporal server. Construct with [ClientOptionsBuilder]
@@ -134,8 +141,94 @@ pub struct ClientOptions {
    pub keep_alive: Option<ClientKeepAliveConfig>,
}

/// Connection options that can be dynamically updated. Construct with [ClientOptionsUpdateBuilder].
/// All fields are optional and are ignored if not set. To disable an optional field in
/// [ClientOptions], set it to the tombstone value given in the field's description.
#[derive(Clone, Debug, Default, derive_builder::Builder)]
#[non_exhaustive]
pub struct ClientOptionsUpdate {
Member (@cretz), Jan 25, 2024:

Can the entire ClientOptions struct be reused instead? Not every field has to be optional. You can require that callers provide the same data they did for the original client options if they don't want to change it. The lang side doesn't need help here.

(It would also be neat if some of the runtime options, like headers, were set/updated via this common mechanism and didn't require a reconnect, but that doesn't have to be now.)

Member:

I think this probably makes decent sense too. I imagine the lang-side API will typically involve the user copying some existing options and changing them, in which case this all works out fine.

Contributor (Author):

These are the only fields that can change dynamically, because they are associated with the endpoint, and I think a separate struct makes it clearer what can be changed. Also, fields like target_url are not optional in ClientOptions but need to be optional in an update - or we'd have to remember the original value, which is a pain with concurrent updates...

Member:

If we go with swapping out the whole options behind a mutex, which I mentioned in another comment, then the other fields should be changeable too, I think, which could work out nicely.

Contributor (Author):

Things like retry_config are used by the Retry wrapper, which I cannot change. client_name, client_version, or identity may be used in metrics; we probably don't want to change those, and I cannot see a use case for changing them dynamically... We could fail the update if they don't match.
I still think it is easier in the API to do deltas with only the changeable attributes, but internally we could use ClientOptions instead of a changeset, if we are OK with a mutex on the options. We would still need some state for the updates that does not belong in the options, such as the channel and the version id, but we could probably lock that independently from the options...

Member:

Why can't they be changed? I agree that if they can't be changed, it makes sense to accept something that only allows changing what can be. But I'm not sure I follow why we couldn't change those. They are accessed each time a request is made - it seems like changing them would affect subsequent requests and work properly?

Contributor (Author):

For retry_config, we cache it inside:

    impl RetryClient {
        /// Use the provided retry config with the provided client
        pub fn new(client: SG, retry_config: RetryConfig) -> Self {
            Self {
                client,
                retry_config: Arc::new(retry_config),
            }
        }
    }

We could change that if needed, but I'm not sure dynamically changing retry_config is that important...

The others we can change, but the question is whether we should: if a field is used in metrics or server-side, changing it may be confusing, and I don't see much value in changing it dynamically...

Member:

We don't have to actually expose those to end users, but I take your point. Especially since the RetryClient is a level higher. I'm fine with having just the diff struct.

    /// The URL of the Temporal server to connect to.
    #[builder(setter(into, strip_option), default)]
    pub target_url: Option<Url>,

    /// If specified, use TLS as configured by the [TlsConfig] struct. If this is set, core will
    /// attempt to use TLS when connecting to the Temporal server. Lang SDK is expected to pass any
    /// certs or keys as bytes, loading them from disk itself if needed.
    /// If set with all optional fields to `None` in [TlsConfig], it disables TLS.
    #[builder(setter(strip_option), default)]
    pub tls_cfg: Option<TlsConfig>,

    /// If set, override the origin used when connecting. May be useful in rare situations where tls
    /// verification needs to use a different name from what should be set as the `:authority`
    /// header.
    /// If set with the URI default value, i.e., "/", it disables the origin override.
    #[builder(setter(strip_option), default)]
    pub override_origin: Option<Uri>,

    /// If set (which it is by default), HTTP2 gRPC keep alive will be enabled.
    /// If specified with all fields in [ClientKeepAliveConfig] set to zero, it disables `keep_alive`.
    #[builder(setter(strip_option), default)]
    pub keep_alive: Option<ClientKeepAliveConfig>,
}

impl ClientOptionsUpdate {
    /// Override fields that are present in the proposed update.
    pub fn merge(&mut self, update: ClientOptionsUpdate) {
        if update.target_url.is_some() {
            self.target_url = update.target_url;
        }
        if update.tls_cfg.is_some() {
            self.tls_cfg = update.tls_cfg;
        }
        if update.override_origin.is_some() {
            self.override_origin = update.override_origin;
        }
        if update.keep_alive.is_some() {
            self.keep_alive = update.keep_alive;
        }
    }

    /// Transform client options using this update.
    pub fn apply(&self, options: &ClientOptions) -> ClientOptions {
        let mut result = options.clone();
        if let Some(target_url) = self.target_url.as_ref() {
            result.target_url = target_url.clone();
        }

        if let Some(tls_cfg) = self.tls_cfg.as_ref() {
            result.tls_cfg = if *tls_cfg == Default::default() {
                // Default, i.e., all fields `None`, is the tombstone for tls_cfg
                None
            } else {
                Some(tls_cfg.clone())
            };
        }

        if let Some(override_origin) = self.override_origin.as_ref() {
            // The tombstone is "/"
            result.override_origin = if *override_origin == Uri::default() {
                None
            } else {
                Some(override_origin.clone())
            }
        }

        if let Some(keep_alive) = self.keep_alive.as_ref() {
            // The tombstone sets all durations to zero
            let zero_duration = Duration::new(0, 0);
            result.keep_alive =
                if keep_alive.interval == zero_duration && keep_alive.timeout == zero_duration {
                    None
                } else {
                    Some(keep_alive.clone())
                }
        }

        result
    }
}
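To make the delta semantics above concrete, here is a hypothetical sequence of two partial updates folded into one changeset (original_options and the URLs are illustrative); each merge overrides only the fields the incoming update actually sets:

    let mut changeset = ClientOptionsUpdate::default();
    // First update retargets the client.
    changeset.merge(
        ClientOptionsUpdateBuilder::default()
            .target_url(Url::parse("https://cluster-b:7233").unwrap())
            .build()
            .unwrap(),
    );
    // Second update only touches keep_alive; target_url from above survives.
    changeset.merge(
        ClientOptionsUpdateBuilder::default()
            .keep_alive(ClientKeepAliveConfig {
                interval: Duration::from_secs(30),
                timeout: Duration::from_secs(15),
            })
            .build()
            .unwrap(),
    );
    // The effective options are the originals with the changeset applied.
    let effective = changeset.apply(&original_options);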
/// Configuration options for TLS
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TlsConfig {
    /// Bytes representing the root CA certificate used by the server. If not set, and the server's
    /// cert is issued by someone the operating system trusts, verification will still work (ex:
@@ -149,7 +242,7 @@
}

/// If using mTLS, both the client cert and private key must be specified; this struct contains them.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct ClientTlsConfig {
    /// The certificate for this client
    pub client_cert: Vec<u8>,
@@ -272,6 +365,28 @@ pub enum ClientInitError {
    SystemInfoCallError(tonic::Status),
}

/// Errors thrown while updating an existing client.
#[derive(thiserror::Error, Debug)]
pub enum ClientUpdateError {
    /// Cannot change the channel configuration.
    #[error("Cannot send the update: {0:?}")]
    ControlChannelError(#[from] SendError<Change<i32, Endpoint>>),
    /// Invalid update; cannot create the new endpoint.
    #[error("Cannot create an endpoint: {0:?}")]
    InvalidUpdateError(#[from] ClientInitError),
}

/// Metadata to enable the dynamic configuration of a client.
#[derive(Clone, Debug)]
pub struct DynamicUpdateInfo {
Member:

Doesn't need to be pub.

Contributor (Author):

Callers may need to access the changeset to get the current options when there is no mutable ref to the client.

Member:

It's not accessible as it stands anyway. It's stored as a private field and there's no accessor.

Contributor (Author):

Good point. We need to decide whether to get rid of the changeset and update ClientOptions directly (with a mutex), or not, and then either add the accessor or make everything private.
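If the changeset approach stays, the accessor could be as small as this sketch (hypothetical method, not part of this diff):

    impl<C> ConfiguredClient<C> {
        /// Returns the effective options: the originals with every update
        /// applied, read under the same mutex the updates use.
        pub async fn current_options(&self) -> ClientOptions {
            let update = self.update.lock().await;
            update.changeset.apply(&self.options)
        }
    }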

    /// Version number of the current update, or zero if no updates have been applied.
    version: i32,
    /// Control channel sender.
    tx: Sender<Change<i32, Endpoint>>,
    /// Summary of the contributions from all previous updates.
    changeset: ClientOptionsUpdate,
}

/// A client with [ClientOptions] attached, which can be passed to initialize workers,
/// or can be used directly. Is cheap to clone.
#[derive(Clone, Debug)]
@@ -282,6 +397,7 @@ pub struct ConfiguredClient<C> {
    /// Capabilities as read from the `get_system_info` RPC call made on client connection
    capabilities: Option<get_system_info_response::Capabilities>,
    workers: Arc<SlotManager>,
    update: Arc<TokioMutex<DynamicUpdateInfo>>,
}

impl<C> ConfiguredClient<C> {
@@ -306,6 +422,57 @@ impl<C> ConfiguredClient<C> {
    pub fn workers(&self) -> Arc<SlotManager> {
        self.workers.clone()
    }

    /// Patches the client options, and then dynamically updates the endpoint with them.
    /// If the update fails, the client does not roll back to the previous configuration, and
    /// future connections will fail.
Comment on lines +427 to +428
Member:

This is rough. Why can't we only apply the update after the new endpoint is connected and works? Can't you do an atomic swap of the endpoint, where there is no downtime and it only ever contains a successfully connected endpoint?

Member:

Agreed. I would call this try_update_options and only swap if the connection succeeds. Also, I find leaving the old options in place scary, since now things aren't in sync.

It'd be fine to put a mutex around the options as well, so that they can be updated too. This also fits better with the idea of using ClientOptions directly rather than ClientOptionsUpdate.

Contributor (Author):

I've been looking into the atomic swap reusing the id for the endpoint, but the problem is that it is anything but atomic.
When you reuse the id, the new endpoint goes into a "preparing" queue while the old endpoint stays active; when the new one gets connected, the swap happens. The problem is that this is all asynchronous: if there is a problem with the new endpoint, there is no visible error and the old one continues forever. It is also impossible to know when this swap will happen, or to validate that it happened, without sending a carefully crafted request. The endpoints are also created in lazy mode, so they need to be part of a channel to validate them.
I think the root problem is that this mechanism was designed for load balancing, not dynamic changes; if you are adding more (similar) endpoints with different targets, you can be more relaxed about when exactly changes happen.
For this reason, I'm forcing the client to fail when there are problems; otherwise they get silently ignored...

Member (@cretz), Jan 25, 2024:

> The problem is that this is all asynchronous

Yeah, I see that with the balance channel, and it makes sense. But can we wait to even send the insert until we have connected the endpoint? And can we send the remove after the insert? So the logic would be:

    endpoint = connect_new()
    channel_tx.send(insert(endpoint))
    channel_tx.send(remove(old_endpoint_seq))

Also curious: if you send an insert and then a remove, is it guaranteed that the next call will land on the inserted endpoint? And with what's there now (remove-then-insert), is it possible in multi-threaded use that there is a window where no endpoint is there?

> I think the root problem is that this mechanism was designed for load balancing, not dynamic changes

I wonder if we can have our own "mutex_channel" or something, or if that's too much (EDIT: I see ::new is crate-private).

> The endpoints are also created in lazy mode, so they need to be part of a channel to validate them.

This is unfortunate, but if we can't eagerly validate, OK. I now wonder if we should go higher than the channel. What I mean is: can we swap our channel for a completely new one instead of updating the channel?
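In Rust terms, the proposed ordering might look like the following sketch, assuming tonic's Endpoint::connect (which takes &self) serves as the eager validation step; new_version and old_version are hypothetical:

    // Validate first, then publish, then retire the old endpoint.
    let endpoint = new_options.create_endpoint().await?;
    // Connect eagerly so a bad endpoint fails here instead of silently later.
    let _validated = endpoint.connect().await?;
    tx.send(Change::Insert(new_version, endpoint)).await?;
    tx.send(Change::Remove(old_version)).await?;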

Contributor (Author):

I don't think you can easily swap the channel without creating a new client, and the problem with creating new clients is that there is a lot of shared state associated with them - eager workflow start workers, metrics, ... - that would need to be moved across atomically.
The semantics of the control channel are really weak. Remove happens immediately, but inserts are queued in a "preparing" queue, so there is no guarantee that after a successful insert you are getting the new options.
To avoid the gap you can reuse the id in the insert, and then you don't need a remove, but there is no guarantee when, or if, the swap will happen.

Contributor (Author):

The problem is that it is "lazy". Tonic will create the connection with the first request, and we want to make sure that this connection is not created until we finish the upgrade.

Member:

You should attempt a get-system-info call on the client before swapping it. IMO we want to make sure the connection is created and the config is known to be successful well before we swap it out. We don't want to swap in a lazy or bad client; the worker would just silently fail for a minute (our default retry, IIRC) and then fatal out.

Contributor (Author):

Before the swap we can check the connection, but that doesn't help with laziness. Internally, tonic defers the connection until it gets the first request. And even if we do a get-system-info, there is a race: someone else who grabbed the client before the update started could make the first call after we drop the lock, and get the error...

Member (@cretz), Jan 29, 2024:

> Before the swap we can check the connection, but that doesn't help for lazy

This is why I'm suggesting a higher-level client swap. Don't swap things at the tonic level; swap the connected Temporal client.

> there is a race with someone else doing the first call after dropping the lock, if they grabbed it before the update started, and getting the error

I think we can accept/document this race. If you're really concerned, we can use a separate lock for the entire client update process to prevent concurrent updates and put a short timeout on the get-system-info call, but of course we should never hold a lock during regular client operations.
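A minimal sketch of that pre-swap probe, assuming the generated WorkflowServiceClient and an illustrative five-second budget:

    // Connect eagerly, then probe with get-system-info under a short timeout,
    // and only publish the endpoint if the probe succeeds.
    let channel = endpoint.connect().await?;
    let mut probe = WorkflowServiceClient::new(channel);
    match tokio::time::timeout(
        Duration::from_secs(5),
        probe.get_system_info(GetSystemInfoRequest::default()),
    )
    .await
    {
        Ok(Ok(_)) => { /* healthy: safe to swap in */ }
        Ok(Err(_status)) => { /* server rejected the probe: abort the update */ }
        Err(_elapsed) => { /* timed out: abort the update */ }
    }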

Member:

I would probably do this:

    pub struct ConfiguredClient<C> {
        client_and_options: ArcSwap<ClientAndOptions<C>>,
        headers: Arc<RwLock<HashMap<String, String>>>,
        /// Capabilities as read from the `get_system_info` RPC call made on client connection
        capabilities: Option<get_system_info_response::Capabilities>,
        workers: Arc<SlotManager>,
    }

    pub struct ClientAndOptions<C> {
        client: C,
        options: ClientOptions,
    }

If ArcSwap doesn't work out for whatever reason (it might be hard to use load() references in all cases), then you can fall back to the Arc<RwLock<Arc<ClientAndOptions>>> approach.
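Under that shape, an update would build and validate a complete replacement off to the side and then publish it in one step; a sketch assuming the arc-swap crate and hypothetical names:

    // Readers that already loaded the old Arc keep using it until they drop
    // it; every new call sees the new client/options pair atomically.
    let replacement = Arc::new(ClientAndOptions {
        client: connected_and_validated_client,
        options: new_options,
    });
    configured.client_and_options.store(replacement);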

    /// Internally it maintains a mutable changeset so that we can apply a sequence of updates
    /// without a mutable client reference. However, this means it cannot update the `options`
    /// field. To keep `options` consistent, use `update_options_mut` or, when needed, manually
    /// `apply()` the changeset to the original options.
    pub async fn update_options(
        &self,
        update: ClientOptionsUpdate,
    ) -> Result<ClientOptions, ClientUpdateError> {
        // We take a "hard" approach to updating: first remove the previous endpoint and, if
        // successful, add the updated one. If there is a failure, don't try to recover; just let
        // the upper layer handle the failed client.
        // This approach may leave a short window in which the client cannot open new connections.
        // Again, we let the upper layer handle this infrequent failure.
        // The "soft" approaches, i.e., remove-last or override-when-ready, cannot guarantee when
        // the update happens, and the asynchronous update could also fail silently.

        let mut update_meta = self.update.lock().await;
        let remove_change = Change::Remove(update_meta.version);
        if let Err(err) = update_meta.tx.send(remove_change).await {
            return Err(ClientUpdateError::ControlChannelError(err));
        }
        update_meta.changeset.merge(update);
        let new_options = update_meta.changeset.apply(&self.options);
        update_meta.version += 1;
        let endpoint = new_options.create_endpoint().await?;
        let add_change = Change::Insert(update_meta.version, endpoint);
        if let Err(err) = update_meta.tx.send(add_change).await {
            return Err(ClientUpdateError::ControlChannelError(err));
        }

        Ok(new_options)
    }

    /// Similar to `update_options`, but it also updates the `options` field in this
    /// `ConfiguredClient` to reflect the changes.
    pub async fn update_options_mut(
        &mut self,
        update: ClientOptionsUpdate,
    ) -> Result<ClientOptions, ClientUpdateError> {
        match self.update_options(update).await {
            Ok(new_options) => {
                // NOTE: `Arc::get_mut` returns `None` (and this unwrap panics) if any
                // other clone of `options` is still alive.
                *Arc::get_mut(&mut self.options).unwrap() = new_options.clone();
                Ok(new_options)
            }
            Err(error) => Err(error),
        }
    }
}
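A hypothetical call site for the above, given a connected ConfiguredClient named cc (the failover URL is illustrative):

    let update = ClientOptionsUpdateBuilder::default()
        .target_url(Url::parse("https://failover.example:7233").unwrap())
        .build()
        .unwrap();
    // On success the endpoint is already swapped; on failure the client is
    // left unusable, per the "hard" approach documented above.
    let new_options = cc.update_options(update).await?;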

// The configured client is effectively a "smart" (dumb) pointer
@@ -340,6 +507,26 @@ impl ClientOptions {
        Ok(retry_client)
    }

    /// Creates an endpoint for an HTTP/2 channel.
    pub async fn create_endpoint(&self) -> Result<Endpoint, ClientInitError> {
        let endpoint = Channel::from_shared(self.target_url.to_string())?;
        let endpoint = self.add_tls_to_channel(endpoint).await?;
        let endpoint = if let Some(keep_alive) = self.keep_alive.as_ref() {
            endpoint
                .keep_alive_while_idle(true)
                .http2_keep_alive_interval(keep_alive.interval)
                .keep_alive_timeout(keep_alive.timeout)
        } else {
            endpoint
        };
        let endpoint = if let Some(origin) = self.override_origin.clone() {
            endpoint.origin(origin)
        } else {
            endpoint
        };
        Ok(endpoint)
    }

    /// Attempt to establish a connection to the Temporal server and return a gRPC client which is
    /// intercepted with retry, default headers functionality, and metrics if provided.
    ///
@@ -350,22 +537,15 @@
        headers: Option<Arc<RwLock<HashMap<String, String>>>>,
    ) -> Result<RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>, ClientInitError>
    {
        let channel = Channel::from_shared(self.target_url.to_string())?;
        let channel = self.add_tls_to_channel(channel).await?;
        let channel = if let Some(keep_alive) = self.keep_alive.as_ref() {
            channel
                .keep_alive_while_idle(true)
                .http2_keep_alive_interval(keep_alive.interval)
                .keep_alive_timeout(keep_alive.timeout)
        } else {
            channel
        };
        let channel = if let Some(origin) = self.override_origin.clone() {
            channel.origin(origin)
        } else {
            channel
        };
        let channel = channel.connect().await?;
        let endpoint = self.create_endpoint().await?;
        let (channel, tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE);
        let change = Change::Insert(0, endpoint);
        if let Err(err) = tx.send(change).await {
            return Err(ClientInitError::SystemInfoCallError(tonic::Status::new(
                Code::Unavailable,
                format!("Cannot initialize channel with endpoint: {:?}", err),
            )));
        }
        let service = ServiceBuilder::new()
            .layer_fn(move |channel| GrpcMetricSvc {
                inner: channel,
@@ -385,6 +565,11 @@ impl ClientOptions {
            options: Arc::new(self.clone()),
            capabilities: None,
            workers: Arc::new(SlotManager::new()),
            update: Arc::new(TokioMutex::new(DynamicUpdateInfo {
                version: 0,
                tx,
                changeset: Default::default(),
            })),
        };
        match client
            .get_system_info(GetSystemInfoRequest::default())
@@ -1573,4 +1758,41 @@ mod tests {
        let opts = builder.keep_alive(None).build().unwrap();
        assert!(opts.keep_alive.is_none());
    }

    #[test]
    fn tombstones_reset_options() {
        let mut builder = ClientOptionsBuilder::default();
        builder
            .target_url(Url::parse("https://smolkitty").unwrap())
            .client_name("cute-kitty".to_string())
            .client_version("0.1.0".to_string())
            .tls_cfg(TlsConfig {
                client_tls_config: None,
                domain: Some("something.cloud".to_string()),
                server_root_ca_cert: None,
            })
            .override_origin(Some("/foo/bar".parse::<Uri>().unwrap()));
        // If unset, defaults to Some
        let opts = builder.build().unwrap();
        let mut builder_update = ClientOptionsUpdateBuilder::default();
        let mut first_update = ClientOptionsUpdate::default();
        let update = builder_update
            .tls_cfg(Default::default())
            .override_origin(Default::default())
            .keep_alive(ClientKeepAliveConfig {
                interval: Duration::new(0, 0),
                timeout: Duration::new(0, 0),
            })
            .build()
            .unwrap();
        first_update.merge(update);
        let new_opts = first_update.apply(&opts);
        assert!(new_opts.keep_alive.is_none());
        assert!(new_opts.tls_cfg.is_none());
        assert!(new_opts.override_origin.is_none());
        assert_eq!(
            new_opts.target_url,
            "https://smolkitty".parse::<Url>().unwrap()
        );
    }
}