Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect agent info from sidecar #701

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ use tokio::time::sleep;
#[allow(clippy::declare_interior_mutable_const)]
const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state");

/// Whether the agent reported the same value or not.
#[derive(Debug)]
enum FetchInfoStatus {
pub enum FetchInfoStatus {
/// Unchanged
SameState,
/// Has a new state
NewState(Box<AgentInfo>),
}

Expand All @@ -28,7 +31,7 @@ enum FetchInfoStatus {
/// If the state hash is different from the current one:
/// - Return a `FetchInfoStatus::NewState` of the info struct
/// - Else return `FetchInfoStatus::SameState`
async fn fetch_info_with_state(
pub async fn fetch_info_with_state(
info_endpoint: &Endpoint,
current_state_hash: Option<&str>,
) -> Result<FetchInfoStatus> {
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/agent_info/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ mod fetcher;
/// Stores an AgentInfo in an ArcSwap to be updated by an AgentInfoFetcher
pub type AgentInfoArc = Arc<ArcSwapOption<schema::AgentInfo>>;

pub use fetcher::{fetch_info, AgentInfoFetcher};
pub use fetcher::{fetch_info, fetch_info_with_state, AgentInfoFetcher, FetchInfoStatus};
6 changes: 3 additions & 3 deletions data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
//! This module provides structs representing the info endpoint response
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Wrapper for an agent info response storing the state hash from the agent
Expand All @@ -15,7 +15,7 @@ pub struct AgentInfo {

/// Schema of an agent info response
#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct AgentInfoStruct {
/// Version of the agent
pub version: Option<String>,
Expand All @@ -38,7 +38,7 @@ pub struct AgentInfoStruct {
}

#[allow(missing_docs)]
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct Config {
pub default_env: Option<String>,
pub target_tps: Option<f64>,
Expand Down
33 changes: 33 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datadog_sidecar::config;
use datadog_sidecar::config::LogMethod;
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
use datadog_sidecar::service::agent_info::AgentInfoReader;
use datadog_sidecar::service::{
blocking::{self, SidecarTransport},
InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
Expand Down Expand Up @@ -911,3 +912,35 @@ pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi
buf.copy_from_slice(str.as_bytes());
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
}

/// Creates a heap-allocated [`AgentInfoReader`] for the given agent endpoint.
///
/// Ownership of the boxed reader is transferred to the caller, who is expected to hand it back
/// to `ddog_drop_agent_info_reader` once it is no longer needed.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_get_agent_info_reader(endpoint: &Endpoint) -> Box<AgentInfoReader> {
    let reader = AgentInfoReader::new(endpoint);
    Box::new(reader)
}

/// Returns the `default_env` from the agent info currently visible to `reader`.
///
/// `changed` is overwritten with the update flag reported by [`AgentInfoReader::read`].
/// The returned slice borrows from `reader`; it is empty when there is no agent info yet, the
/// info has no config section, or the config carries no default environment.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
    reader: &'a mut AgentInfoReader,
    changed: &mut bool,
) -> ffi::CharSlice<'a> {
    let (has_changed, info) = reader.read();
    *changed = has_changed;

    // Walk info -> config -> default_env, bailing out with `None` at the first missing level.
    let default_env = info
        .as_ref()
        .and_then(|info| info.config.as_ref())
        .and_then(|config| config.default_env.as_ref());

    match default_env {
        Some(env) => ffi::CharSlice::from(env.as_str()),
        None => ffi::CharSlice::empty(),
    }
}

/// Drops the agent info reader.
///
/// The reader is taken by value as a `Box`, so it is dropped as soon as this function returns;
/// the empty body is intentional.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_drop_agent_info_reader(_: Box<AgentInfoReader>) {}
1 change: 1 addition & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ddcommon = { path = "../ddcommon" }
datadog-sidecar-macros = { path = "macros" }

ddtelemetry = { path = "../ddtelemetry", features = ["tracing"] }
data-pipeline = { path = "../data-pipeline" }
datadog-trace-protobuf = { path = "../trace-protobuf" }
datadog-trace-utils = { path = "../trace-utils" }
datadog-trace-normalization = { path = "../trace-normalization" }
Expand Down
243 changes: 243 additions & 0 deletions sidecar/src/service/agent_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! This file contains code for fetching and sharing the info from the Datadog Agent.
//! It will keep one fetcher per Endpoint. The SidecarServer is expected to keep the AgentInfoGuard
//! alive for the lifetime of the session.
//! The fetcher will remain alive for a short while after all guards have been dropped.
//! It writes the raw agent response to shared memory at a fixed per-endpoint location, to be
//! consumed by tracers.

use crate::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter};
use crate::primary_sidecar_identifier;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
use data_pipeline::agent_info::schema::AgentInfoStruct;
use data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
use datadog_ipc::platform::NamedShmHandle;
use ddcommon::Endpoint;
use futures::future::Shared;
use futures::FutureExt;
use http::uri::PathAndQuery;
use manual_future::ManualFuture;
use std::ffi::CString;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{error, warn};
use zwohash::{HashMap, ZwoHasher};

/// Registry of agent info fetchers, keyed by the `Endpoint` they belong to.
///
/// The map lives behind an `Arc<Mutex<..>>`, so a clone of `AgentInfos` is another handle to the
/// same registry rather than an independent copy.
#[derive(Default, Clone)]
pub struct AgentInfos(Arc<Mutex<HashMap<Endpoint, AgentInfoFetcher>>>);

impl AgentInfos {
    /// Makes sure a fetcher exists for the endpoint's agent info and returns a guard for it.
    ///
    /// If a fetcher is already registered for `endpoint`, only its reference count is bumped;
    /// otherwise a new one is created and registered. The fetcher is kept alive for at least as
    /// long as the returned guard exists.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn query_for(&self, endpoint: Endpoint) -> AgentInfoGuard {
        let mut fetchers = self.0.lock().unwrap();
        match fetchers.get_mut(&endpoint) {
            Some(fetcher) => fetcher.rc += 1,
            None => {
                let fetcher = AgentInfoFetcher::new(self.clone(), endpoint.clone());
                fetchers.insert(endpoint.clone(), fetcher);
            }
        }

        AgentInfoGuard {
            infos: self.clone(),
            endpoint,
        }
    }
}

/// Handle returned by `AgentInfos::query_for`.
///
/// It remembers which registry and which endpoint it was issued for; dropping it decrements the
/// reference count of the matching fetcher entry.
pub struct AgentInfoGuard {
/// Registry the guarded fetcher entry lives in.
infos: AgentInfos,
/// Key of the guarded fetcher entry within the registry.
endpoint: Endpoint,
}

impl AgentInfoGuard {
    /// Returns a clone of the shared future currently stored for this guard's endpoint.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned or if no fetcher entry exists for the endpoint.
    pub fn get(&self) -> Shared<ManualFuture<AgentInfoStruct>> {
        let fetchers = self.infos.0.lock().unwrap();
        let fetcher = fetchers.get(&self.endpoint).unwrap();
        let shared_info = fetcher.infos.clone();
        shared_info
    }
}

impl Drop for AgentInfoGuard {
    /// Releases this guard's reference on the fetcher entry.
    ///
    /// The entry's `last_update` is refreshed to the moment of the drop before the reference
    /// count is decremented.
    fn drop(&mut self) {
        let mut fetchers = self.infos.0.lock().unwrap();
        let fetcher = fetchers.get_mut(&self.endpoint).unwrap();
        let dropped_at = Instant::now();
        fetcher.last_update = dropped_at;
        fetcher.rc -= 1;
    }
}

/// Registry entry describing one background fetch task for a single endpoint.
pub struct AgentInfoFetcher {
/// Set when the fetcher is created and refreshed whenever a guard is dropped.
/// Once the last_update is too old (and no guard is left), we'll stop the fetcher.
last_update: Instant,
/// Number of guards currently referencing this fetcher: starts at 1, incremented by
/// `query_for`, decremented when a guard is dropped.
/// Will be kept alive forever if rc > 0.
rc: u32,
/// The initial fetch is an unresolved future (to be able to await on it), subsequent fetches
/// are simply directly replacing this with a resolved future.
infos: Shared<ManualFuture<AgentInfoStruct>>,
}

impl AgentInfoFetcher {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has no comments or tests 😞 I also find it confusing that this has the same name AgentInfoFetcher as the AgentInfoFetcher in data-pipeline. Yes, I know that there are differences, but couldn't the break check, and the processing be moved behind a trait and there only be one impl of the loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't intentionally choosing the same name. Maybe I can call it AgentInfoWriter, but not sure if that helps?

You theoretically could do it, but the actual amount of shared code would be quite low & the boilerplate for the sharing of code quite large in comparison? I feel like fetch_info_with_state is the right level of abstraction for this.

I can though add at least a test for the happy path here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine, let's not try to share more code then.

Please add a happy path test and some comments. I mean at least something like:

//! This file contains code for fetching and sharing the info from the Datadog Agent
//! ... etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will do, just haven't gotten round to it yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bantonsson Done :-)

fn new(agent_infos: AgentInfos, endpoint: Endpoint) -> AgentInfoFetcher {
let (future, completer) = ManualFuture::new();
tokio::spawn(async move {
let mut state: Option<String> = None;
let mut writer = None;
let mut completer = Some(completer);
let mut fetch_endpoint = endpoint.clone();
let mut parts = fetch_endpoint.url.into_parts();
parts.path_and_query = Some(PathAndQuery::from_static("/info"));
fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap();
loop {
let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await;
let mut complete_fut = None;
{
let mut infos_guard = agent_infos.0.lock().unwrap();
let infos = infos_guard.get_mut(&endpoint).unwrap();
if infos.rc == 0 && infos.last_update.elapsed().as_secs() > 60 {
break;
}
match fetched {
Ok(FetchInfoStatus::SameState) => {}
Ok(FetchInfoStatus::NewState(status)) => {
state = Some(status.state_hash);
if writer.is_none() {
writer = match OneWayShmWriter::<NamedShmHandle>::new(info_path(
&endpoint,
)) {
Ok(writer) => Some(writer),
Err(e) => {
error!("Failed acquiring an agent info writer: {e:?}");
None
}
};
}
if let Some(ref writer) = writer {
writer.write(&serde_json::to_vec(&status.info).unwrap())
}
if let Some(completer) = completer {
complete_fut = Some(completer.complete(status.info));
} else {
infos.infos = ManualFuture::new_completed(status.info).shared();
}
completer = None;
}
Err(e) => {
// We'll just return the old values as long as the endpoint is
// unreachable.
warn!(
"The agent info for {} could not be fetched: {}",
fetch_endpoint.url, e
);
}
}
}
if let Some(complete_fut) = complete_fut.take() {
complete_fut.await;
}
sleep(Duration::from_secs(60)).await;
}
agent_infos.0.lock().unwrap().remove(&endpoint);
});

AgentInfoFetcher {
last_update: Instant::now(),
rc: 1,
infos: future.shared(),
}
}
}

/// Builds the shared memory name under which the agent info of `endpoint` is published.
///
/// The name is `/ddinf<primary sidecar identifier>-<base64 of the endpoint hash>`, where the
/// hash is a `ZwoHasher` digest of the endpoint encoded as URL-safe base64 without padding.
///
/// # Panics
///
/// Panics if the resulting name contains an interior NUL byte.
fn info_path(endpoint: &Endpoint) -> CString {
    let endpoint_hash = {
        let mut hasher = ZwoHasher::default();
        endpoint.hash(&mut hasher);
        hasher.finish()
    };
    let encoded_hash = BASE64_URL_SAFE_NO_PAD.encode(endpoint_hash.to_ne_bytes());
    let mut name = format!("/ddinf{}-{}", primary_sidecar_identifier(), encoded_hash);
    // "ddinf" = datadog agent info. On macos names are restricted to 31 chars; the name should
    // not get longer than that anyway, the truncation is only a safeguard.
    name.truncate(31);
    CString::new(name).unwrap()
}

/// Reads the agent info published for one endpoint through shared memory and caches the last
/// successfully deserialized value.
pub struct AgentInfoReader {
/// Shared memory reader for the per-endpoint path computed by `info_path`.
reader: OneWayShmReader<NamedShmHandle, CString>,
/// Last successfully deserialized agent info; `None` until a first one has been read.
info: Option<AgentInfoStruct>,
}

impl AgentInfoReader {
    /// Creates a reader for the agent info published for `endpoint`.
    ///
    /// A failure to open the shared memory segment is not an error here: the reader is then
    /// constructed without a handle, and starts out with no cached info either way.
    pub fn new(endpoint: &Endpoint) -> AgentInfoReader {
        let path = info_path(endpoint);
        let handle = open_named_shm(&path).ok();
        let reader = OneWayShmReader::new(handle, path);
        AgentInfoReader { reader, info: None }
    }

