Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unix socket server support + rename grpc -> server #78

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions nexosim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ keywords = [

[features]
# gRPC service.
grpc = [
server = [
"dep:bytes",
"dep:ciborium",
"dep:prost",
"dep:prost-types",
"dep:serde",
"dep:tonic",
"dep:tokio",
"dep:tokio-stream",
"dep:tonic",
"tai-time/serde",
]
Expand Down Expand Up @@ -84,6 +85,9 @@ tracing = { version = "0.1.40", default-features = false, features = [
], optional = true }
tracing-subscriber = { version = "0.3.18", optional = true }

[target.'cfg(unix)'.dependencies]
tokio-stream = { version = "0.1.10", features = ["net"], optional = true }

[dev-dependencies]
futures-util = "0.3"
futures-executor = "0.3"
Expand All @@ -93,15 +97,15 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
loom = "0.7"
waker-fn = "1.1"

[target.'cfg(nexosim_grpc_codegen)'.build-dependencies]
[target.'cfg(nexosim_server_codegen)'.build-dependencies]
tonic-build = { version = "0.12" }

[lints.rust]
# `nexosim_loom` flag: run loom-based tests.
# `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions.
# `nexosim_server_codegen` flag: regenerate gRPC code from .proto definitions.
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(nexosim_loom)',
'cfg(nexosim_grpc_codegen)',
'cfg(nexosim_server_codegen)',
] }

[package.metadata.docs.rs]
Expand Down
6 changes: 3 additions & 3 deletions nexosim/build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(nexosim_grpc_codegen)]
#[cfg(nexosim_server_codegen)]
tonic_build::configure()
.build_client(false)
.out_dir("src/grpc/codegen/")
.compile_protos(&["simulation.proto"], &["src/grpc/api/"])?;
.out_dir("src/server/codegen/")
.compile_protos(&["simulation.proto"], &["src/server/api/"])?;

Ok(())
}
12 changes: 6 additions & 6 deletions nexosim/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,14 @@
//!
//! See the [`tracing`] module for more information.
//!
//! ## gRPC server
//! ## Server
//!
//! The `grpc` feature provides a gRPC server for remote control and monitoring,
//! The `server` feature provides a gRPC server for remote control and monitoring,
//! e.g. from a Python client. It can be activated with:
//!
//! ```toml
//! [dependencies]
//! nexosim = { version = "0.3.0-beta.0", features = ["grpc"] }
//! nexosim = { version = "0.3.0-beta.0", features = ["server"] }
//! ```
//!
//! # Other resources
Expand Down Expand Up @@ -449,10 +449,10 @@ pub mod simulation;
pub mod time;
pub(crate) mod util;

#[cfg(feature = "grpc")]
pub mod grpc;
#[cfg(feature = "grpc")]
#[cfg(feature = "server")]
pub mod registry;
#[cfg(feature = "server")]
pub mod server;

#[cfg(feature = "tracing")]
pub mod tracing;
Expand Down
3 changes: 3 additions & 0 deletions nexosim/src/grpc.rs → nexosim/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ mod run;
mod services;

pub use run::run;

#[cfg(unix)]
pub use run::run_local;
File renamed without changes.
File renamed without changes.
File renamed without changes.
87 changes: 82 additions & 5 deletions nexosim/src/grpc/run.rs → nexosim/src/server/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! gRPC simulation service.
//! Simulation server.

use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
Expand All @@ -16,7 +17,7 @@ use super::key_registry::KeyRegistry;
use super::services::InitService;
use super::services::{ControllerService, MonitorService, SchedulerService};

/// Runs a gRPC simulation server.
/// Runs a simulation from a network server.
///
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
Expand All @@ -30,16 +31,16 @@ where
run_service(GrpcSimulationService::new(sim_gen), addr)
}

/// Monomorphization of the networking code.
/// Monomorphization of the network server.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the even if the controller service is blocked due
// to ongoing simulation execution, other services can still be used
// Use 2 threads so that even if the controller service is blocked due to
// ongoing simulation execution, other services can still be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
Expand All @@ -56,6 +57,82 @@ fn run_service(
})
}

/// Runs a simulation locally from a Unix Domain Sockets server.
///
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
/// It must create a new simulation, complemented by a registry that exposes the
/// public event and query interface.
#[cfg(unix)]
pub fn run_local<F, I, P>(sim_gen: F, path: P) -> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(I) -> Result<(Simulation, EndpointRegistry), SimulationError> + Send + 'static,
I: DeserializeOwned,
P: AsRef<Path>,
{
let path = path.as_ref();
run_local_service(GrpcSimulationService::new(sim_gen), path)
}

/// Monomorphization of the Unix Domain Sockets server.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
#[cfg(unix)]
fn run_local_service(
service: GrpcSimulationService,
path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
use std::fs;
use std::io;
use std::os::unix::fs::FileTypeExt;

use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;

// Unlink the socket if it already exists to prevent an `AddrInUse` error.
match fs::metadata(path) {
// The path is valid: make sure it actually points to a socket.
Ok(socket_meta) => {
if !socket_meta.file_type().is_socket() {
return Err(Box::new(io::Error::new(
io::ErrorKind::AlreadyExists,
"the specified path points to an existing non-socket file",
)));
}

fs::remove_file(path)?;
}
// Nothing to do: the socket does not exist yet.
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
// We don't have permission to use the socket.
Err(e) => return Err(Box::new(e)),
}

// (Re-)Create the socket.
fs::create_dir_all(path.parent().unwrap())?;

// Use 2 threads so that even if the controller service is blocked due to
// ongoing simulation execution, other services can still be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_io()
.build()?;

rt.block_on(async move {
let uds = UnixListener::bind(path)?;
let uds_stream = UnixListenerStream::new(uds);

Server::builder()
.add_service(simulation_server::SimulationServer::new(service))
.serve_with_incoming(uds_stream)
.await?;

Ok(())
})
}

struct GrpcSimulationService {
init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::fmt;
use std::sync::Arc;

use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::EventSourceRegistry;
use crate::server::key_registry::{KeyRegistry, KeyRegistryId};
use crate::simulation::Scheduler;

use super::super::codegen::simulation::*;
Expand Down
2 changes: 1 addition & 1 deletion nexosim/src/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ impl Simulation {
}

/// Returns a scheduler handle.
#[cfg(feature = "grpc")]
#[cfg(feature = "server")]
pub(crate) fn scheduler(&self) -> Scheduler {
Scheduler::new(
self.scheduler_queue.clone(),
Expand Down
Loading