Skip to content

Commit

Permalink
Remove CaptureSmithyConnectionWrapper (#3045)
Browse files Browse the repository at this point in the history
_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
jdisanti authored Oct 10, 2023
1 parent e61fb6f commit 0f90806
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 82 deletions.
56 changes: 1 addition & 55 deletions rust-runtime/aws-smithy-http/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::fmt::{Debug, Formatter};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

/// Metadata that tracks the state of an active connection.
#[derive(Clone)]
Expand Down Expand Up @@ -51,57 +51,3 @@ impl Debug for ConnectionMetadata {
.finish()
}
}

type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;

/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnection {
loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
}

impl CaptureSmithyConnection {
/// Create a new connection monitor.
pub fn new() -> Self {
Self {
loader: Default::default(),
}
}

/// Set the retriever that will capture the `hyper` connection.
pub fn set_connection_retriever<F>(&self, f: F)
where
F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
{
*self.loader.lock().unwrap() = Some(Box::new(f));
}

/// Get the associated connection metadata.
pub fn get(&self) -> Option<ConnectionMetadata> {
match self.loader.lock().unwrap().as_ref() {
Some(loader) => loader(),
None => {
tracing::debug!("no loader was set on the CaptureSmithyConnection");
None
}
}
}
}

#[cfg(test)]
mod test {
use crate::connection::{CaptureSmithyConnection, ConnectionMetadata};

#[test]
#[allow(clippy::redundant_clone)]
fn retrieve_connection_metadata() {
let retriever = CaptureSmithyConnection::new();
let retriever_clone = retriever.clone();
assert!(retriever.get().is_none());
retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));

assert!(retriever.get().is_some());
assert!(retriever_clone.get().is_some());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_http::connection::{CaptureSmithyConnection, ConnectionMetadata};
use aws_smithy_http::connection::ConnectionMetadata;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::{
AfterDeserializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
Expand All @@ -14,6 +14,7 @@ use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, ReconnectMode, RetryConfig};
use std::fmt;
use std::sync::{Arc, Mutex};
use tracing::{debug, error};

/// An interceptor for poisoning connections in response to certain events.
Expand Down Expand Up @@ -52,11 +53,11 @@ impl Interceptor for ConnectionPoisoningInterceptor {
_runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let capture_smithy_connection = CaptureSmithyConnectionWrapper::new();
let capture_smithy_connection = CaptureSmithyConnection::new();
context
.request_mut()
.extensions_mut()
.insert(capture_smithy_connection.clone_inner());
.insert(capture_smithy_connection.clone());
cfg.interceptor_state().store_put(capture_smithy_connection);

Ok(())
Expand All @@ -72,7 +73,7 @@ impl Interceptor for ConnectionPoisoningInterceptor {
.load::<RetryConfig>()
.map(RetryConfig::reconnect_mode)
.unwrap_or(ReconnectMode::ReconnectOnTransientError);
let captured_connection = cfg.load::<CaptureSmithyConnectionWrapper>().cloned();
let captured_connection = cfg.load::<CaptureSmithyConnection>().cloned();
let retry_classifiers = runtime_components
.retry_classifiers()
.ok_or("retry classifiers are required for connection poisoning to work")?;
Expand Down Expand Up @@ -101,46 +102,66 @@ impl Interceptor for ConnectionPoisoningInterceptor {
}
}

// TODO(enableNewSmithyRuntimeCleanup): A storable wrapper won't be needed anymore once we absorb aws_smithy_http into the new runtime crate.
/// A wrapper around CaptureSmithyConnection that implements `Storable` so that it can be added to the `ConfigBag`.
type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;

/// State for a middleware that will monitor and manage connections.
#[allow(missing_debug_implementations)]
#[derive(Clone, Default)]
pub struct CaptureSmithyConnectionWrapper {
inner: CaptureSmithyConnection,
pub struct CaptureSmithyConnection {
loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
}

impl CaptureSmithyConnectionWrapper {
/// Creates a new `CaptureSmithyConnectionWrapper`.
impl CaptureSmithyConnection {
/// Create a new connection monitor.
pub fn new() -> Self {
Self {
inner: CaptureSmithyConnection::new(),
loader: Default::default(),
}
}

/// Returns a reference to the inner `CaptureSmithyConnection`.
pub fn clone_inner(&self) -> CaptureSmithyConnection {
self.inner.clone()
/// Set the retriever that will capture the `hyper` connection.
pub fn set_connection_retriever<F>(&self, f: F)
where
F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
{
*self.loader.lock().unwrap() = Some(Box::new(f));
}

/// Returns the captured connection metadata, if any.
/// Get the associated connection metadata.
pub fn get(&self) -> Option<ConnectionMetadata> {
self.inner.get()
match self.loader.lock().unwrap().as_ref() {
Some(loader) => loader(),
None => {
tracing::debug!("no loader was set on the CaptureSmithyConnection");
None
}
}
}
}

/// Sets the connection retriever function.
pub fn set_connection_retriever<F>(&self, f: F)
where
F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
{
self.inner.set_connection_retriever(f)
impl fmt::Debug for CaptureSmithyConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "CaptureSmithyConnection")
}
}

impl Storable for CaptureSmithyConnectionWrapper {
impl Storable for CaptureSmithyConnection {
type Storer = StoreReplace<Self>;
}

impl fmt::Debug for CaptureSmithyConnectionWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "CaptureSmithyConnectionWrapper")
#[cfg(test)]
mod test {
use super::*;

#[test]
#[allow(clippy::redundant_clone)]
fn retrieve_connection_metadata() {
let retriever = CaptureSmithyConnection::new();
let retriever_clone = retriever.clone();
assert!(retriever.get().is_none());
retriever.set_connection_retriever(|| Some(ConnectionMetadata::new(true, None, || {})));

assert!(retriever.get().is_some());
assert!(retriever_clone.get().is_some());
}
}
3 changes: 2 additions & 1 deletion rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

use crate::client::http::connection_poisoning::CaptureSmithyConnection;
use aws_smithy_async::future::timeout::TimedOutError;
use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::connection::{CaptureSmithyConnection, ConnectionMetadata};
use aws_smithy_http::connection::ConnectionMetadata;
use aws_smithy_http::result::ConnectorError;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::http::{
Expand Down

0 comments on commit 0f90806

Please sign in to comment.