    /// Polls the shared memory and returns `(updated, info)`.
    ///
    /// `updated` is the flag reported by the underlying reader. When it is set, the new payload
    /// is parsed as JSON and cached; if parsing fails the error is logged and the previously
    /// cached info is kept (while `updated` is still returned as reported).
    pub fn read(&mut self) -> (bool, &Option<AgentInfoStruct>) {
        let (updated, data) = self.reader.read();
        if !updated {
            return (updated, &self.info);
        }
        match serde_json::from_slice(data) {
            Ok(parsed) => self.info = Some(parsed),
            Err(e) => error!("Failed deserializing the agent info: {e:?}"),
        }
        (updated, &self.info)
    }
}

#[cfg(test)]
mod tests {
use super::*;
use httpmock::prelude::*;

// Minimal agent info payload served by the mock: only `config.default_env` is set.
const TEST_INFO: &str = r#"{
"config": {
"default_env": "testenv"
}
}"#;

// Value the mock sends in the `datadog-agent-state` response header.
const TEST_INFO_HASH: &str = "8c732aba385d605b010cd5bd12c03fef402eaefce989f0055aa4c7e92fe30077";

// Happy path: a first fetch (no previous state) resolves the guard's future and the same info
// becomes visible through an `AgentInfoReader`.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_fetch_info_without_state() {
let server = MockServer::start();
let mock = server
.mock_async(|when, then| {
when.path("/info");
then.status(200)
.header("content-type", "application/json")
.header("datadog-agent-state", TEST_INFO_HASH)
.body(TEST_INFO);
})
.await;
let endpoint = Endpoint::from_url(server.url("/").parse().unwrap());
let agent_infos = AgentInfos::default();

// Before any fetcher has been started, the reader reports no update and no info.
let mut reader = AgentInfoReader::new(&endpoint);
assert_eq!(reader.read(), (false, &None));

// Starting the fetcher and awaiting its future yields the info served by the mock.
let info = agent_infos.query_for(endpoint).get().await;
mock.assert();
assert_eq!(
info.config.unwrap().default_env,
Some("testenv".to_string())
);

// The reader created earlier now observes the update with the same content.
let (updated, info) = reader.read();
assert!(updated);
assert_eq!(
info.as_ref().unwrap().config.as_ref().unwrap().default_env,
Some("testenv".to_string())
);
}
}
1 change: 1 addition & 0 deletions sidecar/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use runtime_info::RuntimeInfo;
use session_info::SessionInfo;
use sidecar_interface::{SidecarInterface, SidecarInterfaceRequest, SidecarInterfaceResponse};

