diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4d8c19c..5ec44dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,9 +14,12 @@ jobs: build: needs: build_dev_container uses: ./.github/workflows/ci_build.yml - test: + unit_test: needs: build_dev_container - uses: ./.github/workflows/ci_test.yml + uses: ./.github/workflows/ci_unit_test.yml + integration_test: + needs: build_dev_container + uses: ./.github/workflows/ci_integration_test.yml lint: needs: build_dev_container uses: ./.github/workflows/ci_lint.yml diff --git a/.github/workflows/ci_integration_test.yml b/.github/workflows/ci_integration_test.yml new file mode 100644 index 0000000..970c048 --- /dev/null +++ b/.github/workflows/ci_integration_test.yml @@ -0,0 +1,21 @@ +name: CI Integration Test +'on': + workflow_call: null +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout (GitHub) + uses: actions/checkout@v4 + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Cargo integration tests + uses: devcontainers/ci@v0.3 + with: + cacheFrom: ghcr.io/vorausrobotik/voraus-ros-bridge-dev + runCmd: cargo test --test '*' --verbose -- --test-threads=1 + push: never diff --git a/.github/workflows/ci_test.yml b/.github/workflows/ci_unit_test.yml similarity index 84% rename from .github/workflows/ci_test.yml rename to .github/workflows/ci_unit_test.yml index 384221e..d989814 100644 --- a/.github/workflows/ci_test.yml +++ b/.github/workflows/ci_unit_test.yml @@ -1,4 +1,4 @@ -name: CI Test +name: CI Unit Test 'on': workflow_call: null jobs: @@ -13,9 +13,9 @@ jobs: registry: ghcr.io username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Cargo test + - name: Cargo unit tests uses: devcontainers/ci@v0.3 with: cacheFrom: ghcr.io/vorausrobotik/voraus-ros-bridge-dev - runCmd: cargo test --verbose + runCmd: cargo test --bins --verbose push: never diff --git a/Cargo.lock b/Cargo.lock index bbc3ee5..8a4d635 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,6 +1026,23 @@ dependencies = [ "rosidl_runtime_rs", ] +[[package]] +name = "std_srvs" +version = "4.2.4" +dependencies = [ + "rosidl_runtime_rs", +] + +[[package]] +name = "subprocess" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2e86926081dda636c546d8c5e641661049d7562a68f5488be4a1f7f66f6086" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "syn" version = "1.0.109" @@ -1188,6 +1205,8 @@ dependencies = [ "sensor_msgs", "socket2", "std_msgs", + "std_srvs", + "subprocess", "tokio", "voraus_interfaces", ] @@ -1271,6 +1290,22 @@ dependencies = [ "rustix", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.8" @@ -1280,6 +1315,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" @@ -1492,10 +1533,6 @@ version = "4.2.4" name = "statistics_msgs" version = "1.2.1" -[[patch.unused]] -name = "std_srvs" -version = "4.2.4" - [[patch.unused]] name = "stereo_msgs" version = "4.2.4" diff --git a/Cargo.toml b/Cargo.toml index 8cf1e84..386ec41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ opcua = { version = "0.12.0", features = ["client", "console-logging"] } # Explicitly pinning openssl >= 0.10.66 is required, as long as opcua is not 0.13.0, yet. openssl = { version = "0.10.66" } std_msgs = { version = "4.2.4" } +std_srvs = { version = "4.2.4" } sensor_msgs = { version = "4.2.4" } builtin_interfaces = { version = "1.2.1" } voraus_interfaces = { version = "0.1.0" } @@ -25,6 +26,8 @@ rclrs = { version = "0.4.1" } rosidl_runtime_rs = { version = "0.4.1" } log = "0.4.22" env_logger = "0.11.5" +tokio = "1.38.0" +subprocess = "0.2.9" [dev-dependencies] rclrs = { version = "0.4.1" } diff --git a/examples/opc_ua/rapid-clock.rs b/examples/opc_ua/rapid-clock.rs index d54c0c8..91b624c 100644 --- a/examples/opc_ua/rapid-clock.rs +++ b/examples/opc_ua/rapid-clock.rs @@ -11,11 +11,7 @@ fn rapid_clock() { let mut server = Server::new(ServerConfig::load(&PathBuf::from("tests/resources/clock.conf")).unwrap()); - let namespace = { - let address_space = server.address_space(); - let mut address_space = address_space.write(); - address_space.register_namespace("urn:rapid-clock").unwrap() - }; + let namespace = 1u16; add_timed_variable(&mut server, namespace); diff --git a/src/main.rs b/src/main.rs index 5ea305b..d1d50e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,18 @@ -mod simple_opc_ua_subscriber; +mod opc_ua_client; use env_logger::{Builder, Env}; -use log::debug; +use log::{debug, info}; +use opc_ua_client::OPCUAClient; use opcua::types::Variant; use rclrs::{create_node, Context, RclrsError}; -use ros_service_server::handle_service; -use simple_opc_ua_subscriber::SimpleSubscriber; -use std::{env, sync::Arc}; +use ros_services::ROSServices; +use std::{ + env, + sync::{Arc, Mutex}, +}; mod ros_publisher; -mod ros_service_server; +mod ros_services; use ros_publisher::{create_joint_state_msg, RosPublisher}; @@ -22,14 +25,18 @@ fn main() -> Result<(), RclrsError> { let node_copy = Arc::clone(&node); let joint_state_publisher = Arc::new(RosPublisher::new(&node, "joint_states").unwrap()); - let _server = node_copy - .create_service::("add_two_ints", handle_service)?; - - let mut simple_subscriber = SimpleSubscriber::new("opc.tcp://127.0.0.1:4855"); - let Ok(_connection_result) = simple_subscriber.connect() else { + let opc_ua_client = Arc::new(Mutex::new(OPCUAClient::new("opc.tcp://127.0.0.1:4855"))); + let Ok(_connection_result) = opc_ua_client.lock().unwrap().connect() else { panic!("Connection could not be established, but is required."); }; + let ros_services = Arc::new(ROSServices::new(Arc::clone(&opc_ua_client))); + let _enable_impedance_control = + node_copy.create_service::("enable_impedance_control", { + let rsc = Arc::clone(&ros_services); + move |request_header, request| rsc.enable_impedance_control(request_header, request) + }); + let callback = { let provider = Arc::clone(&joint_state_publisher); move |x: Variant| { @@ -52,10 +59,22 @@ fn main() -> Result<(), RclrsError> { .expect("Error while publishing.") } }; - simple_subscriber + opc_ua_client + .lock() + .unwrap() .create_subscription(1, "100111", callback, 10) .expect("ERROR: Got an error while subscribing to variables"); // Loops forever. The publish thread will call the callback with changes on the variables - simple_subscriber.run(); + info!("Starting OPC UA client"); + let _session = opc_ua_client.lock().unwrap().run_async(); + info!("Spinning ROS"); rclrs::spin(node_copy) } + +#[cfg(test)] +mod tests { + #[test] + fn test_dummy() { + assert_eq!(1, 1); + } +} diff --git a/src/simple_opc_ua_subscriber.rs b/src/opc_ua_client.rs similarity index 70% rename from src/simple_opc_ua_subscriber.rs rename to src/opc_ua_client.rs index 2732f8e..f077b66 100644 --- a/src/simple_opc_ua_subscriber.rs +++ b/src/opc_ua_client.rs @@ -1,18 +1,19 @@ use std::sync::Arc; -use log::{debug, error}; +use log::debug; use opcua::client::prelude::{ - Client, ClientBuilder, DataChangeCallback, IdentityToken, MonitoredItem, MonitoredItemService, - Session, SubscriptionService, + Client, ClientBuilder, DataChangeCallback, IdentityToken, MethodService, MonitoredItem, + MonitoredItemService, Session, SessionCommand, SubscriptionService, }; use opcua::crypto::SecurityPolicy; use opcua::sync::RwLock; use opcua::types::{ - MessageSecurityMode, MonitoredItemCreateRequest, NodeId, StatusCode, TimestampsToReturn, - UserTokenPolicy, Variant, + CallMethodRequest, MessageSecurityMode, MonitoredItemCreateRequest, NodeId, StatusCode, + TimestampsToReturn, UserTokenPolicy, Variant, }; +use tokio::sync::oneshot; -pub struct SimpleSubscriber { +pub struct OPCUAClient { endpoint: String, session: Option>>, } @@ -25,7 +26,7 @@ fn extract_value(item: &MonitoredItem) -> Variant { .expect("No value found - check value bit in EncodingMask.") } -impl SimpleSubscriber { +impl OPCUAClient { pub fn new>(endpoint: S) -> Self { Self { endpoint: endpoint.into(), @@ -35,9 +36,9 @@ impl SimpleSubscriber { pub fn connect(&mut self) -> Result<(), &str> { let mut client: Client = ClientBuilder::new() - .application_name("Simple Subscriber") - .application_uri("urn:SimpleSubscriber") - .product_uri("urn:SimpleSubscriber") + .application_name("voraus ROS brigde") + .application_uri("urn:vorausRosBridge") + .product_uri("urn:vorausRosBridge") .trust_server_certs(true) .create_sample_keypair(true) .session_retry_limit(5) @@ -117,12 +118,27 @@ impl SimpleSubscriber { Ok(()) } - pub fn run(self) { - match self.session { - Some(session) => Session::run(session), - None => { - error!("Could not run inexistent session."); - } - } + pub fn call_method(&self, object_id: NodeId, method_id: NodeId, args: Vec) + where + T: Into, + { + let cloned_session_lock = self.session.clone().unwrap(); + let session = cloned_session_lock.read(); + let _arguments: Option> = Some(args.into_iter().map(Into::into).collect()); + let method = CallMethodRequest { + object_id, + method_id, + input_arguments: None, + }; + let result = session.call(method).unwrap(); + debug!("result of call: {:?}", result); + } + + pub fn run_async(&self) -> oneshot::Sender { + let cloned_session_lock = self + .session + .clone() + .expect("The session should be connected. Please call connect() first."); + Session::run_async(cloned_session_lock) } } diff --git a/src/ros_service_server.rs b/src/ros_service_server.rs deleted file mode 100644 index a7efb57..0000000 --- a/src/ros_service_server.rs +++ /dev/null @@ -1,11 +0,0 @@ -use log::debug; - -pub fn handle_service( - _request_header: &rclrs::rmw_request_id_t, - request: voraus_interfaces::srv::Voraus_Request, -) -> voraus_interfaces::srv::Voraus_Response { - debug!("request: {} + {}", request.a, request.b); - voraus_interfaces::srv::Voraus_Response { - sum: request.a + request.b, - } -} diff --git a/src/ros_services.rs b/src/ros_services.rs new file mode 100644 index 0000000..08c6411 --- /dev/null +++ b/src/ros_services.rs @@ -0,0 +1,34 @@ +use std::sync::{Arc, Mutex}; + +use log::info; +use opcua::types::NodeId; +use std_srvs::srv::{Empty_Request, Empty_Response}; + +use crate::opc_ua_client::OPCUAClient; + +pub struct ROSServices { + opc_ua_client: Arc>, +} + +impl ROSServices { + pub fn new(opc_ua_client: Arc>) -> Self { + Self { opc_ua_client } + } + + pub fn enable_impedance_control( + &self, + _request_header: &rclrs::rmw_request_id_t, + _request: Empty_Request, + ) -> Empty_Response { + info!("ROS service called"); + let object_id = NodeId::new(1, 100182); + let method_id = NodeId::new(1, 100263); + self.opc_ua_client + .lock() + .unwrap() + .call_method(object_id, method_id, vec![()]); + Empty_Response { + structure_needs_at_least_one_member: 0, + } + } +} diff --git a/tests/common.rs b/tests/common/mod.rs similarity index 100% rename from tests/common.rs rename to tests/common/mod.rs diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index f8dc5cb..b6d4d80 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -1,2 +1,3 @@ pub mod opc_ua_publisher_single_linear; +pub mod ros_service_caller; pub mod ros_subscriber; diff --git a/tests/helpers/opc_ua_publisher_single_linear.rs b/tests/helpers/opc_ua_publisher_single_linear.rs index 0d6fa80..030171b 100644 --- a/tests/helpers/opc_ua_publisher_single_linear.rs +++ b/tests/helpers/opc_ua_publisher_single_linear.rs @@ -1,5 +1,14 @@ -use opcua::server::prelude::{Config, DateTime, NodeId, Server, ServerConfig, Variable}; +use log::info; +use opcua::server::callbacks; +use opcua::server::prelude::{ + Config, DateTime, EventNotifier, MethodBuilder, NodeId, ObjectBuilder, Server, ServerConfig, + Variable, +}; +use opcua::server::session::SessionManager; +use opcua::sync::RwLock; +use opcua::types::{CallMethodRequest, CallMethodResult, ObjectId, StatusCode}; use std::path::PathBuf; +use std::sync::Arc; use std::thread; pub async fn run_rapid_clock() -> thread::JoinHandle<()> { @@ -17,6 +26,7 @@ fn rapid_clock() { let questionable_namespace = 1u16; add_timed_variable(&mut server, questionable_namespace); + add_no_op_method(&mut server, questionable_namespace); server.run(); } @@ -60,3 +70,40 @@ fn add_timed_variable(server: &mut Server, namespace: u16) { }); } } + +fn add_no_op_method(server: &mut Server, namespace: u16) { + let address_space = server.address_space(); + let mut address_space = address_space.write(); + + let object_id = NodeId::new(namespace, 100182); + ObjectBuilder::new(&object_id, "Functions", "Functions") + .event_notifier(EventNotifier::SUBSCRIBE_TO_EVENTS) + .organized_by(ObjectId::ObjectsFolder) + .insert(&mut address_space); + + // NoOp has 0 inputs and 0 outputs + let method_id = NodeId::new(namespace, 100263); + MethodBuilder::new(&method_id, "NoOp", "NoOp") + .component_of(object_id.clone()) + .callback(Box::new(NoOp)) + .insert(&mut address_space); +} + +struct NoOp; + +impl callbacks::Method for NoOp { + fn call( + &mut self, + _session_id: &NodeId, + _session_map: Arc>, + _request: &CallMethodRequest, + ) -> Result { + info!("NoOp method called"); + Ok(CallMethodResult { + status_code: StatusCode::Good, + input_argument_results: None, + input_argument_diagnostic_infos: None, + output_arguments: None, + }) + } +} diff --git a/tests/helpers/ros_service_caller.rs b/tests/helpers/ros_service_caller.rs new file mode 100644 index 0000000..d68a2cf --- /dev/null +++ b/tests/helpers/ros_service_caller.rs @@ -0,0 +1,62 @@ +use rclrs::{Node, RclrsError}; +use std::{ + env, + sync::{Arc, Mutex}, + time::Duration, +}; + +pub struct ServiceCaller { + pub client: Arc>, + pub node: Arc, + pub request: std_srvs::srv::Empty_Request, + pub number_of_calls: Arc>, +} + +impl ServiceCaller { + pub fn new(service: &str) -> Result, RclrsError> { + let context = rclrs::Context::new(env::args())?; + + let node = rclrs::create_node(&context, "minimal_client")?; + + let client = node.create_client::(service)?; + + let request = std_srvs::srv::Empty_Request { + structure_needs_at_least_one_member: 0, + }; + let action_caller = Arc::new(ServiceCaller { + client, + node, + request, + number_of_calls: Arc::new(Mutex::new(0)), + }); + + Ok(action_caller) + } + + pub fn start(&self) { + println!("Starting ROS Service Client"); + + while !self.client.service_is_ready().unwrap() { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + println!("ROS Service Client is ready"); + } + pub fn call(&self) { + println!("Calling a ROS Service"); + let num_calls_clone = Arc::clone(&self.number_of_calls); + self.client + .async_send_request_with_callback( + &self.request, + move |_response: std_srvs::srv::Empty_Response| { + println!("Got an answer"); + let mut num_calls = num_calls_clone.lock().unwrap(); + *num_calls += 1; + }, + ) + .unwrap(); + + let timeout = Some(Duration::new(5, 0)); + let node_clone = Arc::clone(&self.node); + rclrs::spin_once(node_clone, timeout).unwrap(); + } +} diff --git a/tests/test_bridge.rs b/tests/test_bridge.rs index 1b8cf1c..c87e5a1 100644 --- a/tests/test_bridge.rs +++ b/tests/test_bridge.rs @@ -1,32 +1,83 @@ use std::{ - process::Command, + env, + path::PathBuf, sync::{atomic::Ordering, Arc}, time::Duration, }; +use common::wait_for_function_to_pass; +use subprocess::{Popen, PopenConfig, Redirection}; + +pub mod common; pub mod helpers; +struct ManagedRosBridge { + process: Popen, +} + +impl ManagedRosBridge { + fn new() -> subprocess::Result { + // Start the ROS Brigde + // We can't use ros2 run here because of https://github.com/ros2/ros2cli/issues/895 + let root_dir = env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is not set"); + let mut path_to_executable = PathBuf::from(root_dir); + path_to_executable + .push("install/voraus-ros-bridge/lib/voraus-ros-bridge/voraus-ros-bridge"); + + let process = Popen::create( + &[path_to_executable], + PopenConfig { + stdout: Redirection::Pipe, + stderr: Redirection::Pipe, + detached: false, + ..Default::default() + }, + )?; + + Ok(ManagedRosBridge { process }) + } + + fn is_running(&mut self) -> bool { + self.process.poll().is_none() + } + + fn get_std_err(&mut self) -> Option { + let (_out, err) = self + .process + .communicate(None) + .expect("Failed to capture output"); + err + } + + fn terminate(&mut self) { + let _ = self.process.terminate(); + let _ = self.process.wait(); + } +} + +impl Drop for ManagedRosBridge { + fn drop(&mut self) { + if self.is_running() { + self.terminate(); + } + } +} + #[tokio::test] async fn e2e_opc_ua_var_to_ros_topic() { // Start the OPC UA server in the background helpers::opc_ua_publisher_single_linear::run_rapid_clock().await; - println!("Starting ros bridge"); - let mut bridge = Command::new("ros2") - .args(["run"]) - .args(["voraus-ros-bridge"]) - .args(["voraus-ros-bridge"]) - .spawn() - .expect("Failed to run command"); + let mut _bridge_process = ManagedRosBridge::new().expect("Failed to start subprocess"); // Spawn ROS Subscriber against a sample topic - println!("Creating Subscription"); let subscription = Arc::new( helpers::ros_subscriber::Subscriber::new("joint_states_subscriber", "joint_states") .unwrap(), ); - let timeout = Some(Duration::new(5, 0)); - rclrs::spin_once(subscription.node.clone(), timeout).expect("Could not spin"); + let clone = Arc::clone(&subscription.node); + // TODO: Figure out why this takes almost 10 s [RR-836] + rclrs::spin_once(clone, Some(Duration::from_secs(25))).expect("Could not spin"); let mut number_of_messages_received = subscription.num_messages.load(Ordering::SeqCst); let first_value = *subscription @@ -40,7 +91,7 @@ async fn e2e_opc_ua_var_to_ros_topic() { .unwrap(); assert_eq!(number_of_messages_received, 1); - rclrs::spin_once(subscription.node.clone(), timeout).expect("Could not spin"); + rclrs::spin_once(subscription.node.clone(), Some(Duration::new(5, 0))).expect("Could not spin"); number_of_messages_received = subscription.num_messages.load(Ordering::SeqCst); assert_eq!(number_of_messages_received, 2); @@ -61,8 +112,32 @@ async fn e2e_opc_ua_var_to_ros_topic() { second_value, first_value ); - bridge - .kill() - .expect("voraus-ros-bridge process could not be killed."); - println!("done"); +} + +#[tokio::test] +async fn e2e_ros_service_to_opc_ua_call() { + // Start the OPC UA server in the background + helpers::opc_ua_publisher_single_linear::run_rapid_clock().await; + + let mut bridge_process = ManagedRosBridge::new().expect("Failed to start subprocess"); + + let service_caller = Arc::new( + helpers::ros_service_caller::ServiceCaller::new("enable_impedance_control").unwrap(), + ); + + // TODO: Figure out why this takes almost 10 s [RR-836] + service_caller.start(); + assert!(*service_caller.number_of_calls.lock().unwrap() == 0); + service_caller.call(); + + wait_for_function_to_pass( + || *service_caller.number_of_calls.lock().unwrap() == 1, + 5000, + ) + .unwrap(); + bridge_process.terminate(); + assert!(bridge_process + .get_std_err() + .unwrap() + .contains("ROS service called")); }