diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..83fa992 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,53 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + test: + runs-on: ubuntu-latest + container: quay.io/centos/centos:stream9 + env: + CARGO_HOME: ${{ github.workspace}}/.cargo + RUSTUP_HOME: ${{ github.workspace}}/.rustup + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Install NFV SIG repository + run: | + dnf install -y centos-release-nfv-openvswitch + + - name: Install needed packages + run: | + dnf install -y gcc openvswitch3.1 + + - name: Install Rustup + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + source $CARGO_HOME/env + + - name: Add Cargo to PATH + run: echo "$GITHUB_WORKSPACE/.cargo/bin" >> $GITHUB_PATH + + - name: Install Rust toolchain + run: rustup install stable + + - name: Install Clippy + run: rustup component add clippy + + - name: Run Linter + run: cargo fmt --check + + - name: Run Clippy + run: cargo clippy --all-targets --all-features -- -D warnings + + - name: Run Integration Tests + run: cargo test -F test_integration + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..754e66c --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,103 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "anyhow" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "ovs-unixctl" +version = "0.1.0" +dependencies = [ + "anyhow", + "serde", + "serde_json", +] + +[[package]] +name = "proc-macro2" +version = "1.0.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.214" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.214" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.132" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..2887e08 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ovs-unixctl" +version = "0.1.0" +license = "GPL-2.0-only" +description = "Control OVS daemons using the Unixctl interface (a.k.a ovs-appctl)" +readme = "README.md" +edition = "2021" + +[features] +test_integration = [] + +[dependencies] +anyhow = "1.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = ["raw_value"] } diff --git a/README.md b/README.md index 5e1a2eb..045c6a0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,17 @@ # ovs-unixctl -Control OVS daemons using the Unixctl interface (a.k.a ovs-appctl) +Library to send commands to OVS daemons though their JSON interface. +See **ovs-appctl(8)**. + +## Test + +Run unit tests: + +``` +$ cargo test +``` + +Run integration tests, if openvswitch is installed in the system: + +``` +$ cargo test -F test_integration +``` diff --git a/src/jsonrpc.rs b/src/jsonrpc.rs new file mode 100644 index 0000000..e3d9979 --- /dev/null +++ b/src/jsonrpc.rs @@ -0,0 +1,154 @@ +//! A simple JSON-RPC client compatible with OVS unixctl. + +use std::{ + fmt, path, + sync::atomic::{AtomicUsize, Ordering::Relaxed}, + time, +}; + +use anyhow::{anyhow, bail, Result}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use crate::unix; + +// JsonStreams are capable of sending and receiving JSON messages. +pub(crate) trait JsonStream: Send + Sync + 'static { + /// Send a message to the target. + fn send(&mut self, msg: M) -> Result<()>; + + /// Receivea message from the target (blocking). + fn recv(&mut self) -> Result + where + R: for<'a> Deserialize<'a>; +} + +// Client streams can connect and disconnect from targets creating +// some JsonStream. +pub(crate) trait JsonStreamClient: fmt::Display { + type Stream: JsonStream; + /// Connect to the target. + fn connect(&mut self) -> Result; +} + +/// A JSON-RPC request. +#[derive(Debug, Serialize)] +pub struct Request<'a, P: Serialize + AsRef = &'a str> { + /// The name of the RPC call. + pub method: &'a str, + /// Parameters to the RPC call. + pub params: &'a [P], + /// Identifier for this request, which should appear in the response. + pub id: usize, +} + +/// A JSONRPC response object. +/// TODO make generic +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct Response { + /// The result of the request. + pub result: Option, + /// An error if it occurred. + pub error: Option, + /// Identifier for this response. It should match that of the associated request. + pub id: Option, +} + +/// JSON-RPC client. +#[derive(Debug)] +pub(crate) struct Client { + stream_client: C, + stream: Option, + last_id: AtomicUsize, +} + +impl Client { + /// Creates a new client with the given transport. + pub(crate) fn new(stream_client: C) -> Client { + Client { + stream_client, + stream: None, + last_id: AtomicUsize::new(1), + } + } + + /// Creates a new client with a Unix socket transport. + pub(crate) fn unix>( + sock_path: P, + timeout: Option, + ) -> Client { + let mut stream_client = unix::UnixJsonStreamClient::new(sock_path); + if let Some(timeout) = timeout { + stream_client = stream_client.timeout(timeout); + } + Client::new(stream_client) + } + + /// Builds a request with the given method and parameters. + /// + /// It internally deals with incrementing the id. + fn build_request<'a, P: Serialize + AsRef>( + &self, + method: &'a str, + params: &'a [P], + ) -> Request<'a, P> { + Request { + method, + params, + id: self.last_id.fetch_add(1, Relaxed), + } + } + + /// Sends a request and returns the response. + pub fn send_request>( + &mut self, + request: Request

