diff --git a/Cargo.lock b/Cargo.lock index 4ea9552fe4ff..50843d6e06ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2936,6 +2936,7 @@ dependencies = [ "pretty_assertions", "semver", "serde", + "serde_with", "serfig", "strum 0.24.1", ] @@ -10180,6 +10181,19 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "opentelemetry-http" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7690dc77bf776713848c4faa6501157469017eaf332baccd4eb1cea928743d94" +dependencies = [ + "async-trait", + "bytes", + "http 0.2.11", + "opentelemetry", + "reqwest 0.11.24", +] + [[package]] name = "opentelemetry-otlp" version = "0.15.0" @@ -10190,10 +10204,12 @@ dependencies = [ "futures-core", "http 0.2.11", "opentelemetry", + "opentelemetry-http", "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry_sdk", "prost 0.12.3", + "reqwest 0.11.24", "thiserror", "tokio", "tonic", @@ -10228,6 +10244,7 @@ dependencies = [ "futures-channel", "futures-executor", "futures-util", + "glob", "once_cell", "opentelemetry", "ordered-float 4.2.0", @@ -12647,11 +12664,11 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.6.1" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270" +checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" dependencies = [ - "base64 0.21.7", + "base64 0.22.0", "chrono", "hex", "indexmap 1.9.3", @@ -12665,9 +12682,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.6.1" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d" +checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" dependencies = [ "darling 0.20.8", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index d0fa3dfce6ef..148ecdc72732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -234,6 +234,7 @@ arrow-udf-wasm = { package = "arrow-udf-wasm", git = "https://github.com/datafus prost = { version = "0.12.1" } prost-build = { version = "0.12.1" } serde = { version = "1.0.164", features = ["derive", "rc"] } +serde_with = { version = "3.8.1" } serde_json = { version = "1.0.85", default-features = false, features = ["preserve_order"] } tonic-build = { version = "0.11" } @@ -247,7 +248,13 @@ logcall = "0.1.5" minitrace = { version = "0.6.5", features = ["enable"] } minitrace-opentelemetry = "0.6.5" opentelemetry = { version = "0.22", features = ["trace", "logs"] } -opentelemetry-otlp = { version = "0.15", features = ["trace", "logs", "grpc-tonic"] } +opentelemetry-otlp = { version = "0.15", features = [ + "trace", + "logs", + "grpc-tonic", + "http-proto", + "reqwest-client", +] } opentelemetry_sdk = { version = "0.22", features = ["trace", "logs", "rt-tokio"] } tracing = "0.1.40" tracing-appender = "0.2.3" diff --git a/src/binaries/meta/entry.rs b/src/binaries/meta/entry.rs index dd84466e7ba9..bdb67aae706f 100644 --- a/src/binaries/meta/entry.rs +++ b/src/binaries/meta/entry.rs @@ -132,8 +132,12 @@ pub async fn entry(conf: Config) -> anyhow::Result<()> { println!("Log:"); println!(" File: {}", conf.log.file); println!(" Stderr: {}", conf.log.stderr); - println!(" OTLP: {}", conf.log.otlp); - println!(" Tracing: {}", conf.log.tracing); + if conf.log.otlp.on { + println!(" OpenTelemetry: {}", conf.log.otlp); + } + if conf.log.tracing.on { + println!(" Tracing: {}", conf.log.tracing); + } println!("Id: {}", conf.raft_config.id); println!("Raft Cluster Name: {}", conf.raft_config.cluster_name); println!("Raft Dir: {}", conf.raft_config.raft_dir); diff --git a/src/binaries/query/entry.rs b/src/binaries/query/entry.rs index 4fbbc52d9d07..c82e7c157680 100644 --- a/src/binaries/query/entry.rs +++ b/src/binaries/query/entry.rs @@ -223,9 +223,18 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> { println!("Logging:"); println!(" file: {}", conf.log.file); println!(" stderr: {}", conf.log.stderr); - println!(" otlp: {}", conf.log.otlp); - println!(" query: {}", conf.log.query); - println!(" tracing: {}", conf.log.tracing); + if conf.log.otlp.on { + println!(" otlp: {}", conf.log.otlp); + } + if conf.log.query.on { + println!(" query: {}", conf.log.query); + } + if conf.log.profile.on { + println!(" profile: {}", conf.log.profile); + } + if conf.log.structlog.on { + println!(" structlog: {}", conf.log.structlog); + } println!(); println!( @@ -355,6 +364,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> { .await; } info!("Shutdown server."); + log::logger().flush(); Ok(()) } diff --git a/src/common/tracing/src/config.rs b/src/common/tracing/src/config.rs index 99c1521e56d5..b2fe10ced6fd 100644 --- a/src/common/tracing/src/config.rs +++ b/src/common/tracing/src/config.rs @@ -121,22 +121,15 @@ impl Default for StderrConfig { pub struct OTLPConfig { pub on: bool, pub level: String, - pub endpoint: String, - pub labels: BTreeMap, + pub endpoint: OTLPEndpointConfig, } impl Display for OTLPConfig { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{}:{}", k, v)) - .collect::>() - .join(","); write!( f, - "enabled={}, level={}, endpoint={}, labels={}", - self.on, self.level, self.endpoint, labels + "enabled={}, level={}, endpoint={}", + self.on, self.level, self.endpoint ) } } @@ -146,8 +139,7 @@ impl Default for OTLPConfig { Self { on: false, level: "INFO".to_string(), - endpoint: "http://127.0.0.1:4317".to_string(), - labels: BTreeMap::new(), + endpoint: OTLPEndpointConfig::default(), } } } @@ -156,23 +148,16 @@ impl Default for OTLPConfig { pub struct QueryLogConfig { pub on: bool, pub dir: String, - pub otlp_endpoint: String, - pub labels: BTreeMap, + pub otlp: Option, } impl Display for QueryLogConfig { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{}:{}", k, v)) - .collect::>() - .join(","); - write!( - f, - "enabled={}, dir={}, otlp_endpoint={}, labels={}", - self.on, self.dir, self.otlp_endpoint, labels, - ) + write!(f, "enabled={}, dir={}", self.on, self.dir)?; + if let Some(endpoint) = &self.otlp { + write!(f, ", otlp={}", endpoint)?; + } + Ok(()) } } @@ -181,8 +166,7 @@ impl Default for QueryLogConfig { Self { on: false, dir: "".to_string(), - otlp_endpoint: "".to_string(), - labels: BTreeMap::new(), + otlp: None, } } } @@ -191,23 +175,16 @@ impl Default for QueryLogConfig { pub struct ProfileLogConfig { pub on: bool, pub dir: String, - pub otlp_endpoint: String, - pub labels: BTreeMap, + pub otlp: Option, } impl Display for ProfileLogConfig { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let labels = self - .labels - .iter() - .map(|(k, v)| format!("{}:{}", k, v)) - .collect::>() - .join(","); - write!( - f, - "enabled={}, dir={}, otlp_endpoint={}, labels={}", - self.on, self.dir, self.otlp_endpoint, labels, - ) + write!(f, "enabled={}, dir={}", self.on, self.dir)?; + if let Some(endpoint) = &self.otlp { + write!(f, ", otlp={}", endpoint)?; + } + Ok(()) } } @@ -216,8 +193,7 @@ impl Default for ProfileLogConfig { Self { on: false, dir: "".to_string(), - otlp_endpoint: "".to_string(), - labels: BTreeMap::new(), + otlp: None, } } } @@ -247,15 +223,15 @@ impl Default for StructLogConfig { pub struct TracingConfig { pub on: bool, pub capture_log_level: String, - pub otlp_endpoint: String, + pub otlp: OTLPEndpointConfig, } impl Display for TracingConfig { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!( f, - "enabled={}, capture_log_level={}, otlp_endpoint={}", - self.on, self.capture_log_level, self.otlp_endpoint + "enabled={}, capture_log_level={}, otlp={}", + self.on, self.capture_log_level, self.otlp ) } } @@ -265,7 +241,86 @@ impl Default for TracingConfig { Self { on: false, capture_log_level: "INFO".to_string(), - otlp_endpoint: "http://127.0.0.1:4317".to_string(), + otlp: OTLPEndpointConfig::default(), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum OTLPProtocol { + Http, + Grpc, +} + +impl Default for OTLPProtocol { + fn default() -> Self { + Self::Grpc + } +} + +impl Display for OTLPProtocol { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + OTLPProtocol::Http => write!(f, "http"), + OTLPProtocol::Grpc => write!(f, "grpc"), + } + } +} + +impl serde::Serialize for OTLPProtocol { + fn serialize(&self, serializer: S) -> Result + where S: serde::Serializer { + serializer.serialize_str(match self { + OTLPProtocol::Http => "http", + OTLPProtocol::Grpc => "grpc", + }) + } +} + +impl<'de> serde::Deserialize<'de> for OTLPProtocol { + fn deserialize(deserializer: D) -> Result + where D: serde::Deserializer<'de> { + let protocol = String::deserialize(deserializer)?; + match protocol.as_str() { + "http" => Ok(OTLPProtocol::Http), + "grpc" => Ok(OTLPProtocol::Grpc), + _ => Err(serde::de::Error::custom(format!( + "unknown protocol: {}", + protocol + ))), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct OTLPEndpointConfig { + pub endpoint: String, + pub protocol: OTLPProtocol, + pub labels: BTreeMap, +} + +impl Display for OTLPEndpointConfig { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}:{}", k, v)) + .collect::>() + .join(","); + write!( + f, + "[endpoint={}, protocol={}, labels={}]", + self.endpoint, self.protocol, labels + ) + } +} + +impl Default for OTLPEndpointConfig { + fn default() -> Self { + Self { + endpoint: "http://127.0.0.1:4317".to_string(), + protocol: OTLPProtocol::Grpc, + labels: BTreeMap::new(), } } } diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 9f31ba7fe0c9..0ba555abe414 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -25,6 +25,7 @@ use log::Log; use minitrace::prelude::*; use opentelemetry_otlp::WithExportConfig; +use crate::config::OTLPProtocol; use crate::loggers::formatter; use crate::loggers::new_file_log_writer; use crate::loggers::MinitraceLogger; @@ -73,7 +74,7 @@ pub fn inject_span_to_tonic_request(msg: impl tonic::IntoRequest) -> tonic pub fn init_logging( name: &str, cfg: &Config, - mut labels: BTreeMap, + labels: BTreeMap, ) -> Vec> { let mut guards: Vec> = Vec::new(); let log_name = name; @@ -89,15 +90,44 @@ pub fn init_logging( } ), }; - // use name as service name if not specified - if !labels.contains_key("service") { - labels.insert("service".to_string(), trace_name.to_string()); - } // Initialize tracing reporter if cfg.tracing.on { - let otlp_endpoint = cfg.tracing.otlp_endpoint.clone(); - + let endpoint = cfg.tracing.otlp.endpoint.clone(); + let mut kvs = cfg + .tracing + .otlp + .labels + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())) + .collect::>(); + kvs.push(opentelemetry::KeyValue::new( + "service.name", + trace_name.clone(), + )); + for (k, v) in labels { + kvs.push(opentelemetry::KeyValue::new(k, v)); + } + let exporter = match cfg.tracing.otlp.protocol { + OTLPProtocol::Grpc => opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint) + .with_protocol(opentelemetry_otlp::Protocol::Grpc) + .with_timeout(Duration::from_secs( + opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, + )) + .build_span_exporter() + .expect("initialize oltp grpc exporter"), + OTLPProtocol::Http => opentelemetry_otlp::new_exporter() + .http() + .with_endpoint(endpoint) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .with_timeout(Duration::from_secs( + opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, + )) + .build_span_exporter() + .expect("initialize oltp http exporter"), + }; let (reporter_rt, otlp_reporter) = Thread::spawn(|| { // Init runtime with 2 threads. let rt = tokio::runtime::Builder::new_multi_thread() @@ -107,19 +137,9 @@ pub fn init_logging( .unwrap(); let reporter = rt.block_on(async { minitrace_opentelemetry::OpenTelemetryReporter::new( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(otlp_endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .with_timeout(Duration::from_secs( - opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, - )) - .build_span_exporter() - .expect("initialize oltp exporter"), + exporter, opentelemetry::trace::SpanKind::Server, - Cow::Owned(opentelemetry_sdk::Resource::new([ - opentelemetry::KeyValue::new("service.name", trace_name.clone()), - ])), + Cow::Owned(opentelemetry_sdk::Resource::new(kvs)), opentelemetry::InstrumentationLibrary::new( trace_name, None::<&'static str>, @@ -181,10 +201,7 @@ pub fn init_logging( // OpenTelemetry logger if cfg.otlp.on { - let mut labels = labels.clone(); - labels.insert("category".to_string(), "system".to_string()); - labels.extend(cfg.otlp.labels.clone()); - let logger = OpenTelemetryLogger::new(log_name, &cfg.otlp.endpoint, labels); + let logger = OpenTelemetryLogger::new(log_name, "system", &cfg.otlp.endpoint); let dispatch = fern::Dispatch::new() .level(cfg.otlp.level.parse().unwrap_or(LevelFilter::Info)) .format(formatter("json")) @@ -215,11 +232,8 @@ pub fn init_logging( guards.push(Box::new(flush_guard)); query_logger = query_logger.chain(Box::new(query_log_file) as Box); } - if !cfg.query.otlp_endpoint.is_empty() { - let mut labels = labels.clone(); - labels.insert("category".to_string(), "query".to_string()); - labels.extend(cfg.query.labels.clone()); - let logger = OpenTelemetryLogger::new(log_name, &cfg.query.otlp_endpoint, labels); + if let Some(endpoint) = &cfg.query.otlp { + let logger = OpenTelemetryLogger::new(log_name, "query", endpoint); query_logger = query_logger.chain(Box::new(logger) as Box); } } @@ -233,11 +247,8 @@ pub fn init_logging( profile_logger = profile_logger.chain(Box::new(profile_log_file) as Box); } - if !cfg.profile.otlp_endpoint.is_empty() { - let mut labels = labels.clone(); - labels.insert("category".to_string(), "profile".to_string()); - labels.extend(cfg.profile.labels.clone()); - let logger = OpenTelemetryLogger::new(log_name, &cfg.profile.otlp_endpoint, labels); + if let Some(endpoint) = &cfg.profile.otlp { + let logger = OpenTelemetryLogger::new(log_name, "profile", endpoint); profile_logger = profile_logger.chain(Box::new(logger) as Box); } } diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs index 12a57bb5661f..dc49e0cb1fc6 100644 --- a/src/common/tracing/src/lib.rs +++ b/src/common/tracing/src/lib.rs @@ -24,6 +24,8 @@ mod structlog; pub use crate::config::Config; pub use crate::config::FileConfig; pub use crate::config::OTLPConfig; +pub use crate::config::OTLPEndpointConfig; +pub use crate::config::OTLPProtocol; pub use crate::config::ProfileLogConfig; pub use crate::config::QueryLogConfig; pub use crate::config::StderrConfig; diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index e2a7fd24e275..a09b59d9437c 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; use std::fmt; use std::io::BufWriter; use std::path::Path; @@ -32,6 +31,9 @@ use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::rolling::RollingFileAppender; use tracing_appender::rolling::Rotation; +use crate::config::OTLPEndpointConfig; +use crate::config::OTLPProtocol; + /// Create a `BufWriter` for a rolling file logger. /// /// `BufWriter` collects log segments into a whole before sending to underlying writer. @@ -85,6 +87,8 @@ impl log::Log for MinitraceLogger { #[derive(Debug)] pub(crate) struct OpenTelemetryLogger { + name: String, + category: String, library: Arc, provider: opentelemetry_sdk::logs::LoggerProvider, } @@ -92,28 +96,56 @@ pub(crate) struct OpenTelemetryLogger { impl OpenTelemetryLogger { pub(crate) fn new( name: impl ToString, - endpoint: &str, - labels: BTreeMap, + category: impl ToString, + config: &OTLPEndpointConfig, ) -> Self { - let kvs = labels - .into_iter() - .map(|(k, v)| opentelemetry::KeyValue::new(k, v)) - .collect::>(); - let export_config = opentelemetry_otlp::ExportConfig { - endpoint: endpoint.to_string(), - protocol: opentelemetry_otlp::Protocol::Grpc, - timeout: Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT), + let exporter = match config.protocol { + OTLPProtocol::Grpc => { + let export_config = opentelemetry_otlp::ExportConfig { + endpoint: config.endpoint.clone(), + protocol: opentelemetry_otlp::Protocol::Grpc, + timeout: Duration::from_secs( + opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, + ), + }; + let exporter_builder: opentelemetry_otlp::LogExporterBuilder = + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(export_config) + .into(); + exporter_builder + .build_log_exporter() + .expect("build grpc log exporter") + } + OTLPProtocol::Http => { + let export_config = opentelemetry_otlp::ExportConfig { + endpoint: config.endpoint.clone(), + protocol: opentelemetry_otlp::Protocol::HttpBinary, + timeout: Duration::from_secs( + opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, + ), + }; + let exporter_builder: opentelemetry_otlp::LogExporterBuilder = + opentelemetry_otlp::new_exporter() + .http() + .with_export_config(export_config) + .into(); + exporter_builder + .build_log_exporter() + .expect("build http log exporter") + } }; - let exporter_builder: opentelemetry_otlp::LogExporterBuilder = - opentelemetry_otlp::new_exporter() - .tonic() - .with_export_config(export_config) - .into(); - let exporter = exporter_builder - .build_log_exporter() - .expect("build log exporter"); + let mut kvs = config + .labels + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())) + .collect::>(); + kvs.push(opentelemetry::KeyValue::new( + "category", + category.to_string(), + )); let provider = opentelemetry_sdk::logs::LoggerProvider::builder() - .with_simple_exporter(exporter) + .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio) .with_config( opentelemetry_sdk::logs::Config::default() .with_resource(opentelemetry_sdk::Resource::new(kvs)), @@ -125,7 +157,12 @@ impl OpenTelemetryLogger { None::<&str>, None, )); - Self { library, provider } + Self { + name: name.to_string(), + category: category.to_string(), + library, + provider, + } } pub fn instrumentation_library(&self) -> &InstrumentationLibrary { @@ -163,7 +200,7 @@ impl log::Log for OpenTelemetryLogger { let result = self.provider.force_flush(); for r in result { if let Err(e) = r { - eprintln!("flush log failed: {}", e); + eprintln!("flush logger {}:{} failed: {}", self.name, self.category, e); } } } diff --git a/src/query/config/Cargo.toml b/src/query/config/Cargo.toml index ebd14861e32a..cb731358c365 100644 --- a/src/query/config/Cargo.toml +++ b/src/query/config/Cargo.toml @@ -33,6 +33,7 @@ hex = "0.4.3" log = { workspace = true } semver = { workspace = true } serde = { workspace = true } +serde_with = { workspace = true } serfig = { workspace = true } strum = "0.24.1" diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 426c4818b607..42b316eafac3 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -48,6 +48,8 @@ use databend_common_storage::StorageConfig as InnerStorageConfig; use databend_common_tracing::Config as InnerLogConfig; use databend_common_tracing::FileConfig as InnerFileLogConfig; use databend_common_tracing::OTLPConfig as InnerOTLPLogConfig; +use databend_common_tracing::OTLPEndpointConfig as InnerOTLPEndpointConfig; +use databend_common_tracing::OTLPProtocol; use databend_common_tracing::ProfileLogConfig as InnerProfileLogConfig; use databend_common_tracing::QueryLogConfig as InnerQueryLogConfig; use databend_common_tracing::StderrConfig as InnerStderrLogConfig; @@ -56,6 +58,7 @@ use databend_common_tracing::TracingConfig as InnerTracingConfig; use databend_common_users::idm_config::IDMConfig as InnerIDMConfig; use serde::Deserialize; use serde::Serialize; +use serde_with::with_prefix; use serfig::collectors::from_env; use serfig::collectors::from_file; use serfig::collectors::from_self; @@ -1932,14 +1935,14 @@ impl TryInto for LogConfig { } let otlp: InnerOTLPLogConfig = self.otlp.try_into()?; - if otlp.on && otlp.endpoint.is_empty() { + if otlp.on && otlp.endpoint.endpoint.is_empty() { return Err(ErrorCode::InvalidConfig( "`endpoint` must be set when `otlp.on` is true".to_string(), )); } let mut query: InnerQueryLogConfig = self.query.try_into()?; - if query.on && query.dir.is_empty() && query.otlp_endpoint.is_empty() { + if query.on && query.dir.is_empty() && query.otlp.is_none() { if file.dir.is_empty() { return Err(ErrorCode::InvalidConfig( "`dir` or `file.dir` must be set when `query.dir` is empty".to_string(), @@ -1950,7 +1953,7 @@ impl TryInto for LogConfig { } let mut profile: InnerProfileLogConfig = self.profile.try_into()?; - if profile.on && profile.dir.is_empty() && profile.otlp_endpoint.is_empty() { + if profile.on && profile.dir.is_empty() && profile.otlp.is_none() { if file.dir.is_empty() { return Err(ErrorCode::InvalidConfig( "`dir` or `file.dir` must be set when `profile.dir` is empty".to_string(), @@ -2147,19 +2150,9 @@ pub struct OTLPLogConfig { #[serde(rename = "level")] pub otlp_level: String, - /// Log OpenTelemetry OTLP endpoint - #[clap( - long = "log-otlp-endpoint", - value_name = "VALUE", - default_value = "http://127.0.0.1:4317" - )] - #[serde(rename = "endpoint")] - pub otlp_endpoint: String, - - /// Log Labels #[clap(skip)] - #[serde(rename = "labels")] - pub otlp_labels: BTreeMap, + #[serde(flatten, with = "prefix_otlp")] + pub otlp_endpoint: OTLPEndpointConfig, } impl Default for OTLPLogConfig { @@ -2175,8 +2168,7 @@ impl TryInto for OTLPLogConfig { Ok(InnerOTLPLogConfig { on: self.otlp_on, level: self.otlp_level, - endpoint: self.otlp_endpoint, - labels: self.otlp_labels, + endpoint: self.otlp_endpoint.try_into()?, }) } } @@ -2186,8 +2178,7 @@ impl From for OTLPLogConfig { Self { otlp_on: inner.on, otlp_level: inner.level, - otlp_endpoint: inner.endpoint, - otlp_labels: inner.labels, + otlp_endpoint: inner.endpoint.into(), } } } @@ -2204,19 +2195,9 @@ pub struct QueryLogConfig { #[serde(rename = "dir")] pub log_query_dir: String, - /// Query Log OpenTelemetry OTLP endpoint - #[clap( - long = "log-query-otlp-endpoint", - value_name = "VALUE", - default_value = "" - )] - #[serde(rename = "otlp_endpoint")] - pub log_query_otlp_endpoint: String, - - /// Query Log Labels #[clap(skip)] - #[serde(rename = "labels")] - pub log_query_otlp_labels: BTreeMap, + #[serde(flatten, with = "prefix_otlp")] + pub log_query_otlp: Option, } impl Default for QueryLogConfig { @@ -2232,8 +2213,7 @@ impl TryInto for QueryLogConfig { Ok(InnerQueryLogConfig { on: self.log_query_on, dir: self.log_query_dir, - otlp_endpoint: self.log_query_otlp_endpoint, - labels: self.log_query_otlp_labels, + otlp: self.log_query_otlp.map(|cfg| cfg.try_into()).transpose()?, }) } } @@ -2243,8 +2223,7 @@ impl From for QueryLogConfig { Self { log_query_on: inner.on, log_query_dir: inner.dir, - log_query_otlp_endpoint: inner.otlp_endpoint, - log_query_otlp_labels: inner.labels, + log_query_otlp: inner.otlp.map(|cfg| cfg.into()), } } } @@ -2261,19 +2240,9 @@ pub struct ProfileLogConfig { #[serde(rename = "dir")] pub log_profile_dir: String, - /// Profile Log OpenTelemetry OTLP endpoint - #[clap( - long = "log-profile-otlp-endpoint", - value_name = "VALUE", - default_value = "" - )] - #[serde(rename = "otlp_endpoint")] - pub log_profile_otlp_endpoint: String, - - /// Profile Log Labels #[clap(skip)] - #[serde(rename = "labels")] - pub log_profile_otlp_labels: BTreeMap, + #[serde(flatten, with = "prefix_otlp")] + pub log_profile_otlp: Option, } impl Default for ProfileLogConfig { @@ -2289,8 +2258,10 @@ impl TryInto for ProfileLogConfig { Ok(InnerProfileLogConfig { on: self.log_profile_on, dir: self.log_profile_dir, - otlp_endpoint: self.log_profile_otlp_endpoint, - labels: self.log_profile_otlp_labels, + otlp: self + .log_profile_otlp + .map(|cfg| cfg.try_into()) + .transpose()?, }) } } @@ -2300,8 +2271,7 @@ impl From for ProfileLogConfig { Self { log_profile_on: inner.on, log_profile_dir: inner.dir, - log_profile_otlp_endpoint: inner.otlp_endpoint, - log_profile_otlp_labels: inner.labels, + log_profile_otlp: inner.otlp.map(|cfg| cfg.into()), } } } @@ -2361,14 +2331,9 @@ pub struct TracingConfig { #[serde(rename = "capture_log_level")] pub tracing_capture_log_level: String, - /// Tracing otlp endpoint - #[clap( - long = "log-tracing-otlp-endpoint", - value_name = "VALUE", - default_value = "http://127.0.0.1:4317" - )] - #[serde(rename = "otlp_endpoint")] - pub tracing_otlp_endpoint: String, + #[clap(skip)] + #[serde(flatten, with = "prefix_otlp")] + pub tracing_otlp: OTLPEndpointConfig, } impl Default for TracingConfig { @@ -2384,7 +2349,7 @@ impl TryInto for TracingConfig { Ok(InnerTracingConfig { on: self.tracing_on, capture_log_level: self.tracing_capture_log_level, - otlp_endpoint: self.tracing_otlp_endpoint, + otlp: self.tracing_otlp.try_into()?, }) } } @@ -2394,7 +2359,52 @@ impl From for TracingConfig { Self { tracing_on: inner.on, tracing_capture_log_level: inner.capture_log_level, - tracing_otlp_endpoint: inner.otlp_endpoint, + tracing_otlp: inner.otlp.into(), + } + } +} + +with_prefix!(prefix_otlp "otlp_"); + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)] +#[serde(default)] +pub struct OTLPEndpointConfig { + /// Log OpenTelemetry OTLP endpoint + pub endpoint: String, + + /// Log OpenTelemetry OTLP protocol + #[clap(skip)] + pub protocol: OTLPProtocol, + + /// Log OpenTelemetry Labels + #[clap(skip)] + pub labels: BTreeMap, +} + +impl Default for OTLPEndpointConfig { + fn default() -> Self { + InnerOTLPEndpointConfig::default().into() + } +} + +impl TryInto for OTLPEndpointConfig { + type Error = ErrorCode; + + fn try_into(self) -> Result { + Ok(InnerOTLPEndpointConfig { + endpoint: self.endpoint, + protocol: self.protocol, + labels: self.labels, + }) + } +} + +impl From for OTLPEndpointConfig { + fn from(inner: InnerOTLPEndpointConfig) -> Self { + Self { + endpoint: inner.endpoint, + protocol: inner.protocol, + labels: inner.labels, } } } diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 4826106fc5b7..2306654caf83 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -35,15 +35,14 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'log' | 'log_dir' | 'null' | '' | | 'log' | 'log_level' | 'null' | '' | | 'log' | 'log_query_enabled' | 'null' | '' | -| 'log' | 'otlp.endpoint' | 'http://127.0.0.1:4317' | '' | | 'log' | 'otlp.level' | 'INFO' | '' | | 'log' | 'otlp.on' | 'false' | '' | +| 'log' | 'otlp.otlp_endpoint' | 'http://127.0.0.1:4317' | '' | +| 'log' | 'otlp.otlp_protocol' | 'grpc' | '' | | 'log' | 'profile.dir' | '' | '' | | 'log' | 'profile.on' | 'false' | '' | -| 'log' | 'profile.otlp_endpoint' | '' | '' | | 'log' | 'query.dir' | '' | '' | | 'log' | 'query.on' | 'false' | '' | -| 'log' | 'query.otlp_endpoint' | '' | '' | | 'log' | 'query_enabled' | 'null' | '' | | 'log' | 'stderr.format' | 'text' | '' | | 'log' | 'stderr.level' | 'WARN' | '' | @@ -53,6 +52,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'log' | 'tracing.capture_log_level' | 'INFO' | '' | | 'log' | 'tracing.on' | 'false' | '' | | 'log' | 'tracing.otlp_endpoint' | 'http://127.0.0.1:4317' | '' | +| 'log' | 'tracing.otlp_protocol' | 'grpc' | '' | | 'meta' | 'auto_sync_interval' | '0' | '' | | 'meta' | 'client_timeout_in_second' | '10' | '' | | 'meta' | 'embedded_dir' | '' | '' |