From 2bc8d7fd244a320cf9bbbc26145f36a57ffb87dc Mon Sep 17 00:00:00 2001 From: Harsha Teja Kanna Date: Mon, 29 Jul 2024 19:17:45 -0400 Subject: [PATCH] Upgrade tonic, prost, otel to latest (#782) Co-authored-by: Spencer Judge --- Cargo.toml | 10 +++---- client/Cargo.toml | 6 ++-- client/src/metrics.rs | 2 +- client/src/proxy.rs | 29 +++++++++++-------- client/src/raw.rs | 2 +- client/src/retry.rs | 6 ++-- core/Cargo.toml | 10 +++---- core/src/protosext/mod.rs | 1 - core/src/retry_logic.rs | 5 +--- core/src/telemetry/otel.rs | 16 +++++----- core/src/worker/activities.rs | 7 ++--- core/src/worker/client/mocks.rs | 4 +-- core/src/worker/mod.rs | 2 +- core/src/worker/workflow/driven_workflow.rs | 2 +- .../machines/local_activity_state_machine.rs | 2 +- .../workflow/machines/timer_state_machine.rs | 2 +- .../workflow/machines/workflow_machines.rs | 2 +- .../machines/workflow_task_state_machine.rs | 2 +- core/src/worker/workflow/workflow_stream.rs | 2 +- sdk-core-protos/Cargo.toml | 7 +++-- sdk-core-protos/src/history_builder.rs | 1 - sdk/Cargo.toml | 2 +- sdk/src/activity_context.rs | 7 ++--- sdk/src/workflow_context.rs | 1 - test-utils/src/canned_histories.rs | 2 +- tests/integ_tests/client_tests.rs | 4 +-- 26 files changed, 68 insertions(+), 68 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 126c79916..0e85f05a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,11 +10,11 @@ license-file = "LICENSE.txt" derive_builder = "0.20" derive_more = { version = "0.99", default-features = false, features = ["constructor", "display", "from", "into"] } once_cell = "1.16" -tonic = "0.11" -tonic-build = "0.11" -opentelemetry = { version = "0.23", features = ["metrics"] } -prost = "0.12" -prost-types = "0.12" +tonic = "0.12" +tonic-build = "0.12" +opentelemetry = { version = "0.24", features = ["metrics"] } +prost = "0.13" +prost-types = "0.13" [workspace.lints.rust] unreachable_pub = "warn" diff --git a/client/Cargo.toml b/client/Cargo.toml index 2aa07bc8c..383a32b68 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -22,8 +22,10 @@ derive_builder = { workspace = true } derive_more = "0.99" futures = "0.3" futures-retry = "0.6.0" -http = "0.2" -hyper = { version = "0.14" } +http = "1.1.0" +http-body-util = "0.1" +hyper = { version = "1.4.1" } +hyper-util = "0.1.6" once_cell = { workspace = true } opentelemetry = { workspace = true, features = ["metrics"], optional = true } parking_lot = "0.12" diff --git a/client/src/metrics.rs b/client/src/metrics.rs index a13456435..023aa121e 100644 --- a/client/src/metrics.rs +++ b/client/src/metrics.rs @@ -173,7 +173,7 @@ pub struct GrpcMetricSvc { } impl Service> for GrpcMetricSvc { - type Response = http::Response; + type Response = http::Response; type Error = tonic::transport::Error; type Future = BoxFuture<'static, Result>; diff --git a/client/src/proxy.rs b/client/src/proxy.rs index 7e419a2c2..83a0a5c62 100644 --- a/client/src/proxy.rs +++ b/client/src/proxy.rs @@ -1,12 +1,17 @@ use base64::prelude::*; -use hyper::header; -use std::future::Future; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; +use http_body_util::Empty; +use hyper::{body::Bytes, header}; +use hyper_util::{ + client::legacy::Client, + rt::{TokioExecutor, TokioIo}, +}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tokio::net::TcpStream; -use tonic::transport::Channel; -use tonic::transport::Endpoint; +use tonic::transport::{Channel, Endpoint}; use tower::{service_fn, Service}; /// Options for HTTP CONNECT proxy. @@ -43,12 +48,12 @@ impl HttpConnectProxyOptions { let creds = BASE64_STANDARD.encode(format!("{}:{}", user, pass)); req_build = req_build.header(header::PROXY_AUTHORIZATION, format!("Basic {}", creds)); } - let req = req_build.body(hyper::Body::empty())?; + let req = req_build.body(Empty::::new())?; // We have to create a client with a specific connector because Hyper is // not letting us change the HTTP/2 authority - let client = - hyper::Client::builder().build(OverrideAddrConnector(self.target_addr.clone())); + let client = Client::builder(TokioExecutor::new()) + .build(OverrideAddrConnector(self.target_addr.clone())); // Send request let res = client.request(req).await?; @@ -67,7 +72,7 @@ impl HttpConnectProxyOptions { struct OverrideAddrConnector(String); impl Service for OverrideAddrConnector { - type Response = TcpStream; + type Response = TokioIo; type Error = anyhow::Error; @@ -79,7 +84,7 @@ impl Service for OverrideAddrConnector { fn call(&mut self, _uri: hyper::Uri) -> Self::Future { let target_addr = self.0.clone(); - let fut = async move { Ok(TcpStream::connect(target_addr).await?) }; + let fut = async move { Ok(TokioIo::new(TcpStream::connect(target_addr).await?)) }; Box::pin(fut) } } diff --git a/client/src/raw.rs b/client/src/raw.rs index 0462888a0..4a537009f 100644 --- a/client/src/raw.rs +++ b/client/src/raw.rs @@ -333,7 +333,7 @@ fn req_cloner(cloneme: &Request) -> Request { new_req } -#[derive(Debug)] +#[derive(Clone, Debug)] pub(super) struct AttachMetricLabels { pub(super) labels: Vec, } diff --git a/client/src/retry.rs b/client/src/retry.rs index cabb6ca92..d243a0f60 100644 --- a/client/src/retry.rs +++ b/client/src/retry.rs @@ -481,7 +481,7 @@ where list_open_workflow_executions, maximum_page_size, next_page_token.clone(), - start_time_filter.clone(), + start_time_filter, filters.clone() ) } @@ -498,7 +498,7 @@ where list_closed_workflow_executions, maximum_page_size, next_page_token.clone(), - start_time_filter.clone(), + start_time_filter, filters.clone() ) } @@ -551,7 +551,7 @@ where workflow_id.clone(), run_id.clone(), name.clone(), - wait_policy.clone(), + wait_policy, args.clone() ) } diff --git a/core/Cargo.toml b/core/Cargo.toml index 1fe23a07d..c88205b24 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,7 +22,7 @@ ephemeral-server = ["dep:flate2", "dep:reqwest", "dep:tar", "dep:zip"] [dependencies] anyhow = "1.0" async-trait = "0.1" -console-subscriber = { version = "0.3", optional = true } +console-subscriber = { version = "0.4", optional = true } crossbeam-channel = "0.5" crossbeam-queue = "0.3" crossbeam-utils = "0.8" @@ -43,15 +43,15 @@ lru = "0.12" mockall = "0.12" once_cell = { workspace = true } opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.23", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.16", features = ["tokio", "metrics"], optional = true } -opentelemetry-prometheus = { version = "0.16", optional = true } +opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true } +opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics"], optional = true } +opentelemetry-prometheus = { version = "0.17", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" prometheus = "0.13" prost = { workspace = true } -prost-types = { version = "0.5", package = "prost-wkt-types" } +prost-types = { version = "0.6", package = "prost-wkt-types" } rand = "0.8.3" reqwest = { version = "0.12", features = ["json", "stream", "rustls-tls"], default-features = false, optional = true } ringbuf = "0.4" diff --git a/core/src/protosext/mod.rs b/core/src/protosext/mod.rs index fc850d114..8d057ec98 100644 --- a/core/src/protosext/mod.rs +++ b/core/src/protosext/mod.rs @@ -403,7 +403,6 @@ impl ValidScheduleLA { let retry_policy = v.retry_policy.unwrap_or_default(); let local_retry_threshold = v .local_retry_threshold - .clone() .try_into_or_none() .unwrap_or_else(|| Duration::from_secs(60)); let cancellation_type = ActivityCancellationType::try_from(v.cancellation_type) diff --git a/core/src/retry_logic.rs b/core/src/retry_logic.rs index d21f4baa5..9b0782245 100644 --- a/core/src/retry_logic.rs +++ b/core/src/retry_logic.rs @@ -46,14 +46,12 @@ impl RetryPolicyExt for RetryPolicy { } } - if let Some(explicit_delay) = application_failure.and_then(|af| af.next_retry_delay.clone()) - { + if let Some(explicit_delay) = application_failure.and_then(|af| af.next_retry_delay) { return explicit_delay.try_into().ok(); } let converted_interval = self .initial_interval - .clone() .try_into_or_none() .or(Some(Duration::from_secs(1))); if attempt_number == 1 { @@ -68,7 +66,6 @@ impl RetryPolicyExt for RetryPolicy { if let Some(interval) = converted_interval { let max_iv = self .maximum_interval - .clone() .try_into_or_none() .unwrap_or_else(|| interval.saturating_mul(100)); let mul_factor = coeff.powi(attempt_number as i32 - 1); diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 25b5b8024..f50f9affe 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -11,7 +11,7 @@ use super::{ use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS}; use opentelemetry::{ self, - metrics::{Meter, MeterProvider as MeterProviderT, Unit}, + metrics::{Meter, MeterProvider as MeterProviderT}, Key, KeyValue, Value, }; use opentelemetry_otlp::WithExportConfig; @@ -20,10 +20,10 @@ use opentelemetry_sdk::{ data::Temporality, new_view, reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector}, - Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader, - SdkMeterProvider, View, + Aggregation, AttributeSet, Instrument, InstrumentKind, MeterProviderBuilder, + PeriodicReader, SdkMeterProvider, View, }, - runtime, AttributeSet, Resource, + runtime, Resource, }; use parking_lot::RwLock; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; @@ -117,7 +117,7 @@ macro_rules! impl_memory_gauge { fn new(params: MetricParameters, meter: &Meter) -> Self { let gauge = meter .$gauge_fn(params.name) - .with_unit(Unit::new(params.unit)) + .with_unit(params.unit) .with_description(params.description) .init(); let map = Arc::new(RwLock::new(HashMap::::new())); @@ -251,7 +251,7 @@ impl CoreMeter for CoreOtelMeter { Arc::new( self.meter .u64_counter(params.name) - .with_unit(Unit::new(params.unit)) + .with_unit(params.unit) .with_description(params.description) .init(), ) @@ -261,7 +261,7 @@ impl CoreMeter for CoreOtelMeter { Arc::new( self.meter .u64_histogram(params.name) - .with_unit(Unit::new(params.unit)) + .with_unit(params.unit) .with_description(params.description) .init(), ) @@ -271,7 +271,7 @@ impl CoreMeter for CoreOtelMeter { Arc::new( self.meter .f64_histogram(params.name) - .with_unit(Unit::new(params.unit)) + .with_unit(params.unit) .with_description(params.description) .init(), ) diff --git a/core/src/worker/activities.rs b/core/src/worker/activities.rs index 1e388d8aa..fc6f88238 100644 --- a/core/src/worker/activities.rs +++ b/core/src/worker/activities.rs @@ -128,7 +128,7 @@ impl RemoteInFlightActInfo { workflow_run_id: wec.run_id, start_time: Instant::now(), }, - heartbeat_timeout: poll_resp.heartbeat_timeout.clone(), + heartbeat_timeout: poll_resp.heartbeat_timeout, issued_cancel_to_lang: None, known_not_found: false, local_timeouts_task: None, @@ -417,7 +417,6 @@ impl WorkerActivityTasks { .ok_or(ActivityHeartbeatError::UnknownActivity)?; let heartbeat_timeout: Duration = at_info .heartbeat_timeout - .clone() // We treat None as 0 (even though heartbeat_timeout is never set to None by the server) .unwrap_or_default() .try_into() @@ -565,8 +564,8 @@ where let local_timeout_buffer = self.local_timeout_buffer; static HEARTBEAT_TYPE: &str = "heartbeat"; let timeout_at = [ - (HEARTBEAT_TYPE, task.resp.heartbeat_timeout.clone()), - ("start_to_close", task.resp.start_to_close_timeout.clone()), + (HEARTBEAT_TYPE, task.resp.heartbeat_timeout), + ("start_to_close", task.resp.start_to_close_timeout), ] .into_iter() .filter_map(|(k, d)| { diff --git a/core/src/worker/client/mocks.rs b/core/src/worker/client/mocks.rs index 249f679c2..5169c1a65 100644 --- a/core/src/worker/client/mocks.rs +++ b/core/src/worker/client/mocks.rs @@ -25,7 +25,7 @@ pub(crate) static DEFAULT_TEST_CAPABILITIES: &Capabilities = &Capabilities { pub(crate) fn mock_workflow_client() -> MockWorkerClient { let mut r = MockWorkerClient::new(); r.expect_capabilities() - .returning(|| Some(DEFAULT_TEST_CAPABILITIES.clone())); + .returning(|| Some(*DEFAULT_TEST_CAPABILITIES)); r.expect_workers() .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); r.expect_is_mock().returning(|| true); @@ -36,7 +36,7 @@ pub(crate) fn mock_workflow_client() -> MockWorkerClient { pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient { let mut r = MockManualWorkerClient::new(); r.expect_capabilities() - .returning(|| Some(DEFAULT_TEST_CAPABILITIES.clone())); + .returning(|| Some(*DEFAULT_TEST_CAPABILITIES)); r.expect_workers() .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); r.expect_is_mock().returning(|| true); diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index dda034245..ae08bbece 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -419,7 +419,7 @@ impl Worker { config.clone(), metrics, shutdown_token.child_token(), - client.capabilities().clone().unwrap_or_default(), + client.capabilities().unwrap_or_default(), ), sticky_queue_name.map(|sq| StickyExecutionAttributes { worker_task_queue: Some(TaskQueue { diff --git a/core/src/worker/workflow/driven_workflow.rs b/core/src/worker/workflow/driven_workflow.rs index 52d7a408a..6042b0787 100644 --- a/core/src/worker/workflow/driven_workflow.rs +++ b/core/src/worker/workflow/driven_workflow.rs @@ -41,7 +41,7 @@ impl DrivenWorkflow { ) { debug!(run_id = %attribs.original_execution_run_id, "Driven WF start"); let started_info = WorkflowStartedInfo { - workflow_task_timeout: attribs.workflow_task_timeout.clone().try_into_or_none(), + workflow_task_timeout: attribs.workflow_task_timeout.try_into_or_none(), memo: attribs.memo.clone(), search_attrs: attribs.search_attributes.clone(), retry_policy: attribs.retry_policy.clone(), diff --git a/core/src/worker/workflow/machines/local_activity_state_machine.rs b/core/src/worker/workflow/machines/local_activity_state_machine.rs index 5b5e9edc3..0fb3118a2 100644 --- a/core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -674,7 +674,7 @@ impl WFMachinesAdapter for LocalActivityMachine { status: Some( DoBackoff { attempt: attempt + 1, - backoff_duration: Some(b.clone()), + backoff_duration: Some(*b), original_schedule_time: original_schedule_time.map(Into::into), } .into(), diff --git a/core/src/worker/workflow/machines/timer_state_machine.rs b/core/src/worker/workflow/machines/timer_state_machine.rs index 54de941d8..65674dfef 100644 --- a/core/src/worker/workflow/machines/timer_state_machine.rs +++ b/core/src/worker/workflow/machines/timer_state_machine.rs @@ -78,7 +78,7 @@ impl TimerMachine { .expect("Scheduling timers doesn't fail"); let cmd = Command { command_type: CommandType::StartTimer as i32, - attributes: Some(s.shared_state().attrs.clone().into()), + attributes: Some(s.shared_state().attrs.into()), user_metadata: Default::default(), }; (s, cmd) diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 0c283c76e..f637df5a7 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -934,7 +934,7 @@ impl WorkflowMachines { attrs, )) = event_dat.event.attributes { - if let Some(st) = event_dat.event.event_time.clone() { + if let Some(st) = event_dat.event.event_time { let as_systime: SystemTime = st.try_into()?; self.workflow_start_time = Some(as_systime); // Set the workflow time to be the event time of the first event, so that diff --git a/core/src/worker/workflow/machines/workflow_task_state_machine.rs b/core/src/worker/workflow/machines/workflow_task_state_machine.rs index 384042064..39c35f859 100644 --- a/core/src/worker/workflow/machines/workflow_task_state_machine.rs +++ b/core/src/worker/workflow/machines/workflow_task_state_machine.rs @@ -98,7 +98,7 @@ impl TryFrom for WorkflowTaskMachineEvents { Ok(match e.event_type() { EventType::WorkflowTaskScheduled => Self::WorkflowTaskScheduled, EventType::WorkflowTaskStarted => Self::WorkflowTaskStarted({ - let time = if let Some(time) = e.event_time.clone() { + let time = if let Some(time) = e.event_time { match time.try_into() { Ok(t) => t, Err(_) => { diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 7714fcd9a..b048cb779 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -87,7 +87,7 @@ impl WFStream { buffered_polls_need_cache_slot: Default::default(), runs: RunCache::new( basics.worker_config.clone(), - basics.server_capabilities.clone(), + basics.server_capabilities, local_activity_request_sink, basics.metrics.clone(), ), diff --git a/sdk-core-protos/Cargo.toml b/sdk-core-protos/Cargo.toml index ebf6c9e4f..616d6935e 100644 --- a/sdk-core-protos/Cargo.toml +++ b/sdk-core-protos/Cargo.toml @@ -12,14 +12,15 @@ categories = ["development-tools"] [features] history_builders = ["uuid", "rand"] +serde_serialize = [] [dependencies] anyhow = "1.0" base64 = "0.22" derive_more = { workspace = true } prost = { workspace = true } -prost-wkt = "0.5" -prost-wkt-types = "0.5" +prost-wkt = "0.6" +prost-wkt-types = "0.6" rand = { version = "0.8", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -29,7 +30,7 @@ uuid = { version = "1.1", features = ["v4"], optional = true } [build-dependencies] tonic-build = { workspace = true } -prost-wkt-build = "0.5" +prost-wkt-build = "0.6" [lints] workspace = true diff --git a/sdk-core-protos/src/history_builder.rs b/sdk-core-protos/src/history_builder.rs index 1ae80f7ae..fff3fc2ad 100644 --- a/sdk-core-protos/src/history_builder.rs +++ b/sdk-core-protos/src/history_builder.rs @@ -521,7 +521,6 @@ impl TestHistoryBuilder { pub fn wft_start_time(&self) -> Timestamp { self.events[(self.workflow_task_scheduled_event_id + 1) as usize] .event_time - .clone() .unwrap() } diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 4e83a0d43..030a06555 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -18,7 +18,7 @@ crossbeam-channel = "0.5" derive_more = { workspace = true } futures = "0.3" parking_lot = { version = "0.12", features = ["send_guard"] } -prost-types = { version = "0.5", package = "prost-wkt-types" } +prost-types = { version = "0.6", package = "prost-wkt-types" } serde = "1.0" tokio = { version = "1.26", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] } tokio-util = { version = "0.7" } diff --git a/sdk/src/activity_context.rs b/sdk/src/activity_context.rs index a2cf11cdb..8854d0583 100644 --- a/sdk/src/activity_context.rs +++ b/sdk/src/activity_context.rs @@ -188,10 +188,9 @@ fn calculate_deadline( ) => { let scheduled: SystemTime = maybe_convert_timestamp(scheduled)?; let started: SystemTime = maybe_convert_timestamp(started)?; - let start_to_close_timeout: StdDuration = - start_to_close_timeout.clone().try_into().ok()?; + let start_to_close_timeout: StdDuration = (*start_to_close_timeout).try_into().ok()?; let schedule_to_close_timeout: StdDuration = - schedule_to_close_timeout.clone().try_into().ok()?; + (*schedule_to_close_timeout).try_into().ok()?; let start_to_close_deadline: SystemTime = started.checked_add(start_to_close_timeout)?; @@ -215,7 +214,7 @@ fn calculate_deadline( /// Helper function lifted from prost_types::Timestamp implementation to prevent double cloning in /// error construction fn maybe_convert_timestamp(timestamp: &Timestamp) -> Option { - let mut timestamp = timestamp.clone(); + let mut timestamp = *timestamp; timestamp.normalize(); let system_time = if timestamp.seconds >= 0 { diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 8df234c53..9736c7a32 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -607,7 +607,6 @@ impl<'a> Future for LATimerBackoffFut<'a> { let timer_f = self.ctx.timer( b.backoff_duration - .clone() .expect("Duration is set") .try_into() .expect("duration converts ok"), diff --git a/test-utils/src/canned_histories.rs b/test-utils/src/canned_histories.rs index 436a77da1..84a07773a 100644 --- a/test-utils/src/canned_histories.rs +++ b/test-utils/src/canned_histories.rs @@ -1312,7 +1312,7 @@ pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder { t.add_full_wf_task(); let mut start_time = t.wft_start_time(); start_time.seconds += 1; - t.add_local_activity_result_marker_with_time(1, "1", b"hi".into(), start_time.clone()); + t.add_local_activity_result_marker_with_time(1, "1", b"hi".into(), start_time); if !parallel { start_time.seconds += 1; } diff --git a/tests/integ_tests/client_tests.rs b/tests/integ_tests/client_tests.rs index af6b30c5d..44ac05767 100644 --- a/tests/integ_tests/client_tests.rs +++ b/tests/integ_tests/client_tests.rs @@ -86,7 +86,7 @@ async fn per_call_timeout_respected_one_call() { struct GenericService { timeouts_tx: UnboundedSender, } -impl Service> for GenericService { +impl Service> for GenericService { type Response = tonic::codegen::http::Response; type Error = Infallible; type Future = BoxFuture<'static, Result>; @@ -95,7 +95,7 @@ impl Service> for GenericS Poll::Ready(Ok(())) } - fn call(&mut self, req: tonic::codegen::http::Request) -> Self::Future { + fn call(&mut self, req: tonic::codegen::http::Request) -> Self::Future { self.timeouts_tx .send( String::from_utf8_lossy(req.headers().get("grpc-timeout").unwrap().as_bytes())