Skip to content

Commit

Permalink
switch to UnixStream for writing to a unix domain socket
Browse files Browse the repository at this point in the history
  • Loading branch information
droundy committed Jan 22, 2024
1 parent 1f35140 commit 7b03dff
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 47 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ thiserror = { version = "1", default-features = false }
quanta = { version = "0.11.0", default-features = false }
indexmap = { version = "1", default-features = false }

tokio = { version = "1", features = ["rt", "net", "time"] }
tokio = { version = "1", features = ["rt", "net", "time", "io-util"] }
tracing = { version = "0.1.26" }

[dev-dependencies]
tempfile = "3.9.0"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
74 changes: 31 additions & 43 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::fs::File;
use std::future::Future;
use std::io::{self, Write};
use std::io::{self};
use std::pin::Pin;
use std::time::Duration;

Expand All @@ -14,6 +13,7 @@ use metrics_util::{
registry::{GenerationalStorage, Recency, Registry},
MetricKindMask, Quantile,
};
use tokio::net::UnixStream;

use crate::common::BuildError;
use crate::common::Matcher;
Expand All @@ -35,7 +35,10 @@ enum ExporterConfig {
interval: Duration,
},
/// Write data to a path, probably to a [unix domain socket](https://docs.datadoghq.com/developers/dogstatsd/unix_socket/?tab=kubernetes)
FileGateway { file: File, interval: Duration },
SocketGateway {
socket: UnixStream,
interval: Duration,
},

#[allow(dead_code)]
Unconfigured,
Expand All @@ -45,7 +48,7 @@ impl ExporterConfig {
fn as_type_str(&self) -> &'static str {
match self {
Self::PushGateway { .. } => "push-gateway",
Self::FileGateway { .. } => "file-gatewy",
Self::SocketGateway { .. } => "socket-gateway",
Self::Unconfigured => "unconfigured,",
}
}
Expand Down Expand Up @@ -126,17 +129,24 @@ impl StatsdBuilder {
///
/// If the given endpoint cannot be parsed into a valid SocketAddr, an error variant will be
/// returned describing the error.
pub fn with_named_socket(
pub async fn with_named_socket(
mut self,
path: impl AsRef<std::path::Path>,
interval: Duration,
) -> Result<Self, BuildError> {
let path = path.as_ref().to_path_buf();
let file = File::create(&path).map_err(|e| {
BuildError::InvalidPushGatewayEndpoint(format!("Unable to write to {path:?}: {e}"))
let socket = UnixStream::connect(&path).await.map_err(|e| {
BuildError::InvalidPushGatewayEndpoint(format!(
"Unable to open socket at {path:?}: {e}"
))
})?;
socket.writable().await.map_err(|e| {
BuildError::InvalidPushGatewayEndpoint(format!(
"Unable to write to socket at {path:?}: {e}"
))
})?;

self.exporter_config = ExporterConfig::FileGateway { file, interval };
self.exporter_config = ExporterConfig::SocketGateway { socket, interval };

Ok(self)
}
Expand Down Expand Up @@ -307,23 +317,6 @@ impl StatsdBuilder {
/// runtime is created on a background thread, and the exporter is spawned
/// there.
///
/// # Example
/// Here we write to a named socket with path `socket` (except in the test
/// it is a temporary file):
/// ```
/// # use metrics_exporter_dogstatsd::StatsdBuilder;
/// # use std::io::Read;
/// # let socket = tempfile::NamedTempFile::new().unwrap();
/// StatsdBuilder::new()
/// .with_named_socket(&socket, std::time::Duration::from_secs(1)).unwrap()
/// .install()
/// .unwrap();
/// // metrics are now being exported to the given socket
/// metrics::increment_counter!("tests.counter");
/// std::thread::sleep(std::time::Duration::from_secs(2));
/// assert_eq!(std::fs::read_to_string(&socket).unwrap(), "tests.counter:1|c\n\n");
/// ```
///
/// ## Errors
///
/// If there is an error while either building the recorder and exporter, or
Expand Down Expand Up @@ -408,26 +401,21 @@ impl StatsdBuilder {

Ok((recorder, Box::pin(exporter)))
}
ExporterConfig::FileGateway { mut file, interval } => {
// Some of the code paths are a little ridiculous: there is no
// benefit to using tokio for writing to a file, so we simply
// spawn an OS thread to do the writing. In general, if we are
// using `FileGateway` our life would be easier if we didn't use
// tokio at all. (And the code would still equally well support
// use within an async environment.)
ExporterConfig::SocketGateway {
mut socket,
interval,
} => {
let exporter = async move {
std::thread::spawn(move || {
loop {
// Sleep for `interval` amount of time, and then do a push.
std::thread::sleep(interval);
let output = handle.render();
match file.write_all(output.as_bytes()) {
Ok(_) => (),
Err(e) => error!("error sending request to push gateway: {:?}", e),
}
loop {
// Sleep for `interval` amount of time, and then do a push.
std::thread::sleep(interval);
let output = handle.render();
use tokio::io::AsyncWriteExt;
match socket.write_all(output.as_bytes()).await {
Ok(_) => (),
Err(e) => error!("error sending request to push gateway: {:?}", e),
}
});
Ok(())
}
};

Ok((recorder, Box::pin(exporter)))
Expand Down

0 comments on commit 7b03dff

Please sign in to comment.