diff --git a/rust/src/client.rs b/rust/src/client.rs index 884f98b0..4183256f 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -26,9 +26,10 @@ use parking_lot::Mutex; use prost_types::Duration; use slog::{debug, error, info, o, warn, Logger}; use tokio::select; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::time::Instant; -use crate::conf::ClientOption; +use crate::conf::{ClientOption, SettingsAware}; use crate::error::{ClientError, ErrorKind}; use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt}; use crate::model::message::AckMessageEntry; @@ -44,14 +45,14 @@ use crate::pb::{ use crate::session::SessionManager; use crate::session::{RPCClient, Session}; -pub(crate) struct Client { +pub(crate) struct Client { logger: Logger, option: ClientOption, session_manager: Arc, route_table: Mutex>, id: String, access_endpoints: Endpoints, - settings: TelemetryCommand, + settings: Arc>, telemetry_command_tx: Option>, shutdown_tx: Option>, } @@ -68,7 +69,10 @@ const OPERATION_SEND_MESSAGE: &str = "client.send_message"; const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message"; const OPERATION_ACK_MESSAGE: &str = "client.ack_message"; -impl Debug for Client { +impl Debug for Client +where + S: SettingsAware + 'static, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Client") .field("id", &self.id) @@ -79,11 +83,14 @@ impl Debug for Client { } #[automock] -impl Client { +impl Client +where + S: SettingsAware + 'static + Send + Sync, +{ pub(crate) fn new( logger: &Logger, option: ClientOption, - settings: TelemetryCommand, + settings: Arc>, ) -> Result { let id = Self::generate_client_id(); let endpoints = Endpoints::from_url(option.access_url()) @@ -131,12 +138,16 @@ impl Client { .await .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?; + let settings = Arc::clone(&self.settings); tokio::spawn(async move { rpc_client.is_started(); - let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); + let seconds_30 = std::time::Duration::from_secs(30); + let mut heartbeat_interval = tokio::time::interval(seconds_30); + let mut sync_settings_interval = + tokio::time::interval_at(Instant::now() + seconds_30, seconds_30); loop { select! { - _ = interval.tick() => { + _ = heartbeat_interval.tick() => { let sessions = session_manager.get_all_sessions().await; if sessions.is_err() { error!( @@ -159,7 +170,7 @@ impl Client { continue; } let result = - Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT); + handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT); if result.is_err() { error!( logger, @@ -171,13 +182,34 @@ impl Client { debug!(logger,"send heartbeat to server success, peer={}",peer); } }, + _ = sync_settings_interval.tick() => { + let sessions = session_manager.get_all_sessions().await; + if sessions.is_err() { + error!(logger, "sync settings failed: failed to get sessions: {}", sessions.unwrap_err()); + continue; + } + for mut session in sessions.unwrap() { + let command; + { + command = settings.read().await.build_telemetry_command(); + } + let peer = session.peer().to_string(); + let result = session.update_settings(command).await; + if result.is_err() { + error!(logger, "sync settings failed: failed to call rpc: {}", result.unwrap_err()); + continue; + } + debug!(logger, "sync settings success, peer = {}", peer); + } + + }, _ = &mut shutdown_rx => { - info!(logger, "receive shutdown signal, stop heartbeat task."); + info!(logger, "receive shutdown signal, stop heartbeat and telemetry tasks."); break; } } } - info!(logger, "heartbeat task is stopped"); + info!(logger, "heartbeat and telemetry task were stopped"); }); Ok(()) } @@ -206,7 +238,7 @@ impl Client { resource_namespace: self.option.namespace.to_string(), }); let response = rpc_client.notify_shutdown(NotifyClientTerminationRequest { group }); - Self::handle_response_status(response.await?.status, OPERATION_CLIENT_SHUTDOWN)?; + handle_response_status(response.await?.status, OPERATION_CLIENT_SHUTDOWN)?; self.session_manager.shutdown().await; Ok(()) } @@ -234,13 +266,17 @@ impl Client { ) } + async fn build_telemetry_command(&self) -> TelemetryCommand { + self.settings.read().await.build_telemetry_command() + } + pub(crate) async fn get_session(&self) -> Result { self.check_started(OPERATION_GET_SESSION)?; let session = self .session_manager .get_or_create_session( &self.access_endpoints, - self.settings.clone(), + self.build_telemetry_command().await, self.telemetry_command_tx.clone().unwrap(), ) .await?; @@ -255,37 +291,13 @@ impl Client { .session_manager .get_or_create_session( endpoints, - self.settings.clone(), + self.build_telemetry_command().await, self.telemetry_command_tx.clone().unwrap(), ) .await?; Ok(session) } - pub(crate) fn handle_response_status( - status: Option, - operation: &'static str, - ) -> Result<(), ClientError> { - if status.is_none() { - return Err(ClientError::new( - ErrorKind::Server, - "server do not return status, this may be a bug", - operation, - )); - } - - let status = status.unwrap(); - let status_code = Code::from_i32(status.code).unwrap(); - if !status_code.eq(&Code::Ok) { - return Err( - ClientError::new(ErrorKind::Server, "server return an error", operation) - .with_context("code", status_code.as_str_name()) - .with_context("message", status.message), - ); - } - Ok(()) - } - pub(crate) fn topic_route_from_cache(&self, topic: &str) -> Option> { self.route_table.lock().get(topic).and_then(|route_status| { if let RouteStatus::Found(route) = route_status { @@ -325,7 +337,7 @@ impl Client { }; let response = rpc_client.query_route(request).await?; - Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?; + handle_response_status(response.status, OPERATION_QUERY_ROUTE)?; let route = Route { index: AtomicUsize::new(0), @@ -454,7 +466,7 @@ impl Client { ) -> Result, ClientError> { let request = SendMessageRequest { messages }; let response = rpc_client.send_message(request).await?; - Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?; + handle_response_status(response.status, OPERATION_SEND_MESSAGE)?; Ok(response .entries @@ -512,7 +524,7 @@ impl Client { if status.code() == Code::MessageNotFound { return Ok(vec![]); } - Self::handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?; + handle_response_status(Some(status), OPERATION_RECEIVE_MESSAGE)?; } Content::Message(message) => { messages.push(message); @@ -560,7 +572,7 @@ impl Client { entries, }; let response = rpc_client.ack_message(request).await?; - Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; + handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; Ok(response.entries) } @@ -605,11 +617,31 @@ impl Client { message_id, }; let response = rpc_client.change_invisible_duration(request).await?; - Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; + handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; Ok(response.receipt_handle) } } +pub fn handle_response_status( + status: Option, + operation: &'static str, +) -> Result<(), ClientError> { + let status = status.ok_or(ClientError::new( + ErrorKind::Server, + "server do not return status, this may be a bug", + operation, + ))?; + + if status.code != Code::Ok as i32 { + return Err( + ClientError::new(ErrorKind::Server, "server return an error", operation) + .with_context("code", format!("{}", status.code)) + .with_context("message", status.message), + ); + } + Ok(()) +} + #[cfg(test)] pub(crate) mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; @@ -624,7 +656,6 @@ pub(crate) mod tests { use crate::error::{ClientError, ErrorKind}; use crate::log::terminal_logger; use crate::model::common::{ClientType, Route}; - use crate::pb::receive_message_response::Content; use crate::pb::{ AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code, FilterExpression, HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, @@ -637,7 +668,16 @@ pub(crate) mod tests { // The lock is used to prevent the mocking static function at same time during parallel testing. pub(crate) static MTX: Lazy> = Lazy::new(|| Mutex::new(())); - fn new_client_for_test() -> Client { + #[derive(Default)] + struct MockSettings {} + + impl SettingsAware for MockSettings { + fn build_telemetry_command(&self) -> TelemetryCommand { + TelemetryCommand::default() + } + } + + fn new_client_for_test() -> Client { Client { logger: terminal_logger(), option: ClientOption { @@ -646,24 +686,24 @@ pub(crate) mod tests { }, session_manager: Arc::new(SessionManager::default()), route_table: Mutex::new(HashMap::new()), - id: Client::generate_client_id(), + id: Client::::generate_client_id(), access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(), - settings: TelemetryCommand::default(), + settings: Arc::new(RwLock::new(MockSettings::default())), telemetry_command_tx: None, shutdown_tx: None, } } - fn new_client_with_session_manager(session_manager: SessionManager) -> Client { + fn new_client_with_session_manager(session_manager: SessionManager) -> Client { let (tx, _) = mpsc::channel(16); Client { logger: terminal_logger(), option: ClientOption::default(), session_manager: Arc::new(session_manager), route_table: Mutex::new(HashMap::new()), - id: Client::generate_client_id(), + id: Client::::generate_client_id(), access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(), - settings: TelemetryCommand::default(), + settings: Arc::new(RwLock::new(MockSettings::default())), telemetry_command_tx: Some(tx), shutdown_tx: None, } @@ -684,7 +724,7 @@ pub(crate) mod tests { Client::new( &terminal_logger(), ClientOption::default(), - TelemetryCommand::default(), + Arc::new(RwLock::new(MockSettings::default())), )?; Ok(()) } @@ -728,8 +768,8 @@ pub(crate) mod tests { } #[test] - fn handle_response_status() { - let result = Client::handle_response_status(None, "test"); + fn test_handle_response_status() { + let result = handle_response_status(None, "test"); assert!(result.is_err(), "should return error when status is None"); let result = result.unwrap_err(); assert_eq!(result.kind, ErrorKind::Server); @@ -739,7 +779,7 @@ pub(crate) mod tests { ); assert_eq!(result.operation, "test"); - let result = Client::handle_response_status( + let result = handle_response_status( Some(Status { code: Code::BadRequest as i32, message: "test failed".to_string(), @@ -757,12 +797,12 @@ pub(crate) mod tests { assert_eq!( result.context, vec![ - ("code", "BAD_REQUEST".to_string()), + ("code", format!("{}", Code::BadRequest as i32)), ("message", "test failed".to_string()), ] ); - let result = Client::handle_response_status( + let result = handle_response_status( Some(Status { code: Code::Ok as i32, message: "test success".to_string(), @@ -897,9 +937,13 @@ pub(crate) mod tests { mock.expect_heartbeat() .return_once(|_| Box::pin(futures::future::ready(response))); - let send_result = - Client::heart_beat_inner(mock, &Some("group".to_string()), "", &ClientType::Producer) - .await; + let send_result = Client::::heart_beat_inner( + mock, + &Some("group".to_string()), + "", + &ClientType::Producer, + ) + .await; assert!(send_result.is_ok()); } diff --git a/rust/src/conf.rs b/rust/src/conf.rs index fa16a41a..8b270612 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -20,10 +20,12 @@ use std::time::Duration; use crate::model::common::ClientType; +use crate::pb::TelemetryCommand; #[allow(unused_imports)] use crate::producer::Producer; #[allow(unused_imports)] use crate::simple_consumer::SimpleConsumer; +use crate::util::{build_producer_settings, build_simple_consumer_settings}; /// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy. #[derive(Debug, Clone)] @@ -130,6 +132,7 @@ pub struct ProducerOption { topics: Option>, namespace: String, validate_message_type: bool, + timeout: Duration, } impl Default for ProducerOption { @@ -140,6 +143,7 @@ impl Default for ProducerOption { topics: None, namespace: "".to_string(), validate_message_type: true, + timeout: Duration::from_secs(3), } } } @@ -188,6 +192,10 @@ impl ProducerOption { pub fn set_validate_message_type(&mut self, validate_message_type: bool) { self.validate_message_type = validate_message_type; } + + pub fn timeout(&self) -> &Duration { + &self.timeout + } } /// The configuration of [`SimpleConsumer`]. @@ -198,6 +206,8 @@ pub struct SimpleConsumerOption { prefetch_route: bool, topics: Option>, namespace: String, + timeout: Duration, + long_polling_timeout: Duration, } impl Default for SimpleConsumerOption { @@ -208,6 +218,8 @@ impl Default for SimpleConsumerOption { prefetch_route: true, topics: None, namespace: "".to_string(), + timeout: Duration::from_secs(3), + long_polling_timeout: Duration::from_secs(40), } } } @@ -256,6 +268,30 @@ impl SimpleConsumerOption { pub(crate) fn set_namespace(&mut self, name_space: impl Into) { self.namespace = name_space.into(); } + + pub fn timeout(&self) -> &Duration { + &self.timeout + } + + pub fn long_polling_timeout(&self) -> &Duration { + &self.long_polling_timeout + } +} + +pub trait SettingsAware { + fn build_telemetry_command(&self) -> TelemetryCommand; +} + +impl SettingsAware for ProducerOption { + fn build_telemetry_command(&self) -> TelemetryCommand { + build_producer_settings(self) + } +} + +impl SettingsAware for SimpleConsumerOption { + fn build_telemetry_command(&self) -> TelemetryCommand { + build_simple_consumer_settings(self) + } } #[cfg(test)] diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs index 2a40d7bf..0a4c6871 100644 --- a/rust/src/model/transaction.rs +++ b/rust/src/model/transaction.rs @@ -21,7 +21,7 @@ use std::fmt::{Debug, Formatter}; use async_trait::async_trait; -use crate::client::Client; +use crate::client::handle_response_status; use crate::error::ClientError; use crate::model::common::SendReceipt; use crate::model::message::MessageView; @@ -95,7 +95,7 @@ impl TransactionImpl { trace_context: "".to_string(), }) .await?; - Client::handle_response_status(response.status, "end transaction") + handle_response_status(response.status, "end transaction") } } diff --git a/rust/src/producer.rs b/rust/src/producer.rs index e456cbe7..26ba496a 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -26,6 +26,7 @@ use tokio::select; use tokio::sync::RwLock; use tokio::sync::{mpsc, oneshot}; +use crate::client::handle_response_status; #[double] use crate::client::Client; use crate::conf::{ClientOption, ProducerOption}; @@ -40,8 +41,8 @@ use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, S use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties, TransactionSource}; use crate::session::RPCClient; use crate::util::{ - build_endpoints_by_message_queue, build_producer_settings, select_message_queue, - select_message_queue_by_message_group, HOST_NAME, + build_endpoints_by_message_queue, select_message_queue, select_message_queue_by_message_group, + HOST_NAME, }; use crate::{log, pb}; @@ -54,7 +55,7 @@ use crate::{log, pb}; pub struct Producer { option: Arc>, logger: Logger, - client: Client, + client: Client, transaction_checker: Option>, shutdown_tx: Option>, } @@ -85,9 +86,8 @@ impl Producer { ..client_option }; let logger = log::logger(option.logging_format()); - let settings = build_producer_settings(&option, &client_option); - let client = Client::new(&logger, client_option, settings)?; let option = Arc::new(RwLock::new(option)); + let client = Client::new(&logger, client_option, Arc::clone(&option))?; Ok(Producer { option, logger, @@ -115,9 +115,8 @@ impl Producer { ..client_option }; let logger = log::logger(option.logging_format()); - let settings = build_producer_settings(&option, &client_option); - let client = Client::new(&logger, client_option, settings)?; let option = Arc::new(RwLock::new(option)); + let client = Client::::new(&logger, client_option, Arc::clone(&option))?; Ok(Producer { option, logger, @@ -128,9 +127,7 @@ impl Producer { } async fn get_resource_namespace(&self) -> String { - let option_guard = self.option.read(); - let resource_namespace = option_guard.await.namespace().to_string(); - resource_namespace + self.option.read().await.namespace().to_string() } /// Start the producer @@ -139,14 +136,15 @@ impl Producer { let telemetry_command_tx: mpsc::Sender = telemetry_command_tx; self.client.start(telemetry_command_tx).await?; - let option_guard = self.option.read().await; - let topics = option_guard.topics(); - if let Some(topics) = topics { - for topic in topics { - self.client.topic_route(topic, true).await?; + { + let option_guard = self.option.read().await; + let topics = option_guard.topics(); + if let Some(topics) = topics { + for topic in topics { + self.client.topic_route(topic, true).await?; + } } } - drop(option_guard); let transaction_checker = self.transaction_checker.take(); if transaction_checker.is_some() { self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN)); @@ -245,7 +243,7 @@ impl Producer { trace_context: "".to_string(), }) .await?; - Client::handle_response_status(response.status, Self::OPERATION_END_TRANSACTION) + handle_response_status(response.status, Self::OPERATION_END_TRANSACTION) } else { Err(ClientError::new( ErrorKind::Config, @@ -396,9 +394,7 @@ impl Producer { select_message_queue(route) }; - let option_guard = self.option.read().await; - let validate_message_type = option_guard.validate_message_type(); - drop(option_guard); + let validate_message_type = self.validate_message_type().await; if validate_message_type { for message_type in message_types { if !message_queue.accept_type(message_type) { @@ -424,6 +420,10 @@ impl Producer { self.client.send_message(&endpoints, pb_messages).await } + async fn validate_message_type(&self) -> bool { + self.option.read().await.validate_message_type() + } + pub fn has_transaction_checker(&self) -> bool { self.transaction_checker.is_some() } @@ -500,7 +500,7 @@ mod tests { async fn producer_start() -> Result<(), ClientError> { let _m = crate::client::tests::MTX.lock(); - let ctx = Client::new_context(); + let ctx = Client::::new_context(); ctx.expect().return_once(|_, _, _| { let mut client = Client::default(); client.expect_topic_route().returning(|_, _| { @@ -533,7 +533,7 @@ mod tests { async fn transaction_producer_start() -> Result<(), ClientError> { let _m = crate::client::tests::MTX.lock(); - let ctx = Client::new_context(); + let ctx = Client::::new_context(); ctx.expect().return_once(|_, _, _| { let mut client = Client::default(); client.expect_topic_route().returning(|_, _| { @@ -774,8 +774,6 @@ mod tests { mock.expect_end_transaction() .return_once(|_| Box::pin(futures::future::ready(response))); - let context = MockClient::handle_response_status_context(); - context.expect().return_once(|_, _| Result::Ok(())); let result = Producer::handle_recover_orphaned_transaction_command( mock, pb::RecoverOrphanedTransactionCommand { diff --git a/rust/src/session.rs b/rust/src/session.rs index c69e9ee9..7b2643c8 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -110,7 +110,7 @@ impl Session { option: self.option.clone(), endpoints: self.endpoints.clone(), stub: self.stub.clone(), - telemetry_tx: None, + telemetry_tx: self.telemetry_tx.clone(), shutdown_tx: None, } } @@ -686,10 +686,7 @@ mod tests { let (tx, _) = mpsc::channel(16); let result = session - .start( - build_producer_settings(&ProducerOption::default(), &ClientOption::default()), - tx, - ) + .start(build_producer_settings(&ProducerOption::default()), tx) .await; assert!(result.is_ok()); assert!(session.is_started()); @@ -714,7 +711,7 @@ mod tests { let session = session_manager .get_or_create_session( &Endpoints::from_url(&format!("localhost:{}", server.address().port())).unwrap(), - build_producer_settings(&ProducerOption::default(), &client_option), + build_producer_settings(&ProducerOption::default()), tx, ) .await diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs index e891d384..8d655000 100644 --- a/rust/src/simple_consumer.rs +++ b/rust/src/simple_consumer.rs @@ -15,12 +15,13 @@ * limitations under the License. */ +use std::sync::Arc; use std::time::Duration; use mockall_double::double; use slog::{info, warn, Logger}; use tokio::select; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, RwLock}; #[double] use crate::client::Client; @@ -28,9 +29,7 @@ use crate::conf::{ClientOption, SimpleConsumerOption}; use crate::error::{ClientError, ErrorKind}; use crate::model::common::{ClientType, FilterExpression}; use crate::model::message::{AckMessageEntry, MessageView}; -use crate::util::{ - build_endpoints_by_message_queue, build_simple_consumer_settings, select_message_queue, -}; +use crate::util::{build_endpoints_by_message_queue, select_message_queue}; use crate::{log, pb}; /// [`SimpleConsumer`] is a lightweight consumer to consume messages from RocketMQ proxy. @@ -46,7 +45,7 @@ use crate::{log, pb}; pub struct SimpleConsumer { option: SimpleConsumerOption, logger: Logger, - client: Client, + client: Client, shutdown_tx: Option>, } @@ -75,8 +74,8 @@ impl SimpleConsumer { ..client_option }; let logger = log::logger(option.logging_format()); - let settings = build_simple_consumer_settings(&option, &client_option); - let client = Client::new(&logger, client_option, settings)?; + let settings = Arc::new(RwLock::new(option.clone())); + let client = Client::::new(&logger, client_option, settings)?; Ok(SimpleConsumer { option, logger, @@ -230,7 +229,7 @@ mod tests { async fn simple_consumer_start() -> Result<(), ClientError> { let _m = crate::client::tests::MTX.lock(); - let ctx = Client::new_context(); + let ctx = Client::::new_context(); ctx.expect().return_once(|_, _, _| { let mut client = Client::default(); client.expect_topic_route().returning(|_, _| { diff --git a/rust/src/util.rs b/rust/src/util.rs index ac935b6b..1e5cf11c 100644 --- a/rust/src/util.rs +++ b/rust/src/util.rs @@ -14,16 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use once_cell::sync::Lazy; use std::hash::Hasher; use std::sync::atomic::Ordering; use std::sync::Arc; -use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption}; +use once_cell::sync::Lazy; use siphasher::sip::SipHasher24; +use crate::conf::{ProducerOption, SimpleConsumerOption}; use crate::error::{ClientError, ErrorKind}; -use crate::model::common::{Endpoints, Route}; +use crate::model::common::{ClientType, Endpoints, Route}; use crate::pb::settings::PubSub; use crate::pb::telemetry_command::Command; use crate::pb::{ @@ -84,10 +84,7 @@ pub(crate) fn build_endpoints_by_message_queue( Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap())) } -pub(crate) fn build_producer_settings( - option: &ProducerOption, - client_options: &ClientOption, -) -> TelemetryCommand { +pub(crate) fn build_producer_settings(option: &ProducerOption) -> TelemetryCommand { let topics = option .topics() .clone() @@ -101,10 +98,10 @@ pub(crate) fn build_producer_settings( let platform = os_info::get(); TelemetryCommand { command: Some(Command::Settings(Settings { - client_type: Some(client_options.client_type.clone() as i32), + client_type: Some(ClientType::Producer as i32), request_timeout: Some(prost_types::Duration { - seconds: client_options.timeout().as_secs() as i64, - nanos: client_options.timeout().subsec_nanos() as i32, + seconds: option.timeout().as_secs() as i64, + nanos: option.timeout().subsec_nanos() as i32, }), pub_sub: Some(PubSub::Publishing(Publishing { topics, @@ -123,17 +120,14 @@ pub(crate) fn build_producer_settings( } } -pub(crate) fn build_simple_consumer_settings( - option: &SimpleConsumerOption, - client_option: &ClientOption, -) -> TelemetryCommand { +pub(crate) fn build_simple_consumer_settings(option: &SimpleConsumerOption) -> TelemetryCommand { let platform = os_info::get(); TelemetryCommand { command: Some(Command::Settings(Settings { - client_type: Some(client_option.client_type.clone() as i32), + client_type: Some(ClientType::SimpleConsumer as i32), request_timeout: Some(prost_types::Duration { - seconds: client_option.timeout().as_secs() as i64, - nanos: client_option.timeout().subsec_nanos() as i32, + seconds: option.timeout().as_secs() as i64, + nanos: option.timeout().subsec_nanos() as i32, }), pub_sub: Some(PubSub::Subscription(Subscription { group: Some(Resource { @@ -144,8 +138,8 @@ pub(crate) fn build_simple_consumer_settings( fifo: Some(false), receive_batch_size: None, long_polling_timeout: Some(prost_types::Duration { - seconds: client_option.long_polling_timeout().as_secs() as i64, - nanos: client_option.long_polling_timeout().subsec_nanos() as i32, + seconds: option.long_polling_timeout().as_secs() as i64, + nanos: option.long_polling_timeout().subsec_nanos() as i32, }), })), user_agent: Some(Ua { @@ -162,11 +156,12 @@ pub(crate) fn build_simple_consumer_settings( #[cfg(test)] mod tests { + use std::sync::atomic::AtomicUsize; + use std::sync::Arc; + use crate::model::common::Route; use crate::pb; use crate::pb::{Broker, MessageQueue}; - use std::sync::atomic::AtomicUsize; - use std::sync::Arc; use super::*; @@ -272,11 +267,11 @@ mod tests { #[test] fn util_build_producer_settings() { - build_producer_settings(&ProducerOption::default(), &ClientOption::default()); + build_producer_settings(&ProducerOption::default()); } #[test] fn util_build_simple_consumer_settings() { - build_simple_consumer_settings(&SimpleConsumerOption::default(), &ClientOption::default()); + build_simple_consumer_settings(&SimpleConsumerOption::default()); } }