From 7de5cf6ba834ef664251694afc2b391244a2043e Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Wed, 15 May 2024 22:05:27 +0800 Subject: [PATCH] g3-ctl: support windows --- lib/g3-ctl/src/opts.rs | 52 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/lib/g3-ctl/src/opts.rs b/lib/g3-ctl/src/opts.rs index 462917578..ef3d898ed 100644 --- a/lib/g3-ctl/src/opts.rs +++ b/lib/g3-ctl/src/opts.rs @@ -20,9 +20,7 @@ use std::path::PathBuf; use anyhow::anyhow; use clap::{value_parser, Arg, ArgMatches, Command, ValueHint}; use clap_complete::Shell; -use tokio::io::AsyncWriteExt; -#[cfg(unix)] -use tokio::net::UnixStream; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; #[cfg(unix)] const DEFAULT_TMP_CONTROL_DIR: &str = "/tmp/g3"; @@ -84,7 +82,10 @@ impl DaemonCtlArgs { } #[cfg(unix)] - pub async fn connect_to_daemon(&self, daemon_name: &'static str) -> anyhow::Result { + pub async fn connect_to_daemon( + &self, + daemon_name: &'static str, + ) -> anyhow::Result { let control_dir = self.control_dir.clone().unwrap_or_else(|| { let mut sys_ctl_dir = PathBuf::from("/run"); sys_ctl_dir.push(daemon_name); @@ -101,12 +102,43 @@ impl DaemonCtlArgs { control_dir.join(format!("{}.sock", self.daemon_group)) }; - let mut stream = UnixStream::connect(&socket_path).await.map_err(|e| { - anyhow!( - "failed to connect to control socket {}: {e:?}", - socket_path.display() + let mut stream = tokio::net::UnixStream::connect(&socket_path) + .await + .map_err(|e| { + anyhow!( + "failed to connect to control socket {}: {e:?}", + socket_path.display() + ) + })?; + self.enter_rpc_mode(&mut stream).await?; + Ok(stream) + } + + #[cfg(windows)] + pub async fn connect_to_daemon( + &self, + daemon_name: &'static str, + ) -> anyhow::Result { + let pipe_name = if self.pid != 0 { + format!( + r"\\.\pipe\{daemon_name}_{}_{}.pipe", + self.daemon_group, self.pid ) - })?; + } else { + format!(r"\\.\pipe\{daemon_name}_{}.pipe", self.daemon_group) + }; + + let mut stream = tokio::net::windows::named_pipe::ClientOptions::new() + .open(&pipe_name) + .map_err(|e| anyhow!("failed to open connection to pipe {}: {e:?}", pipe_name))?; + self.enter_rpc_mode(&mut stream).await?; + Ok(stream) + } + + async fn enter_rpc_mode(&self, stream: &mut S) -> anyhow::Result<()> + where + S: AsyncWrite + Unpin, + { stream .write_all(b"capnp\n") .await @@ -115,7 +147,7 @@ impl DaemonCtlArgs { .flush() .await .map_err(|e| anyhow!("enter capnp mod failed: {e:?}"))?; - Ok(stream) + Ok(()) } }