Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support named sockets #9

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ indexmap = { version = "1", default-features = false }
tokio = { version = "1", features = ["rt", "net", "time"] }
tracing = { version = "0.1.26" }

[dev-dependencies]
tempfile = "3.9.0"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
96 changes: 81 additions & 15 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::fs::File;
use std::future::Future;
use std::io;
use std::io::{self, Write};
use std::pin::Pin;
use std::time::Duration;

Expand Down Expand Up @@ -28,12 +29,13 @@ use std::net::{SocketAddr, ToSocketAddrs};

type ExporterFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'static>>;

#[derive(Clone)]
enum ExporterConfig {
PushGateway {
endpoint: SocketAddr,
interval: Duration,
},
/// Write data to a path, probably to a [unix domain socket](https://docs.datadoghq.com/developers/dogstatsd/unix_socket/?tab=kubernetes)
FileGateway { file: File, interval: Duration },

#[allow(dead_code)]
Unconfigured,
Expand All @@ -43,6 +45,7 @@ impl ExporterConfig {
fn as_type_str(&self) -> &'static str {
match self {
Self::PushGateway { .. } => "push-gateway",
Self::FileGateway { .. } => "file-gatewy",
Self::Unconfigured => "unconfigured,",
}
}
Expand Down Expand Up @@ -117,6 +120,27 @@ impl StatsdBuilder {
Ok(self)
}

/// Configures the exporter to push periodic requests to a file (presumably a named socket)
///
/// ## Errors
///
/// If the given endpoint cannot be parsed into a valid SocketAddr, an error variant will be
/// returned describing the error.
pub fn with_named_socket(
mut self,
path: impl AsRef<std::path::Path>,
interval: Duration,
) -> Result<Self, BuildError> {
let path = path.as_ref().to_path_buf();
let file = File::create(&path).map_err(|e| {
BuildError::InvalidPushGatewayEndpoint(format!("Unable to write to {path:?}: {e}"))
})?;

self.exporter_config = ExporterConfig::FileGateway { file, interval };

Ok(self)
}

/// Sets the quantiles to use when rendering histograms.
///
/// Quantiles represent a scale of 0 to 1, where percentiles represent a scale of 1 to 100, so
Expand Down Expand Up @@ -278,14 +302,33 @@ impl StatsdBuilder {

/// Builds the recorder and exporter and installs them globally.
///
/// When called from within a Tokio runtime, the exporter future is spawned directly
/// into the runtime. Otherwise, a new single-threaded Tokio runtime is created
/// on a background thread, and the exporter is spawned there.
/// When called from within a Tokio runtime, the exporter future is spawned
/// directly into the runtime. Otherwise, a new single-threaded Tokio
/// runtime is created on a background thread, and the exporter is spawned
/// there.
///
/// # Example
/// Here we write to a named socket with path `socket` (except in the test
/// it is a temporary file):
/// ```
/// # use metrics_exporter_dogstatsd::StatsdBuilder;
/// # use std::io::Read;
/// # let socket = tempfile::NamedTempFile::new().unwrap();
/// StatsdBuilder::new()
/// .with_named_socket(&socket, std::time::Duration::from_secs(1)).unwrap()
/// .install()
/// .unwrap();
/// // metrics are now being exported to the given socket
/// metrics::increment_counter!("tests.counter");
/// std::thread::sleep(std::time::Duration::from_secs(2));
/// assert_eq!(std::fs::read_to_string(&socket).unwrap(), "tests.counter:1|c\n\n");
/// ```
///
/// ## Errors
///
/// If there is an error while either building the recorder and exporter, or installing the
/// recorder and exporter, an error variant will be returned describing the error.
/// If there is an error while either building the recorder and exporter, or
/// installing the recorder and exporter, an error variant will be returned
/// describing the error.
pub fn install(self) -> Result<(), BuildError> {
let recorder = if let Ok(handle) = runtime::Handle::try_current() {
let (recorder, exporter) = {
Expand Down Expand Up @@ -342,11 +385,10 @@ impl StatsdBuilder {
/// returned describing the error.
pub fn build(self) -> Result<(StatsdRecorder, ExporterFuture), BuildError> {
let max_packet_size = self.max_packet_size;
let exporter_config = self.exporter_config.clone();
let recorder = self.build_recorder();
let handle = recorder.handle();

match exporter_config {
match self.exporter_config {
ExporterConfig::Unconfigured => Err(BuildError::MissingExporterConfiguration),
ExporterConfig::PushGateway { endpoint, interval } => {
let exporter = async move {
Expand All @@ -364,27 +406,51 @@ impl StatsdBuilder {
}
};

Ok((recorder, Box::pin(exporter)))
}
ExporterConfig::FileGateway { mut file, interval } => {
// Some of the code paths are a little ridiculous: there is no
// benefit to using tokio for writing to a file, so we simply
// spawn an OS thread to do the writing. In general, if we are
// using `FileGateway` our life would be easier if we didn't use
// tokio at all. (And the code would still equally well support
// use within an async environment.)
let exporter = async move {
std::thread::spawn(move || {
loop {
// Sleep for `interval` amount of time, and then do a push.
std::thread::sleep(interval);
let output = handle.render();
match file.write_all(output.as_bytes()) {
Ok(_) => (),
Err(e) => error!("error sending request to push gateway: {:?}", e),
}
}
});
Ok(())
};

Ok((recorder, Box::pin(exporter)))
}
}
}

/// Builds the recorder and returns it.
pub fn build_recorder(self) -> StatsdRecorder {
pub fn build_recorder(&self) -> StatsdRecorder {
self.build_with_clock(Clock::new())
}

pub(crate) fn build_with_clock(self, clock: Clock) -> StatsdRecorder {
pub(crate) fn build_with_clock(&self, clock: Clock) -> StatsdRecorder {
let inner = Inner {
prefix: self.prefix.clone(),
registry: Registry::new(GenerationalStorage::new(AtomicStorage)),
recency: Recency::new(clock, self.recency_mask, self.idle_timeout),
distribution_builder: DistributionBuilder::new(
self.quantiles,
self.buckets,
self.bucket_overrides,
self.quantiles.clone(),
self.buckets.clone(),
self.bucket_overrides.clone(),
),
global_tags: self.global_tags.unwrap_or_default(),
global_tags: self.global_tags.clone().unwrap_or_default(),
};

StatsdRecorder::from(inner)
Expand Down