Skip to content

Commit

Permalink
Collect agent info from sidecar (#701)
Browse files Browse the repository at this point in the history
* Collect agent info from sidecar

Signed-off-by: Bob Weinand <[email protected]>

* Add comments and test for AgentInfo in sidecar

Signed-off-by: Bob Weinand <[email protected]>

---------

Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi authored Nov 6, 2024
1 parent aed5ac8 commit 7039e0f
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ use tokio::time::sleep;
#[allow(clippy::declare_interior_mutable_const)]
const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state");

/// Whether the agent reported the same value or not.
#[derive(Debug)]
enum FetchInfoStatus {
pub enum FetchInfoStatus {
/// Unchanged
SameState,
/// Has a new state
NewState(Box<AgentInfo>),
}

Expand All @@ -28,7 +31,7 @@ enum FetchInfoStatus {
/// If the state hash is different from the current one:
/// - Return a `FetchInfoStatus::NewState` of the info struct
/// - Else return `FetchInfoStatus::SameState`
async fn fetch_info_with_state(
pub async fn fetch_info_with_state(
info_endpoint: &Endpoint,
current_state_hash: Option<&str>,
) -> Result<FetchInfoStatus> {
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/agent_info/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ mod fetcher;
/// Stores an AgentInfo in an ArcSwap to be updated by an AgentInfoFetcher
pub type AgentInfoArc = Arc<ArcSwapOption<schema::AgentInfo>>;

pub use fetcher::{fetch_info, AgentInfoFetcher};
pub use fetcher::{fetch_info, fetch_info_with_state, AgentInfoFetcher, FetchInfoStatus};
6 changes: 3 additions & 3 deletions data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
//! This module provides struct representing the info endpoint response
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Wrapper for an agent info response storing the state hash from the agent
Expand All @@ -15,7 +15,7 @@ pub struct AgentInfo {

/// Schema of an agent info response
#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct AgentInfoStruct {
/// Version of the agent
pub version: Option<String>,
Expand All @@ -38,7 +38,7 @@ pub struct AgentInfoStruct {
}

#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct Config {
pub default_env: Option<String>,
pub target_tps: Option<f64>,
Expand Down
33 changes: 33 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datadog_sidecar::config;
use datadog_sidecar::config::LogMethod;
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
use datadog_sidecar::service::agent_info::AgentInfoReader;
use datadog_sidecar::service::{
blocking::{self, SidecarTransport},
InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
Expand Down Expand Up @@ -911,3 +912,35 @@ pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi
buf.copy_from_slice(str.as_bytes());
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
}

/// Gets an agent info reader.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_get_agent_info_reader(endpoint: &Endpoint) -> Box<AgentInfoReader> {
Box::new(AgentInfoReader::new(endpoint))
}

/// Gets the current agent info environment (or empty if not existing)
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
reader: &'a mut AgentInfoReader,
changed: &mut bool,
) -> ffi::CharSlice<'a> {
let (has_changed, info) = reader.read();
*changed = has_changed;
let config = if let Some(info) = info {
info.config.as_ref()
} else {
None
};
config
.and_then(|c| c.default_env.as_ref())
.map(|s| ffi::CharSlice::from(s.as_str()))
.unwrap_or(ffi::CharSlice::empty())
}

/// Drops the agent info reader.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_drop_agent_info_reader(_: Box<AgentInfoReader>) {}
1 change: 1 addition & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ddcommon = { path = "../ddcommon" }
datadog-sidecar-macros = { path = "macros" }

ddtelemetry = { path = "../ddtelemetry", features = ["tracing"] }
data-pipeline = { path = "../data-pipeline" }
datadog-trace-protobuf = { path = "../trace-protobuf" }
datadog-trace-utils = { path = "../trace-utils" }
datadog-trace-normalization = { path = "../trace-normalization" }
Expand Down
243 changes: 243 additions & 0 deletions sidecar/src/service/agent_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! This file contains code for fetching and sharing the info from the Datadog Agent.
//! It will keep one fetcher per Endpoint. The SidecarServer is expected to keep the AgentInfoGuard
//! alive for the lifetime of the session.
//! The fetcher will remain alive for a short while after all guards have been dropped.
//! It writes the raw agent response to shared memory at a fixed per-endpoint location, to be
//! consumed be tracers.

use crate::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter};
use crate::primary_sidecar_identifier;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
use data_pipeline::agent_info::schema::AgentInfoStruct;
use data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
use datadog_ipc::platform::NamedShmHandle;
use ddcommon::Endpoint;
use futures::future::Shared;
use futures::FutureExt;
use http::uri::PathAndQuery;
use manual_future::ManualFuture;
use std::ffi::CString;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{error, warn};
use zwohash::{HashMap, ZwoHasher};

#[derive(Default, Clone)]
pub struct AgentInfos(Arc<Mutex<HashMap<Endpoint, AgentInfoFetcher>>>);

impl AgentInfos {
/// Ensures a fetcher for the endpoints agent info and keeps it alive for at least as long as
/// the returned guard exists.
pub fn query_for(&self, endpoint: Endpoint) -> AgentInfoGuard {
let mut infos_guard = self.0.lock().unwrap();
if let Some(info) = infos_guard.get_mut(&endpoint) {
info.rc += 1;
} else {
infos_guard.insert(
endpoint.clone(),
AgentInfoFetcher::new(self.clone(), endpoint.clone()),
);
}

AgentInfoGuard {
infos: self.clone(),
endpoint,
}
}
}

pub struct AgentInfoGuard {
infos: AgentInfos,
endpoint: Endpoint,
}

impl AgentInfoGuard {
pub fn get(&self) -> Shared<ManualFuture<AgentInfoStruct>> {
let infos_guard = self.infos.0.lock().unwrap();
let infos = infos_guard.get(&self.endpoint).unwrap();
infos.infos.clone()
}
}

impl Drop for AgentInfoGuard {
fn drop(&mut self) {
let mut infos_guard = self.infos.0.lock().unwrap();
let info = infos_guard.get_mut(&self.endpoint).unwrap();
info.last_update = Instant::now();
info.rc -= 1;
}
}

pub struct AgentInfoFetcher {
/// Once the last_update is too old, we'll stop the fetcher.
last_update: Instant,
/// Will be kept alive forever if rc > 0.
rc: u32,
/// The initial fetch is an unresolved future (to be able to await on it), subsequent fetches
/// are simply directly replacing this with a resolved future.
infos: Shared<ManualFuture<AgentInfoStruct>>,
}

impl AgentInfoFetcher {
fn new(agent_infos: AgentInfos, endpoint: Endpoint) -> AgentInfoFetcher {
let (future, completer) = ManualFuture::new();
tokio::spawn(async move {
let mut state: Option<String> = None;
let mut writer = None;
let mut completer = Some(completer);
let mut fetch_endpoint = endpoint.clone();
let mut parts = fetch_endpoint.url.into_parts();
parts.path_and_query = Some(PathAndQuery::from_static("/info"));
fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap();
loop {
let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await;
let mut complete_fut = None;
{
let mut infos_guard = agent_infos.0.lock().unwrap();
let infos = infos_guard.get_mut(&endpoint).unwrap();
if infos.rc == 0 && infos.last_update.elapsed().as_secs() > 60 {
break;
}
match fetched {
Ok(FetchInfoStatus::SameState) => {}
Ok(FetchInfoStatus::NewState(status)) => {
state = Some(status.state_hash);
if writer.is_none() {
writer = match OneWayShmWriter::<NamedShmHandle>::new(info_path(
&endpoint,
)) {
Ok(writer) => Some(writer),
Err(e) => {
error!("Failed acquiring an agent info writer: {e:?}");
None
}
};
}
if let Some(ref writer) = writer {
writer.write(&serde_json::to_vec(&status.info).unwrap())
}
if let Some(completer) = completer {
complete_fut = Some(completer.complete(status.info));
} else {
infos.infos = ManualFuture::new_completed(status.info).shared();
}
completer = None;
}
Err(e) => {
// We'll just return the old values as long as the endpoint is
// unreachable.
warn!(
"The agent info for {} could not be fetched: {}",
fetch_endpoint.url, e
);
}
}
}
if let Some(complete_fut) = complete_fut.take() {
complete_fut.await;
}
sleep(Duration::from_secs(60)).await;
}
agent_infos.0.lock().unwrap().remove(&endpoint);
});

AgentInfoFetcher {
last_update: Instant::now(),
rc: 1,
infos: future.shared(),
}
}
}

fn info_path(endpoint: &Endpoint) -> CString {
let mut hasher = ZwoHasher::default();
endpoint.hash(&mut hasher);
let mut path = format!(
"/ddinf{}-{}",
primary_sidecar_identifier(),
BASE64_URL_SAFE_NO_PAD.encode(hasher.finish().to_ne_bytes()),
);
// datadog agent info, on macos we're restricted to 31 chars
path.truncate(31); // should not be larger than 31 chars, but be sure.
CString::new(path).unwrap()
}

pub struct AgentInfoReader {
reader: OneWayShmReader<NamedShmHandle, CString>,
info: Option<AgentInfoStruct>,
}

impl AgentInfoReader {
pub fn new(endpoint: &Endpoint) -> AgentInfoReader {
let path = info_path(endpoint);
AgentInfoReader {
reader: OneWayShmReader::new(open_named_shm(&path).ok(), path),
info: None,
}
}

pub fn read(&mut self) -> (bool, &Option<AgentInfoStruct>) {
let (updated, data) = self.reader.read();
if updated {
match serde_json::from_slice(data) {
Ok(info) => self.info = Some(info),
Err(e) => error!("Failed deserializing the agent info: {e:?}"),
}
}
(updated, &self.info)
}
}

#[cfg(test)]
mod tests {
use super::*;
use httpmock::prelude::*;

const TEST_INFO: &str = r#"{
"config": {
"default_env": "testenv"
}
}"#;

const TEST_INFO_HASH: &str = "8c732aba385d605b010cd5bd12c03fef402eaefce989f0055aa4c7e92fe30077";

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_fetch_info_without_state() {
let server = MockServer::start();
let mock = server
.mock_async(|when, then| {
when.path("/info");
then.status(200)
.header("content-type", "application/json")
.header("datadog-agent-state", TEST_INFO_HASH)
.body(TEST_INFO);
})
.await;
let endpoint = Endpoint::from_url(server.url("/").parse().unwrap());
let agent_infos = AgentInfos::default();

let mut reader = AgentInfoReader::new(&endpoint);
assert_eq!(reader.read(), (false, &None));

let info = agent_infos.query_for(endpoint).get().await;
mock.assert();
assert_eq!(
info.config.unwrap().default_env,
Some("testenv".to_string())
);

let (updated, info) = reader.read();
assert!(updated);
assert_eq!(
info.as_ref().unwrap().config.as_ref().unwrap().default_env,
Some("testenv".to_string())
);
}
}
1 change: 1 addition & 0 deletions sidecar/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use runtime_info::RuntimeInfo;
use session_info::SessionInfo;
use sidecar_interface::{SidecarInterface, SidecarInterfaceRequest, SidecarInterfaceResponse};

pub mod agent_info;
pub mod blocking;
pub mod exception_hash_rate_limiter;
mod instance_id;
Expand Down
4 changes: 4 additions & 0 deletions sidecar/src/service/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use tracing::{debug, error, info, trace};
use crate::log::{MultiEnvFilterGuard, MultiWriterGuard};
use crate::{spawn_map_err, tracer};

use crate::service::agent_info::AgentInfoGuard;
use crate::service::{InstanceId, QueueId, RuntimeInfo};

/// `SessionInfo` holds information about a session.
///
/// It contains a list of runtimes, session configuration, tracer configuration, and log guards.
Expand All @@ -31,6 +33,7 @@ pub(crate) struct SessionInfo {
tracer_config: Arc<Mutex<tracer::Config>>,
dogstatsd: Arc<Mutex<Option<dogstatsd_client::Client>>>,
remote_config_invariants: Arc<Mutex<Option<ConfigInvariants>>>,
pub(crate) agent_infos: Arc<Mutex<Option<AgentInfoGuard>>>,
pub(crate) remote_config_interval: Arc<Mutex<Duration>>,
#[cfg(windows)]
pub(crate) remote_config_notify_function:
Expand All @@ -50,6 +53,7 @@ impl Clone for SessionInfo {
tracer_config: self.tracer_config.clone(),
dogstatsd: self.dogstatsd.clone(),
remote_config_invariants: self.remote_config_invariants.clone(),
agent_infos: self.agent_infos.clone(),
remote_config_interval: self.remote_config_interval.clone(),
#[cfg(windows)]
remote_config_notify_function: self.remote_config_notify_function.clone(),
Expand Down
Loading

0 comments on commit 7039e0f

Please sign in to comment.