, + ) -> Result> { + if self.stream.is_none() { + self.stream = Some(self.stream_client.connect()?); + } + + let stream = self.stream.as_mut().unwrap(); + let req_id = request.id; + + stream.send(request)?; + let res: Response = stream.recv()?; + if res.id.ok_or_else(|| anyhow!("no id present in response"))? != req_id { + bail!("ID missmatch"); + } + + Ok(res) + } + + /// Calls a method with some arguments and returns the result. + pub(crate) fn call_params>( + &mut self, + method: &str, + params: &[P], + ) -> Result> { + let request = self.build_request(method, params); + let response = self.send_request(request)?; + if let Some(error) = response.error { + bail!( + "Failed to run command {} with params [{}]: {}", + method, + params + .iter() + .map(|p| p.as_ref()) + .collect::>() + .join(", "), + error, + ) + } + Ok(response) + } + + /// Calls a method without arguments and resturns the result. + pub(crate) fn call(&mut self, method: &str) -> Result> { + let request = self.build_request::<&str>(method, &[]); + let response = self.send_request(request)?; + if let Some(error) = response.error { + bail!("Failed to run command {}: {}", method, error,) + } + Ok(response) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..5a79f42 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,8 @@ +//! OpenvSwitch application control (appctl) library. + +//FIXME +#[allow(dead_code)] +pub mod jsonrpc; +pub mod ovs; +pub use ovs::*; +pub mod unix; diff --git a/src/ovs.rs b/src/ovs.rs new file mode 100644 index 0000000..58ea0af --- /dev/null +++ b/src/ovs.rs @@ -0,0 +1,299 @@ +//! OVS unixctl interface + +use std::{ + env, + path::{Path, PathBuf}, +}; + +use std::fs; +use std::time; + +use anyhow::{anyhow, bail, Result}; + +use crate::{jsonrpc, unix}; + +/// OVS Unix control interface. +/// +/// It allows the execution of well control commands against ovs-vswitchd. +#[derive(Debug)] +pub struct OvsUnixCtl { + // JSON-RPC client. For now, only Unix is supported. If more are supported in the future, this + // would have to be a generic type. + client: jsonrpc::Client, +} + +impl OvsUnixCtl { + /// Creates a new OvsUnixCtl against ovs-vswitchd. + /// + /// Tries to find the pidfile and socket in the default path or in the one specified in the + /// OVS_RUNDIR env variable. + pub fn new() -> Result { + let sockpath = Self::find_socket("ovs-vswitchd".into())?; + Self::unix(sockpath, Some(time::Duration::from_secs(5))) + } + + /// Creates a new OvsUnixCtl against the provided target, e.g.: ovs-vswitchd, ovsdb-server, + /// northd, etc. + /// + /// Tries to find the pidfile and socket in the default path or in the one specified in the + /// OVS_RUNDIR env variable. + pub fn with_target(target: String) -> Result { + let sockpath = Self::find_socket(target)?; + Self::unix(sockpath, Some(time::Duration::from_secs(5))) + } + + /// Creates a new OvsUnixCtl by specifing a concrete unix socket path. + /// + /// Tries to find the socket in the default paths. + pub fn unix>(path: P, timeout: Option) -> Result { + Ok(Self { + client: jsonrpc::Client::::unix(path, timeout), + }) + } + + fn find_socket_at>(target: &str, rundir: P) -> Result { + // Find $OVS_RUNDIR/{target}.pid + let pidfile_path = rundir.as_ref().join(format!("{}.pid", &target)); + let pid_str = fs::read_to_string(pidfile_path.clone())?; + let pid_str = pid_str.trim(); + + if pid_str.is_empty() { + bail!("pidfile is empty: {:?}", &pidfile_path); + } + + // Find $OVS_RUNDIR/{target}.{pid}.ctl + let sock_path = rundir.as_ref().join(format!("{}.{}.ctl", &target, pid_str)); + if !fs::exists(&sock_path)? { + bail!("failed to find control socket for target {}", &target); + } + Ok(sock_path) + } + + fn find_socket(target: String) -> Result { + let rundir: String = match env::var_os("OVS_RUNDIR") { + Some(rundir) => rundir + .into_string() + .map_err(|_| anyhow!("OVS_RUNDIR non-unicode content"))?, + None => "/var/run/openvswitch".into(), + }; + Self::find_socket_at(target.as_str(), PathBuf::from(rundir)) + } + + /// Runs the common "list-commands" command and returns the list of commands and their + /// arguments. + pub fn list_commands(&mut self) -> Result> { + let response: jsonrpc::Response = self.client.call("list-commands")?; + Ok(response + .result + .ok_or_else(|| anyhow!("expected result"))? + .strip_prefix("The available commands are:\n") + .ok_or_else(|| anyhow!("unexpected response format"))? + .lines() + .map(|l| { + let (cmd, args) = l.trim().split_once(char::is_whitespace).unwrap_or((l, "")); + (cmd.trim().into(), args.trim().into()) + }) + .collect()) + } + + /// Retrieve the version of the running daemon. + pub fn version(&mut self) -> Result<(u32, u32, u32, String)> { + let response: jsonrpc::Response = self.client.call("version")?; + match response + .result + .ok_or_else(|| anyhow!("expected result"))? + .strip_prefix("ovs-vswitchd (Open vSwitch) ") + .ok_or_else(|| anyhow!("unexpected version string"))? + .splitn(4, &['.', '-']) + .collect::>()[..] + { + [x, y, z] => Ok(( + x.to_string().parse()?, + y.to_string().parse()?, + z.to_string().parse()?, + String::default(), + )), + [x, y, z, patch] => Ok(( + x.to_string().parse()?, + y.to_string().parse()?, + z.to_string().parse()?, + String::from(patch), + )), + _ => Err(anyhow!("failed to unpack version string")), + } + } + + /// Run an arbitrary command. + pub fn run(&mut self, cmd: &str, params: &[&str]) -> Result> { + let response: jsonrpc::Response = self.client.call_params(cmd, params)?; + Ok(response.result) + } +} + +#[cfg(test)] +mod tests { + + use anyhow::{anyhow, Result}; + + use std::{ + path::{Path, PathBuf}, + process::{id, Command, Stdio}, + }; + + use super::*; + + fn ovs_setup(test: &str) -> Result { + let tmpdir = format!("/tmp/ovs-unixctl-test-{}-{}", id(), test); + let ovsdb_path = PathBuf::from(&tmpdir).join("conf.db"); + + let schema: PathBuf = match env::var_os("OVS_DATADIR") { + Some(datadir) => (datadir + .into_string() + .map_err(|_| anyhow!("OVS_DATADIR has non-unicode content"))? + + "/vswitch.ovsschema") + .into(), + None => "/usr/share/openvswitch/vswitch.ovsschema".into(), + }; + + fs::create_dir_all(&tmpdir)?; + + Command::new("ovsdb-tool") + .arg("create") + .arg(&ovsdb_path) + .arg(&schema) + .status() + .expect("Failed to create OVS database"); + + let ovsdb_logfile = Path::new(&tmpdir).join("ovsdb-server.log"); + Command::new("ovsdb-server") + .env("OVS_RUNDIR", &tmpdir) + .arg(&ovsdb_path) + .arg("--detach") + .arg("--no-chdir") + .arg("--pidfile") + .arg(format!( + "--remote=punix:{}", + Path::new(&tmpdir).join("db.sock").to_str().unwrap() + )) + .arg(format!("--log-file={}", ovsdb_logfile.to_str().unwrap())) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .expect("Failed to start ovsdb-server"); + + let ovs_logfile = Path::new(&tmpdir).join("ovs-vswitchd.log"); + Command::new("ovs-vswitchd") + .env("OVS_RUNDIR", &tmpdir) + .arg("--detach") + .arg("--no-chdir") + .arg("--pidfile") + .arg(format!("--log-file={}", ovs_logfile.to_str().unwrap())) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .expect("Failed to start ovs-vswitchd"); + std::thread::sleep(time::Duration::from_secs(1)); + Ok(PathBuf::from(tmpdir)) + } + + fn ovs_cleanup(tmpdir: &Path) -> Result<()> { + // Find and kill the processes based on PID files + for daemon in &["ovsdb-server", "ovs-vswitchd"] { + let log_file = tmpdir.join(format!("{}.log", daemon)); + if let Ok(log) = fs::read_to_string(&log_file) { + println!("{}.log: \n{}", daemon, log); + } + let pid_file = tmpdir.join(format!("{}.pid", daemon)); + + if pid_file.exists() { + if let Ok(pid) = fs::read_to_string(&pid_file) { + if let Ok(pid) = pid.trim().parse::() { + Command::new("kill") + .arg("-9") + .arg(pid.to_string()) + .status() + .expect("Failed to kill daemon process"); + } + } + } + } + fs::remove_dir_all(tmpdir)?; + Ok(()) + } + + fn ovs_test(name: &str, test: T) -> Result<()> + where + T: Fn(OvsUnixCtl) -> Result<()>, + { + let tmp = ovs_setup(name)?; + let tmp_copy = tmp.clone(); + + std::panic::set_hook(Box::new(move |info| { + ovs_cleanup(&tmp_copy).unwrap(); + println!("panic: {}", info); + })); + let ovs = OvsUnixCtl::unix( + OvsUnixCtl::find_socket_at("ovs-vswitchd", &tmp).expect("Failed to find socket"), + None, + ); + let ovs = ovs.unwrap(); + + test(ovs)?; + + ovs_cleanup(&tmp).unwrap(); + Ok(()) + } + + #[test] + #[cfg_attr(not(feature = "test_integration"), ignore)] + fn list_commands() -> Result<()> { + ovs_test("list_commands", |mut ovs| { + let cmds = ovs.list_commands().unwrap(); + assert!(cmds.iter().any(|(cmd, _args)| cmd == "list-commands")); + + assert!(cmds.iter().any(|(cmd, args)| (cmd, args) + == (&"dpif-netdev/bond-show".to_string(), &"[dp]".to_string()))); + Ok(()) + }) + } + + #[test] + #[cfg_attr(not(feature = "test_integration"), ignore)] + fn version() -> Result<()> { + ovs_test("version", |mut ovs| { + let (x, y, z, _) = ovs.version().unwrap(); + // We don't know what version is running, let's check at least it's not 0.0.0. + assert!(x + y + z > 0); + Ok(()) + }) + } + + #[test] + #[cfg_attr(not(feature = "test_integration"), ignore)] + fn vlog() -> Result<()> { + ovs_test("vlog", |mut ovs| { + fn get_vlog_level(vlog: String, name: &str) -> String { + let levels: Vec<(&str, &str)> = vlog + .lines() + .skip(2) + .map(|l| { + let parts = l.split_whitespace().collect::>(); + assert_eq!(parts.len(), 4); + (parts[0], parts[3]) + }) + .collect(); + let (_, level) = levels.iter().find(|(module, _)| *module == name).unwrap(); + level.to_string() + } + + let vlog = ovs.run("vlog/list", &[])?.unwrap(); + assert_eq!(get_vlog_level(vlog, "unixctl"), "INFO"); + + ovs.run("vlog/set", &["unixctl:dbg"]).unwrap(); + + let vlog = ovs.run("vlog/list", &[])?.unwrap(); + assert_eq!(get_vlog_level(vlog, "unixctl"), "DBG"); + Ok(()) + }) + } +} diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..6258a6c --- /dev/null +++ b/src/unix.rs @@ -0,0 +1,173 @@ +//! Synchronous jsonrpc transport over Unix sockets. + +use std::{ + fmt, + os::unix::net::UnixStream, + path::{Path, PathBuf}, + time::Duration, +}; + +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use serde_json::Deserializer; + +use crate::jsonrpc::{JsonStream, JsonStreamClient}; + +/// Unix socket transport. +#[derive(Debug)] +pub(crate) struct UnixJsonStream { + sock: UnixStream, +} + +impl JsonStream for UnixJsonStream { + fn send(&mut self, msg: M) -> Result<()> { + serde_json::to_writer(&self.sock, &msg)?; + Ok(()) + } + + fn recv(&mut self) -> Result + where + R: for<'a> Deserialize<'a>, + { + let resp: R = Deserializer::from_reader(&mut self.sock) + .into_iter() + .next() + .ok_or_else(|| anyhow!("Receive timeout"))??; + Ok(resp) + } +} + +#[derive(Debug)] +pub(crate) struct UnixJsonStreamClient { + /// The path to the Unix Domain Socket. + path: PathBuf, + /// The read and write timeout to use. + timeout: Option, +} + +impl UnixJsonStreamClient { + /// Creates a new [`UdsTransport`] without timeouts to use. + pub(crate) fn new>(path: P) -> UnixJsonStreamClient { + UnixJsonStreamClient { + path: path.as_ref().to_path_buf(), + timeout: None, + } + } + + /// Sets the timeout. + pub(crate) fn timeout(mut self, timeout: Duration) -> UnixJsonStreamClient { + self.timeout = Some(timeout); + self + } +} + +impl JsonStreamClient for UnixJsonStreamClient { + type Stream = UnixJsonStream; + + fn connect(&mut self) -> Result { + let sock = UnixStream::connect(&self.path)?; + sock.set_read_timeout(self.timeout)?; + sock.set_write_timeout(self.timeout)?; + Ok(UnixJsonStream { sock }) + } +} + +impl fmt::Display for UnixJsonStreamClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "unix://{}", self.path.to_string_lossy()) + } +} + +#[cfg(test)] +mod tests { + use std::{fs, os::unix::net::UnixListener, path, process, thread}; + + use super::*; + use crate::jsonrpc; + + #[test] + fn ping_pong() { + #[derive(Clone, serde::Deserialize, serde::Serialize)] + struct Result { + val: String, + extra: u32, + } + + let socket_path: path::PathBuf = format!("unix_test-{}.socket", process::id()).into(); + let server = UnixListener::bind(&socket_path).unwrap(); + + // Client thread + let cli_socket_path = socket_path.clone(); + let client_thread = thread::spawn(move || { + let stream_client = + UnixJsonStreamClient::new(cli_socket_path).timeout(Duration::from_secs(2)); + assert_eq!( + format!("{}", stream_client), + format!("unix://unix_test-{}.socket", process::id()) + ); + + let mut client = jsonrpc::Client::new(stream_client); + + for _n in 1..5 { + let response: jsonrpc::Response = client + .call_params("ping", &["hello world".to_string()]) + .unwrap(); + assert!(response.error.is_none()); + assert!(response.result.is_some()); + assert_eq!(response.result.as_ref().unwrap().val, "pong"); + assert_eq!(response.result.as_ref().unwrap().extra, 42); + } + }); + + // Response and Request are optimized for used by the client, not the server. + #[derive(Debug, Clone, Deserialize)] + struct ReceiveRequest { + method: String, + params: Option, + id: usize, + } + + #[derive(Debug, Clone, Serialize)] + struct SendResponse { + result: Option, + error: Option, + id: Option, + } + + // Fake server + let (sock, _) = server.accept().unwrap(); + sock.set_read_timeout(Some(Duration::from_secs(2))).unwrap(); + let mut stream = UnixJsonStream { sock }; + for _n in 1..5 { + let request: ReceiveRequest = stream.recv().unwrap(); + if request.method == "ping" { + let params: Vec = + serde_json::from_value(request.params.expect("params should exist")) + .expect("params should be Vector of Strings"); + assert_eq!(params.first().unwrap(), "hello world"); + + let response = SendResponse { + result: Some(Result { + val: "pong".into(), + extra: 42, + }), + error: None, + id: Some(request.id), + }; + stream.send(response).unwrap(); + } else { + let response = SendResponse::<()> { + result: None, + error: Some("method not found".into()), + id: Some(request.id), + }; + stream.send(response).unwrap(); + } + } + + client_thread.join().unwrap(); + + // Clean up + fs::remove_file(&socket_path).unwrap(); + } +}