From 7039e0ff45246baa873f68aa4de11da99474581b Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Wed, 6 Nov 2024 13:36:58 +0100 Subject: [PATCH] Collect agent info from sidecar (#701) * Collect agent info from sidecar Signed-off-by: Bob Weinand * Add comments and test for AgentInfo in sidecar Signed-off-by: Bob Weinand --------- Signed-off-by: Bob Weinand --- Cargo.lock | 1 + data-pipeline/src/agent_info/fetcher.rs | 7 +- data-pipeline/src/agent_info/mod.rs | 2 +- data-pipeline/src/agent_info/schema.rs | 6 +- sidecar-ffi/src/lib.rs | 33 ++++ sidecar/Cargo.toml | 1 + sidecar/src/service/agent_info.rs | 243 ++++++++++++++++++++++++ sidecar/src/service/mod.rs | 1 + sidecar/src/service/session_info.rs | 4 + sidecar/src/service/sidecar_server.rs | 8 + 10 files changed, 300 insertions(+), 6 deletions(-) create mode 100644 sidecar/src/service/agent_info.rs diff --git a/Cargo.lock b/Cargo.lock index 960da31e9..4bdd1ae22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1611,6 +1611,7 @@ dependencies = [ "cadence", "chrono", "console-subscriber", + "data-pipeline", "datadog-crashtracker", "datadog-dynamic-configuration", "datadog-ipc", diff --git a/data-pipeline/src/agent_info/fetcher.rs b/data-pipeline/src/agent_info/fetcher.rs index 0ec466161..895eb3621 100644 --- a/data-pipeline/src/agent_info/fetcher.rs +++ b/data-pipeline/src/agent_info/fetcher.rs @@ -17,9 +17,12 @@ use tokio::time::sleep; #[allow(clippy::declare_interior_mutable_const)] const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state"); +/// Whether the agent reported the same value or not. #[derive(Debug)] -enum FetchInfoStatus { +pub enum FetchInfoStatus { + /// Unchanged SameState, + /// Has a new state NewState(Box), } @@ -28,7 +31,7 @@ enum FetchInfoStatus { /// If the state hash is different from the current one: /// - Return a `FetchInfoStatus::NewState` of the info struct /// - Else return `FetchInfoStatus::SameState` -async fn fetch_info_with_state( +pub async fn fetch_info_with_state( info_endpoint: &Endpoint, current_state_hash: Option<&str>, ) -> Result { diff --git a/data-pipeline/src/agent_info/mod.rs b/data-pipeline/src/agent_info/mod.rs index 9f82a666b..457414d61 100644 --- a/data-pipeline/src/agent_info/mod.rs +++ b/data-pipeline/src/agent_info/mod.rs @@ -14,4 +14,4 @@ mod fetcher; /// Stores an AgentInfo in an ArcSwap to be updated by an AgentInfoFetcher pub type AgentInfoArc = Arc>; -pub use fetcher::{fetch_info, AgentInfoFetcher}; +pub use fetcher::{fetch_info, fetch_info_with_state, AgentInfoFetcher, FetchInfoStatus}; diff --git a/data-pipeline/src/agent_info/schema.rs b/data-pipeline/src/agent_info/schema.rs index 362dda09c..accfaa071 100644 --- a/data-pipeline/src/agent_info/schema.rs +++ b/data-pipeline/src/agent_info/schema.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 //! This module provides struct representing the info endpoint response -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; /// Wrapper for an agent info response storing the state hash from the agent @@ -15,7 +15,7 @@ pub struct AgentInfo { /// Schema of an agent info response #[allow(missing_docs)] -#[derive(Clone, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct AgentInfoStruct { /// Version of the agent pub version: Option, @@ -38,7 +38,7 @@ pub struct AgentInfoStruct { } #[allow(missing_docs)] -#[derive(Clone, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct Config { pub default_env: Option, pub target_tps: Option, diff --git a/sidecar-ffi/src/lib.rs b/sidecar-ffi/src/lib.rs index aaee3a43a..7f096d3c1 100644 --- a/sidecar-ffi/src/lib.rs +++ b/sidecar-ffi/src/lib.rs @@ -14,6 +14,7 @@ use datadog_sidecar::config; use datadog_sidecar::config::LogMethod; use datadog_sidecar::crashtracker::crashtracker_unix_socket_path; use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener}; +use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, @@ -911,3 +912,35 @@ pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi buf.copy_from_slice(str.as_bytes()); ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size) } + +/// Gets an agent info reader. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_get_agent_info_reader(endpoint: &Endpoint) -> Box { + Box::new(AgentInfoReader::new(endpoint)) +} + +/// Gets the current agent info environment (or empty if not existing) +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_get_agent_info_env<'a>( + reader: &'a mut AgentInfoReader, + changed: &mut bool, +) -> ffi::CharSlice<'a> { + let (has_changed, info) = reader.read(); + *changed = has_changed; + let config = if let Some(info) = info { + info.config.as_ref() + } else { + None + }; + config + .and_then(|c| c.default_env.as_ref()) + .map(|s| ffi::CharSlice::from(s.as_str())) + .unwrap_or(ffi::CharSlice::empty()) +} + +/// Drops the agent info reader. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_drop_agent_info_reader(_: Box) {} diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index b26030afc..8389d69ab 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -21,6 +21,7 @@ ddcommon = { path = "../ddcommon" } datadog-sidecar-macros = { path = "macros" } ddtelemetry = { path = "../ddtelemetry", features = ["tracing"] } +data-pipeline = { path = "../data-pipeline" } datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } diff --git a/sidecar/src/service/agent_info.rs b/sidecar/src/service/agent_info.rs new file mode 100644 index 000000000..226fc1a0d --- /dev/null +++ b/sidecar/src/service/agent_info.rs @@ -0,0 +1,243 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! This file contains code for fetching and sharing the info from the Datadog Agent. +//! It will keep one fetcher per Endpoint. The SidecarServer is expected to keep the AgentInfoGuard +//! alive for the lifetime of the session. +//! The fetcher will remain alive for a short while after all guards have been dropped. +//! It writes the raw agent response to shared memory at a fixed per-endpoint location, to be +//! consumed be tracers. + +use crate::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter}; +use crate::primary_sidecar_identifier; +use base64::prelude::BASE64_URL_SAFE_NO_PAD; +use base64::Engine; +use data_pipeline::agent_info::schema::AgentInfoStruct; +use data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus}; +use datadog_ipc::platform::NamedShmHandle; +use ddcommon::Endpoint; +use futures::future::Shared; +use futures::FutureExt; +use http::uri::PathAndQuery; +use manual_future::ManualFuture; +use std::ffi::CString; +use std::hash::{Hash, Hasher}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tokio::time::sleep; +use tracing::{error, warn}; +use zwohash::{HashMap, ZwoHasher}; + +#[derive(Default, Clone)] +pub struct AgentInfos(Arc>>); + +impl AgentInfos { + /// Ensures a fetcher for the endpoints agent info and keeps it alive for at least as long as + /// the returned guard exists. + pub fn query_for(&self, endpoint: Endpoint) -> AgentInfoGuard { + let mut infos_guard = self.0.lock().unwrap(); + if let Some(info) = infos_guard.get_mut(&endpoint) { + info.rc += 1; + } else { + infos_guard.insert( + endpoint.clone(), + AgentInfoFetcher::new(self.clone(), endpoint.clone()), + ); + } + + AgentInfoGuard { + infos: self.clone(), + endpoint, + } + } +} + +pub struct AgentInfoGuard { + infos: AgentInfos, + endpoint: Endpoint, +} + +impl AgentInfoGuard { + pub fn get(&self) -> Shared> { + let infos_guard = self.infos.0.lock().unwrap(); + let infos = infos_guard.get(&self.endpoint).unwrap(); + infos.infos.clone() + } +} + +impl Drop for AgentInfoGuard { + fn drop(&mut self) { + let mut infos_guard = self.infos.0.lock().unwrap(); + let info = infos_guard.get_mut(&self.endpoint).unwrap(); + info.last_update = Instant::now(); + info.rc -= 1; + } +} + +pub struct AgentInfoFetcher { + /// Once the last_update is too old, we'll stop the fetcher. + last_update: Instant, + /// Will be kept alive forever if rc > 0. + rc: u32, + /// The initial fetch is an unresolved future (to be able to await on it), subsequent fetches + /// are simply directly replacing this with a resolved future. + infos: Shared>, +} + +impl AgentInfoFetcher { + fn new(agent_infos: AgentInfos, endpoint: Endpoint) -> AgentInfoFetcher { + let (future, completer) = ManualFuture::new(); + tokio::spawn(async move { + let mut state: Option = None; + let mut writer = None; + let mut completer = Some(completer); + let mut fetch_endpoint = endpoint.clone(); + let mut parts = fetch_endpoint.url.into_parts(); + parts.path_and_query = Some(PathAndQuery::from_static("/info")); + fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap(); + loop { + let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await; + let mut complete_fut = None; + { + let mut infos_guard = agent_infos.0.lock().unwrap(); + let infos = infos_guard.get_mut(&endpoint).unwrap(); + if infos.rc == 0 && infos.last_update.elapsed().as_secs() > 60 { + break; + } + match fetched { + Ok(FetchInfoStatus::SameState) => {} + Ok(FetchInfoStatus::NewState(status)) => { + state = Some(status.state_hash); + if writer.is_none() { + writer = match OneWayShmWriter::::new(info_path( + &endpoint, + )) { + Ok(writer) => Some(writer), + Err(e) => { + error!("Failed acquiring an agent info writer: {e:?}"); + None + } + }; + } + if let Some(ref writer) = writer { + writer.write(&serde_json::to_vec(&status.info).unwrap()) + } + if let Some(completer) = completer { + complete_fut = Some(completer.complete(status.info)); + } else { + infos.infos = ManualFuture::new_completed(status.info).shared(); + } + completer = None; + } + Err(e) => { + // We'll just return the old values as long as the endpoint is + // unreachable. + warn!( + "The agent info for {} could not be fetched: {}", + fetch_endpoint.url, e + ); + } + } + } + if let Some(complete_fut) = complete_fut.take() { + complete_fut.await; + } + sleep(Duration::from_secs(60)).await; + } + agent_infos.0.lock().unwrap().remove(&endpoint); + }); + + AgentInfoFetcher { + last_update: Instant::now(), + rc: 1, + infos: future.shared(), + } + } +} + +fn info_path(endpoint: &Endpoint) -> CString { + let mut hasher = ZwoHasher::default(); + endpoint.hash(&mut hasher); + let mut path = format!( + "/ddinf{}-{}", + primary_sidecar_identifier(), + BASE64_URL_SAFE_NO_PAD.encode(hasher.finish().to_ne_bytes()), + ); + // datadog agent info, on macos we're restricted to 31 chars + path.truncate(31); // should not be larger than 31 chars, but be sure. + CString::new(path).unwrap() +} + +pub struct AgentInfoReader { + reader: OneWayShmReader, + info: Option, +} + +impl AgentInfoReader { + pub fn new(endpoint: &Endpoint) -> AgentInfoReader { + let path = info_path(endpoint); + AgentInfoReader { + reader: OneWayShmReader::new(open_named_shm(&path).ok(), path), + info: None, + } + } + + pub fn read(&mut self) -> (bool, &Option) { + let (updated, data) = self.reader.read(); + if updated { + match serde_json::from_slice(data) { + Ok(info) => self.info = Some(info), + Err(e) => error!("Failed deserializing the agent info: {e:?}"), + } + } + (updated, &self.info) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use httpmock::prelude::*; + + const TEST_INFO: &str = r#"{ + "config": { + "default_env": "testenv" + } + }"#; + + const TEST_INFO_HASH: &str = "8c732aba385d605b010cd5bd12c03fef402eaefce989f0055aa4c7e92fe30077"; + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_fetch_info_without_state() { + let server = MockServer::start(); + let mock = server + .mock_async(|when, then| { + when.path("/info"); + then.status(200) + .header("content-type", "application/json") + .header("datadog-agent-state", TEST_INFO_HASH) + .body(TEST_INFO); + }) + .await; + let endpoint = Endpoint::from_url(server.url("/").parse().unwrap()); + let agent_infos = AgentInfos::default(); + + let mut reader = AgentInfoReader::new(&endpoint); + assert_eq!(reader.read(), (false, &None)); + + let info = agent_infos.query_for(endpoint).get().await; + mock.assert(); + assert_eq!( + info.config.unwrap().default_env, + Some("testenv".to_string()) + ); + + let (updated, info) = reader.read(); + assert!(updated); + assert_eq!( + info.as_ref().unwrap().config.as_ref().unwrap().default_env, + Some("testenv".to_string()) + ); + } +} diff --git a/sidecar/src/service/mod.rs b/sidecar/src/service/mod.rs index 4760f601b..a8e2ba8ef 100644 --- a/sidecar/src/service/mod.rs +++ b/sidecar/src/service/mod.rs @@ -27,6 +27,7 @@ use runtime_info::RuntimeInfo; use session_info::SessionInfo; use sidecar_interface::{SidecarInterface, SidecarInterfaceRequest, SidecarInterfaceResponse}; +pub mod agent_info; pub mod blocking; pub mod exception_hash_rate_limiter; mod instance_id; diff --git a/sidecar/src/service/session_info.rs b/sidecar/src/service/session_info.rs index b57b5a932..353f98d60 100644 --- a/sidecar/src/service/session_info.rs +++ b/sidecar/src/service/session_info.rs @@ -18,7 +18,9 @@ use tracing::{debug, error, info, trace}; use crate::log::{MultiEnvFilterGuard, MultiWriterGuard}; use crate::{spawn_map_err, tracer}; +use crate::service::agent_info::AgentInfoGuard; use crate::service::{InstanceId, QueueId, RuntimeInfo}; + /// `SessionInfo` holds information about a session. /// /// It contains a list of runtimes, session configuration, tracer configuration, and log guards. @@ -31,6 +33,7 @@ pub(crate) struct SessionInfo { tracer_config: Arc>, dogstatsd: Arc>>, remote_config_invariants: Arc>>, + pub(crate) agent_infos: Arc>>, pub(crate) remote_config_interval: Arc>, #[cfg(windows)] pub(crate) remote_config_notify_function: @@ -50,6 +53,7 @@ impl Clone for SessionInfo { tracer_config: self.tracer_config.clone(), dogstatsd: self.dogstatsd.clone(), remote_config_invariants: self.remote_config_invariants.clone(), + agent_infos: self.agent_infos.clone(), remote_config_interval: self.remote_config_interval.clone(), #[cfg(windows)] remote_config_notify_function: self.remote_config_notify_function.clone(), diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index a5d63ed01..b6e67bf56 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -39,6 +39,7 @@ use serde::{Deserialize, Serialize}; use tokio::task::{JoinError, JoinHandle}; use crate::config::get_product_endpoint; +use crate::service::agent_info::AgentInfos; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::runtime_info::ActiveApplication; @@ -105,6 +106,8 @@ pub struct SidecarServer { Arc>>>, /// Keeps track of the number of submitted payloads. pub submitted_payloads: Arc, + /// All tracked agent infos per endpoint + pub agent_infos: AgentInfos, /// All remote config handling remote_configs: RemoteConfigs, /// The ProcessHandle tied to the connection @@ -689,6 +692,11 @@ impl SidecarInterface for SidecarServer { ); cfg.set_endpoint(logs_endpoint, diagnostics_endpoint).ok(); }); + if config.endpoint.api_key.is_none() { + // no agent info if agentless + *session.agent_infos.lock().unwrap() = + Some(self.agent_infos.query_for(config.endpoint.clone())); + } session.set_remote_config_invariants(ConfigInvariants { language: config.language, tracer_version: config.tracer_version,