Upgrade tonic, prost, otel to latest (#782)
Co-authored-by: Spencer Judge <[email protected]>
h7kanna and Sushisource authored Jul 29, 2024
1 parent 8ae8054 commit 2bc8d7f
Showing 26 changed files with 68 additions and 68 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
@@ -10,11 +10,11 @@ license-file = "LICENSE.txt"
 derive_builder = "0.20"
 derive_more = { version = "0.99", default-features = false, features = ["constructor", "display", "from", "into"] }
 once_cell = "1.16"
-tonic = "0.11"
-tonic-build = "0.11"
-opentelemetry = { version = "0.23", features = ["metrics"] }
-prost = "0.12"
-prost-types = "0.12"
+tonic = "0.12"
+tonic-build = "0.12"
+opentelemetry = { version = "0.24", features = ["metrics"] }
+prost = "0.13"
+prost-types = "0.13"
 
 [workspace.lints.rust]
 unreachable_pub = "warn"
6 changes: 4 additions & 2 deletions client/Cargo.toml
@@ -22,8 +22,10 @@ derive_builder = { workspace = true }
 derive_more = "0.99"
 futures = "0.3"
 futures-retry = "0.6.0"
-http = "0.2"
-hyper = { version = "0.14" }
+http = "1.1.0"
+http-body-util = "0.1"
+hyper = { version = "1.4.1" }
+hyper-util = "0.1.6"
 once_cell = { workspace = true }
 opentelemetry = { workspace = true, features = ["metrics"], optional = true }
 parking_lot = "0.12"
2 changes: 1 addition & 1 deletion client/src/metrics.rs
@@ -173,7 +173,7 @@ pub struct GrpcMetricSvc {
 }
 
 impl Service<http::Request<BoxBody>> for GrpcMetricSvc {
-type Response = http::Response<tonic::transport::Body>;
+type Response = http::Response<BoxBody>;
 type Error = tonic::transport::Error;
 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
 
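The change above makes the response use the same BoxBody type as the request, since tonic 0.12's transport no longer exposes a suitable body type for the middleware's response side. For orientation, here is a generic tower pass-through middleware with the same shape; this is an illustrative sketch only (the PassThrough name and body-generic bounds are assumptions, not the crate's actual GrpcMetricSvc):

    use std::task::{Context, Poll};
    use tower::Service;

    // Illustrative middleware: generic over the HTTP body type, so the same
    // impl compiles whether the inner stack uses hyper bodies or tonic's BoxBody.
    #[derive(Clone)]
    struct PassThrough<S> {
        inner: S,
    }

    impl<S, ReqBody> Service<http::Request<ReqBody>> for PassThrough<S>
    where
        S: Service<http::Request<ReqBody>>,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = S::Future;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.inner.poll_ready(cx)
        }

        fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
            // A real metrics service would start a timer / attach labels here.
            self.inner.call(req)
        }
    }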
29 changes: 17 additions & 12 deletions client/src/proxy.rs
@@ -1,12 +1,17 @@
 use base64::prelude::*;
-use hyper::header;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
+use http_body_util::Empty;
+use hyper::{body::Bytes, header};
+use hyper_util::{
+client::legacy::Client,
+rt::{TokioExecutor, TokioIo},
+};
+use std::{
+future::Future,
+pin::Pin,
+task::{Context, Poll},
+};
 use tokio::net::TcpStream;
-use tonic::transport::Channel;
-use tonic::transport::Endpoint;
+use tonic::transport::{Channel, Endpoint};
 use tower::{service_fn, Service};
 
 /// Options for HTTP CONNECT proxy.
@@ -43,12 +48,12 @@ impl HttpConnectProxyOptions {
 let creds = BASE64_STANDARD.encode(format!("{}:{}", user, pass));
 req_build = req_build.header(header::PROXY_AUTHORIZATION, format!("Basic {}", creds));
 }
-let req = req_build.body(hyper::Body::empty())?;
+let req = req_build.body(Empty::<Bytes>::new())?;
 
 // We have to create a client with a specific connector because Hyper is
 // not letting us change the HTTP/2 authority
-let client =
-hyper::Client::builder().build(OverrideAddrConnector(self.target_addr.clone()));
+let client = Client::builder(TokioExecutor::new())
+.build(OverrideAddrConnector(self.target_addr.clone()));
 
 // Send request
 let res = client.request(req).await?;
@@ -67,7 +72,7 @@ impl HttpConnectProxyOptions {
 struct OverrideAddrConnector(String);
 
 impl Service<hyper::Uri> for OverrideAddrConnector {
-type Response = TcpStream;
+type Response = TokioIo<TcpStream>;
 
 type Error = anyhow::Error;
 
@@ -79,7 +84,7 @@ impl Service<hyper::Uri> for OverrideAddrConnector {
 
 fn call(&mut self, _uri: hyper::Uri) -> Self::Future {
 let target_addr = self.0.clone();
-let fut = async move { Ok(TcpStream::connect(target_addr).await?) };
+let fut = async move { Ok(TokioIo::new(TcpStream::connect(target_addr).await?)) };
 Box::pin(fut)
 }
 }
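The proxy changes above track the hyper 0.14 to 1.x split: the batteries-included client moved to hyper_util::client::legacy::Client (built with a TokioExecutor), simple bodies such as hyper::Body::empty() are replaced by http_body_util::Empty<Bytes>, and raw Tokio streams are wrapped in TokioIo so they satisfy hyper's own I/O traits. A minimal standalone sketch of that client pattern, assuming tokio, hyper 1.x, hyper-util with the legacy client feature, and http-body-util as dependencies (the URL is a placeholder):

    use http_body_util::Empty;
    use hyper::body::Bytes;
    use hyper_util::{client::legacy::Client, rt::TokioExecutor};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // hyper 1.x: the pooled high-level client lives in hyper-util and is
        // built with an explicit executor.
        let client = Client::builder(TokioExecutor::new()).build_http::<Empty<Bytes>>();

        // Request bodies come from http-body-util rather than hyper::Body.
        let req = hyper::Request::builder()
            .uri("http://example.com/")
            .body(Empty::<Bytes>::new())?;

        let res = client.request(req).await?;
        println!("status: {}", res.status());
        Ok(())
    }

Custom connectors follow the same idea as OverrideAddrConnector above: hand back TokioIo::new(TcpStream::connect(..).await?) so hyper can drive the connection through its own Read/Write traits.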
2 changes: 1 addition & 1 deletion client/src/raw.rs
@@ -333,7 +333,7 @@ fn req_cloner<T: Clone>(cloneme: &Request<T>) -> Request<T> {
 new_req
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub(super) struct AttachMetricLabels {
 pub(super) labels: Vec<MetricKeyValue>,
 }
6 changes: 3 additions & 3 deletions client/src/retry.rs
@@ -481,7 +481,7 @@ where
 list_open_workflow_executions,
 maximum_page_size,
 next_page_token.clone(),
-start_time_filter.clone(),
+start_time_filter,
 filters.clone()
 )
 }
@@ -498,7 +498,7 @@ where
 list_closed_workflow_executions,
 maximum_page_size,
 next_page_token.clone(),
-start_time_filter.clone(),
+start_time_filter,
 filters.clone()
 )
 }
@@ -551,7 +551,7 @@ where
 workflow_id.clone(),
 run_id.clone(),
 name.clone(),
-wait_policy.clone(),
+wait_policy,
 args.clone()
 )
 }
10 changes: 5 additions & 5 deletions core/Cargo.toml
@@ -22,7 +22,7 @@ ephemeral-server = ["dep:flate2", "dep:reqwest", "dep:tar", "dep:zip"]
 [dependencies]
 anyhow = "1.0"
 async-trait = "0.1"
-console-subscriber = { version = "0.3", optional = true }
+console-subscriber = { version = "0.4", optional = true }
 crossbeam-channel = "0.5"
 crossbeam-queue = "0.3"
 crossbeam-utils = "0.8"
@@ -43,15 +43,15 @@ lru = "0.12"
 mockall = "0.12"
 once_cell = { workspace = true }
 opentelemetry = { workspace = true, features = ["metrics"], optional = true }
-opentelemetry_sdk = { version = "0.23", features = ["rt-tokio", "metrics"], optional = true }
-opentelemetry-otlp = { version = "0.16", features = ["tokio", "metrics"], optional = true }
-opentelemetry-prometheus = { version = "0.16", optional = true }
+opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
+opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics"], optional = true }
+opentelemetry-prometheus = { version = "0.17", optional = true }
 parking_lot = { version = "0.12", features = ["send_guard"] }
 pid = "4.0"
 pin-project = "1.0"
 prometheus = "0.13"
 prost = { workspace = true }
-prost-types = { version = "0.5", package = "prost-wkt-types" }
+prost-types = { version = "0.6", package = "prost-wkt-types" }
 rand = "0.8.3"
 reqwest = { version = "0.12", features = ["json", "stream", "rustls-tls"], default-features = false, optional = true }
 ringbuf = "0.4"
1 change: 0 additions & 1 deletion core/src/protosext/mod.rs
@@ -403,7 +403,6 @@ impl ValidScheduleLA {
 let retry_policy = v.retry_policy.unwrap_or_default();
 let local_retry_threshold = v
 .local_retry_threshold
-.clone()
 .try_into_or_none()
 .unwrap_or_else(|| Duration::from_secs(60));
 let cancellation_type = ActivityCancellationType::try_from(v.cancellation_type)
5 changes: 1 addition & 4 deletions core/src/retry_logic.rs
@@ -46,14 +46,12 @@ impl RetryPolicyExt for RetryPolicy {
 }
 }
 
-if let Some(explicit_delay) = application_failure.and_then(|af| af.next_retry_delay.clone())
-{
+if let Some(explicit_delay) = application_failure.and_then(|af| af.next_retry_delay) {
 return explicit_delay.try_into().ok();
 }
 
 let converted_interval = self
 .initial_interval
-.clone()
 .try_into_or_none()
 .or(Some(Duration::from_secs(1)));
 if attempt_number == 1 {
@@ -68,7 +66,6 @@ impl RetryPolicyExt for RetryPolicy {
 if let Some(interval) = converted_interval {
 let max_iv = self
 .maximum_interval
-.clone()
 .try_into_or_none()
 .unwrap_or_else(|| interval.saturating_mul(100));
 let mul_factor = coeff.powi(attempt_number as i32 - 1);
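The recurring .clone() removals in this commit (here on next_retry_delay, initial_interval, and maximum_interval, and on event_time and heartbeat_timeout elsewhere) all follow from the upgraded protobuf crates deriving Copy for small well-known-type messages such as Duration and Timestamp, so optional fields can be copied straight out of a shared reference. A rough sketch of why that compiles; the struct and field names below are hypothetical stand-ins for the generated types:

    use std::time::Duration;

    // Stand-in for a prost-generated well-known type that now derives Copy.
    #[derive(Clone, Copy, Debug, Default)]
    struct PbDuration {
        seconds: i64,
        nanos: i32,
    }

    // Stand-in for a generated message holding an optional duration.
    #[derive(Debug, Default)]
    struct RetryPolicy {
        initial_interval: Option<PbDuration>,
    }

    fn initial_interval(p: &RetryPolicy) -> Option<Duration> {
        // Option<PbDuration> is Copy, so this reads the value out of &RetryPolicy
        // without the .clone() the pre-upgrade code needed. (Assumes a
        // non-negative duration for the conversion.)
        p.initial_interval
            .map(|d| Duration::new(d.seconds as u64, d.nanos as u32))
    }

    fn main() {
        let p = RetryPolicy {
            initial_interval: Some(PbDuration { seconds: 1, nanos: 0 }),
        };
        println!("{:?}", initial_interval(&p));
    }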
16 changes: 8 additions & 8 deletions core/src/telemetry/otel.rs
@@ -11,7 +11,7 @@ use super::{
 use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS};
 use opentelemetry::{
 self,
-metrics::{Meter, MeterProvider as MeterProviderT, Unit},
+metrics::{Meter, MeterProvider as MeterProviderT},
 Key, KeyValue, Value,
 };
 use opentelemetry_otlp::WithExportConfig;
@@ -20,10 +20,10 @@ use opentelemetry_sdk::{
 data::Temporality,
 new_view,
 reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector},
-Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader,
-SdkMeterProvider, View,
+Aggregation, AttributeSet, Instrument, InstrumentKind, MeterProviderBuilder,
+PeriodicReader, SdkMeterProvider, View,
 },
-runtime, AttributeSet, Resource,
+runtime, Resource,
 };
 use parking_lot::RwLock;
 use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
@@ -117,7 +117,7 @@ macro_rules! impl_memory_gauge {
 fn new(params: MetricParameters, meter: &Meter) -> Self {
 let gauge = meter
 .$gauge_fn(params.name)
-.with_unit(Unit::new(params.unit))
+.with_unit(params.unit)
 .with_description(params.description)
 .init();
 let map = Arc::new(RwLock::new(HashMap::<AttributeSet, $ty>::new()));
@@ -251,7 +251,7 @@ impl CoreMeter for CoreOtelMeter {
 Arc::new(
 self.meter
 .u64_counter(params.name)
-.with_unit(Unit::new(params.unit))
+.with_unit(params.unit)
 .with_description(params.description)
 .init(),
 )
@@ -261,7 +261,7 @@ impl CoreMeter for CoreOtelMeter {
 Arc::new(
 self.meter
 .u64_histogram(params.name)
-.with_unit(Unit::new(params.unit))
+.with_unit(params.unit)
 .with_description(params.description)
 .init(),
 )
@@ -271,7 +271,7 @@ impl CoreMeter for CoreOtelMeter {
 Arc::new(
 self.meter
 .f64_histogram(params.name)
-.with_unit(Unit::new(params.unit))
+.with_unit(params.unit)
 .with_description(params.description)
 .init(),
 )
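These telemetry edits follow the OpenTelemetry 0.24 API: the Unit newtype was removed, so .with_unit(...) now takes the unit string (or Cow) directly, and AttributeSet is imported from opentelemetry_sdk::metrics instead of the SDK root. A small sketch of building an instrument against that builder style; the meter name, instrument name, unit, and labels are placeholders:

    use opentelemetry::{global, KeyValue};

    fn main() {
        // Without a configured provider this records into a no-op meter,
        // which is enough to show the 0.24-era builder surface.
        let meter = global::meter("example-meter");

        // 0.24-style builder: the unit is a plain string now, no Unit::new(..).
        let counter = meter
            .u64_counter("requests_total")
            .with_unit("1")
            .with_description("Total requests issued by the worker")
            .init();

        counter.add(1, &[KeyValue::new("endpoint", "poll")]);
    }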
7 changes: 3 additions & 4 deletions core/src/worker/activities.rs
@@ -128,7 +128,7 @@ impl RemoteInFlightActInfo {
 workflow_run_id: wec.run_id,
 start_time: Instant::now(),
 },
-heartbeat_timeout: poll_resp.heartbeat_timeout.clone(),
+heartbeat_timeout: poll_resp.heartbeat_timeout,
 issued_cancel_to_lang: None,
 known_not_found: false,
 local_timeouts_task: None,
@@ -417,7 +417,6 @@ impl WorkerActivityTasks {
 .ok_or(ActivityHeartbeatError::UnknownActivity)?;
 let heartbeat_timeout: Duration = at_info
 .heartbeat_timeout
-.clone()
 // We treat None as 0 (even though heartbeat_timeout is never set to None by the server)
 .unwrap_or_default()
 .try_into()
@@ -565,8 +564,8 @@
 let local_timeout_buffer = self.local_timeout_buffer;
 static HEARTBEAT_TYPE: &str = "heartbeat";
 let timeout_at = [
-(HEARTBEAT_TYPE, task.resp.heartbeat_timeout.clone()),
-("start_to_close", task.resp.start_to_close_timeout.clone()),
+(HEARTBEAT_TYPE, task.resp.heartbeat_timeout),
+("start_to_close", task.resp.start_to_close_timeout),
 ]
 .into_iter()
 .filter_map(|(k, d)| {
4 changes: 2 additions & 2 deletions core/src/worker/client/mocks.rs
@@ -25,7 +25,7 @@ pub(crate) static DEFAULT_TEST_CAPABILITIES: &Capabilities = &Capabilities {
 pub(crate) fn mock_workflow_client() -> MockWorkerClient {
 let mut r = MockWorkerClient::new();
 r.expect_capabilities()
-.returning(|| Some(DEFAULT_TEST_CAPABILITIES.clone()));
+.returning(|| Some(*DEFAULT_TEST_CAPABILITIES));
 r.expect_workers()
 .returning(|| DEFAULT_WORKERS_REGISTRY.clone());
 r.expect_is_mock().returning(|| true);
@@ -36,7 +36,7 @@ pub(crate) fn mock_workflow_client() -> MockWorkerClient {
 pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient {
 let mut r = MockManualWorkerClient::new();
 r.expect_capabilities()
-.returning(|| Some(DEFAULT_TEST_CAPABILITIES.clone()));
+.returning(|| Some(*DEFAULT_TEST_CAPABILITIES));
 r.expect_workers()
 .returning(|| DEFAULT_WORKERS_REGISTRY.clone());
 r.expect_is_mock().returning(|| true);
2 changes: 1 addition & 1 deletion core/src/worker/mod.rs
@@ -419,7 +419,7 @@ impl Worker {
 config.clone(),
 metrics,
 shutdown_token.child_token(),
-client.capabilities().clone().unwrap_or_default(),
+client.capabilities().unwrap_or_default(),
 ),
 sticky_queue_name.map(|sq| StickyExecutionAttributes {
 worker_task_queue: Some(TaskQueue {
2 changes: 1 addition & 1 deletion core/src/worker/workflow/driven_workflow.rs
@@ -41,7 +41,7 @@ impl DrivenWorkflow {
 ) {
 debug!(run_id = %attribs.original_execution_run_id, "Driven WF start");
 let started_info = WorkflowStartedInfo {
-workflow_task_timeout: attribs.workflow_task_timeout.clone().try_into_or_none(),
+workflow_task_timeout: attribs.workflow_task_timeout.try_into_or_none(),
 memo: attribs.memo.clone(),
 search_attrs: attribs.search_attributes.clone(),
 retry_policy: attribs.retry_policy.clone(),
@@ -674,7 +674,7 @@ impl WFMachinesAdapter for LocalActivityMachine {
 status: Some(
 DoBackoff {
 attempt: attempt + 1,
-backoff_duration: Some(b.clone()),
+backoff_duration: Some(*b),
 original_schedule_time: original_schedule_time.map(Into::into),
 }
 .into(),
2 changes: 1 addition & 1 deletion core/src/worker/workflow/machines/timer_state_machine.rs
@@ -78,7 +78,7 @@ impl TimerMachine {
 .expect("Scheduling timers doesn't fail");
 let cmd = Command {
 command_type: CommandType::StartTimer as i32,
-attributes: Some(s.shared_state().attrs.clone().into()),
+attributes: Some(s.shared_state().attrs.into()),
 user_metadata: Default::default(),
 };
 (s, cmd)
2 changes: 1 addition & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
@@ -934,7 +934,7 @@ impl WorkflowMachines {
 attrs,
 )) = event_dat.event.attributes
 {
-if let Some(st) = event_dat.event.event_time.clone() {
+if let Some(st) = event_dat.event.event_time {
 let as_systime: SystemTime = st.try_into()?;
 self.workflow_start_time = Some(as_systime);
 // Set the workflow time to be the event time of the first event, so that
@@ -98,7 +98,7 @@ impl TryFrom<HistEventData> for WorkflowTaskMachineEvents {
 Ok(match e.event_type() {
 EventType::WorkflowTaskScheduled => Self::WorkflowTaskScheduled,
 EventType::WorkflowTaskStarted => Self::WorkflowTaskStarted({
-let time = if let Some(time) = e.event_time.clone() {
+let time = if let Some(time) = e.event_time {
 match time.try_into() {
 Ok(t) => t,
 Err(_) => {
2 changes: 1 addition & 1 deletion core/src/worker/workflow/workflow_stream.rs
@@ -87,7 +87,7 @@ impl WFStream {
 buffered_polls_need_cache_slot: Default::default(),
 runs: RunCache::new(
 basics.worker_config.clone(),
-basics.server_capabilities.clone(),
+basics.server_capabilities,
 local_activity_request_sink,
 basics.metrics.clone(),
 ),
7 changes: 4 additions & 3 deletions sdk-core-protos/Cargo.toml
@@ -12,14 +12,15 @@ categories = ["development-tools"]
 
 [features]
 history_builders = ["uuid", "rand"]
+serde_serialize = []
 
 [dependencies]
 anyhow = "1.0"
 base64 = "0.22"
 derive_more = { workspace = true }
 prost = { workspace = true }
-prost-wkt = "0.5"
-prost-wkt-types = "0.5"
+prost-wkt = "0.6"
+prost-wkt-types = "0.6"
 rand = { version = "0.8", optional = true }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -29,7 +30,7 @@ uuid = { version = "1.1", features = ["v4"], optional = true }
 
 [build-dependencies]
 tonic-build = { workspace = true }
-prost-wkt-build = "0.5"
+prost-wkt-build = "0.6"
 
 [lints]
 workspace = true
1 change: 0 additions & 1 deletion sdk-core-protos/src/history_builder.rs
@@ -521,7 +521,6 @@ impl TestHistoryBuilder {
 pub fn wft_start_time(&self) -> Timestamp {
 self.events[(self.workflow_task_scheduled_event_id + 1) as usize]
 .event_time
-.clone()
 .unwrap()
 }
 