pub mod agent_info;
pub mod blocking;
pub mod exception_hash_rate_limiter;
mod instance_id;
Expand Down
4 changes: 4 additions & 0 deletions sidecar/src/service/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use tracing::{debug, error, info, trace};
use crate::log::{MultiEnvFilterGuard, MultiWriterGuard};
use crate::{spawn_map_err, tracer};

use crate::service::agent_info::AgentInfoGuard;
use crate::service::{InstanceId, QueueId, RuntimeInfo};

/// `SessionInfo` holds information about a session.
///
/// It contains a list of runtimes, session configuration, tracer configuration, and log guards.
Expand All @@ -31,6 +33,7 @@ pub(crate) struct SessionInfo {
tracer_config: Arc<Mutex<tracer::Config>>,
dogstatsd: Arc<Mutex<Option<dogstatsd_client::Client>>>,
remote_config_invariants: Arc<Mutex<Option<ConfigInvariants>>>,
pub(crate) agent_infos: Arc<Mutex<Option<AgentInfoGuard>>>,
pub(crate) remote_config_interval: Arc<Mutex<Duration>>,
#[cfg(windows)]
pub(crate) remote_config_notify_function:
Expand All @@ -50,6 +53,7 @@ impl Clone for SessionInfo {
tracer_config: self.tracer_config.clone(),
dogstatsd: self.dogstatsd.clone(),
remote_config_invariants: self.remote_config_invariants.clone(),
agent_infos: self.agent_infos.clone(),
remote_config_interval: self.remote_config_interval.clone(),
#[cfg(windows)]
remote_config_notify_function: self.remote_config_notify_function.clone(),
Expand Down
Loading
Loading