Skip to content

Commit

Permalink
g3-daemon: add cfg guard for unix sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 10, 2024
1 parent 1dc578b commit e4fc5e5
Showing 1 changed file with 79 additions and 68 deletions.
147 changes: 79 additions & 68 deletions lib/g3-daemon/src/control/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@ use std::sync::Mutex;
use anyhow::anyhow;
use futures_util::future::{AbortHandle, Abortable};
use log::{debug, warn};
use tokio::io::BufReader;
use tokio::net::{UnixListener, UnixStream};
use tokio::io::{AsyncRead, AsyncWrite, BufReader};
#[cfg(unix)]
use tokio::net::UnixListener;

use super::{CtlProtoCtx, CtlProtoType, LocalControllerConfig};

static UNIQUE_CONTROLLER_ABORT_HANDLER: Mutex<Option<AbortHandle>> = Mutex::new(None);
static DAEMON_CONTROLLER_ABORT_HANDLER: Mutex<Option<AbortHandle>> = Mutex::new(None);

fn ctl_handle(stream: UnixStream) {
let (reader, writer) = tokio::io::split(stream);
fn ctl_handle<R, W>(r: R, w: W)
where
R: AsyncRead + Send + Unpin + 'static,
W: AsyncWrite + Send + Unpin + 'static,
{
let ctx = CtlProtoCtx::new(
BufReader::new(reader),
writer,
BufReader::new(r),
w,
LocalControllerConfig::get_general(),
CtlProtoType::Text,
);
Expand All @@ -46,11 +50,13 @@ fn ctl_handle(stream: UnixStream) {
});
}

#[cfg(unix)]
pub struct LocalController {
listen_path: PathBuf,
listener: UnixListener,
}

#[cfg(unix)]
impl LocalController {
fn new(listen_path: PathBuf) -> io::Result<Self> {
let listener = UnixListener::bind(&listen_path)?;
Expand All @@ -64,48 +70,79 @@ impl LocalController {
self.listen_path.clone()
}

fn start(self, mutex: &Mutex<Option<AbortHandle>>) -> anyhow::Result<impl Future> {
let mut abort_handler_container = mutex.lock().unwrap();
if abort_handler_container.is_some() {
return Err(anyhow!("controller already existed"));
}
pub fn create_unique(daemon_group: &str) -> anyhow::Result<Self> {
let socket_name = format!("{daemon_group}_{}.sock", std::process::id());
let mut listen_path = crate::opts::control_dir();
listen_path.push(Path::new(&socket_name));
check_then_finalize_path(&listen_path)?;

debug!("setting up unique controller {}", listen_path.display());
let controller = LocalController::new(listen_path)?;
debug!("unique controller created");
Ok(controller)
}

pub fn create_daemon(daemon_group: &str) -> anyhow::Result<Self> {
let socket_name = if daemon_group.is_empty() {
"_.sock".to_string()
} else {
format!("{daemon_group}.sock")
};
let mut listen_path = crate::opts::control_dir();
listen_path.push(Path::new(&socket_name));
check_then_finalize_path(&listen_path)?;

let controller = async move {
loop {
let result = self.listener.accept().await;
match result {
Ok((stream, addr)) => {
if let Ok(ucred) = stream.peer_cred() {
if let Some(addr) = addr.as_pathname() {
debug!(
"new ctl client from {} uid {} pid {}",
addr.display(),
ucred.uid(),
ucred.gid(),
);
} else {
debug!(
"new ctl client from uid {} pid {}",
ucred.uid(),
ucred.gid()
);
}
debug!("setting up daemon controller {}", listen_path.display());
let controller = LocalController::new(listen_path)?;
debug!("daemon controller created");
Ok(controller)
}

async fn into_running(self) {
loop {
let result = self.listener.accept().await;
match result {
Ok((stream, addr)) => {
if let Ok(ucred) = stream.peer_cred() {
if let Some(addr) = addr.as_pathname() {
debug!(
"new ctl client from {} uid {} pid {}",
addr.display(),
ucred.uid(),
ucred.gid(),
);
} else {
debug!("new ctl local control client");
debug!(
"new ctl client from uid {} pid {}",
ucred.uid(),
ucred.gid()
);
}

ctl_handle(stream);
}
Err(e) => {
warn!("controller {} accept: {e}", self.listen_path.display());
break;
} else {
debug!("new ctl local control client");
}

let (r, w) = stream.into_split();
ctl_handle(r, w);
}
Err(e) => {
warn!("controller {} accept: {e}", self.listen_path.display());
break;
}
}
};
}
}
}

impl LocalController {
fn start(self, mutex: &Mutex<Option<AbortHandle>>) -> anyhow::Result<impl Future> {
let mut abort_handler_container = mutex.lock().unwrap();
if abort_handler_container.is_some() {
return Err(anyhow!("controller already existed"));
}

let (abort_handle, abort_registration) = AbortHandle::new_pair();
let future = Abortable::new(controller, abort_registration);
let future = Abortable::new(self.into_running(), abort_registration);
*abort_handler_container = Some(abort_handle);

Ok(future)
Expand All @@ -118,18 +155,6 @@ impl LocalController {
}
}

pub fn create_unique(daemon_group: &str) -> anyhow::Result<Self> {
let socket_name = format!("{daemon_group}_{}.sock", std::process::id());
let mut listen_path = crate::opts::control_dir();
listen_path.push(Path::new(&socket_name));
check_then_finalize_path(&listen_path)?;

debug!("setting up unique controller {}", listen_path.display());
let controller = LocalController::new(listen_path)?;
debug!("unique controller created");
Ok(controller)
}

pub fn start_as_unique(self) -> anyhow::Result<impl Future> {
let fut = self.start(&UNIQUE_CONTROLLER_ABORT_HANDLER)?;
debug!("unique controller started");
Expand All @@ -144,22 +169,6 @@ impl LocalController {
LocalController::abort(&UNIQUE_CONTROLLER_ABORT_HANDLER);
}

pub fn create_daemon(daemon_group: &str) -> anyhow::Result<Self> {
let socket_name = if daemon_group.is_empty() {
"_.sock".to_string()
} else {
format!("{daemon_group}.sock")
};
let mut listen_path = crate::opts::control_dir();
listen_path.push(Path::new(&socket_name));
check_then_finalize_path(&listen_path)?;

debug!("setting up daemon controller {}", listen_path.display());
let controller = LocalController::new(listen_path)?;
debug!("daemon controller created");
Ok(controller)
}

pub fn start_as_daemon(self) -> anyhow::Result<impl Future> {
let fut = self.start(&DAEMON_CONTROLLER_ABORT_HANDLER)?;
debug!("daemon controller started");
Expand All @@ -175,6 +184,7 @@ impl LocalController {
}
}

#[cfg(unix)]
impl Drop for LocalController {
fn drop(&mut self) {
if self.listen_path.exists() {
Expand All @@ -184,6 +194,7 @@ impl Drop for LocalController {
}
}

#[cfg(unix)]
fn check_then_finalize_path(path: &Path) -> anyhow::Result<()> {
if path.exists() {
return Err(anyhow!(
Expand Down

0 comments on commit e4fc5e5

Please sign in to comment.