diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 855941fdc..cdd69c0f0 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -756,8 +756,7 @@ pub(super) async fn get_signing_keys( notary: Option>, query: bool, ) -> Result { - let server_name = - server_name.unwrap_or_else(|| self.services.server.name.clone().into()); + let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into()); if let Some(notary) = notary { let signing_keys = self @@ -793,8 +792,7 @@ pub(super) async fn get_verify_keys( &self, server_name: Option>, ) -> Result { - let server_name = - server_name.unwrap_or_else(|| self.services.server.name.clone().into()); + let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into()); let keys = self .services diff --git a/src/service/sending/send.rs b/src/service/federation/execute.rs similarity index 65% rename from src/service/sending/send.rs rename to src/service/federation/execute.rs index c8a64f3cb..27d989681 100644 --- a/src/service/sending/send.rs +++ b/src/service/federation/execute.rs @@ -1,4 +1,4 @@ -use std::mem; +use std::{fmt::Debug, mem}; use bytes::Bytes; use conduwuit::{ @@ -20,82 +20,109 @@ use ruma::{ use crate::resolver::actual::ActualDest; -impl super::Service { - #[tracing::instrument( +/// Sends a request to a federation server +#[implement(super::Service)] +#[tracing::instrument(skip_all, name = "request", level = "debug")] +pub async fn execute(&self, dest: &ServerName, request: T) -> Result +where + T: OutgoingRequest + Debug + Send, +{ + let client = &self.services.client.federation; + self.execute_on(client, dest, request).await +} + +/// Like execute() but with a very large timeout +#[implement(super::Service)] +#[tracing::instrument(skip_all, name = "synapse", level = "debug")] +pub async fn execute_synapse( + &self, + dest: &ServerName, + request: T, +) -> Result +where + T: OutgoingRequest + Debug + Send, +{ + let client = &self.services.client.synapse; + self.execute_on(client, dest, request).await +} + +#[implement(super::Service)] +#[tracing::instrument( level = "debug" skip(self, client, request), )] - pub async fn send( - &self, - client: &Client, - dest: &ServerName, - request: T, - ) -> Result - where - T: OutgoingRequest + Send, - { - if !self.server.config.allow_federation { - return Err!(Config("allow_federation", "Federation is disabled.")); - } - - if self - .server - .config - .forbidden_remote_server_names - .contains(dest) - { - return Err!(Request(Forbidden(debug_warn!( - "Federation with {dest} is not allowed." - )))); - } - - let actual = self.services.resolver.get_actual_dest(dest).await?; - let request = into_http_request::(&actual, request)?; - let request = self.prepare(dest, request)?; - self.execute::(dest, &actual, request, client).await +pub async fn execute_on( + &self, + client: &Client, + dest: &ServerName, + request: T, +) -> Result +where + T: OutgoingRequest + Send, +{ + if !self.services.server.config.allow_federation { + return Err!(Config("allow_federation", "Federation is disabled.")); } - async fn execute( - &self, - dest: &ServerName, - actual: &ActualDest, - request: Request, - client: &Client, - ) -> Result - where - T: OutgoingRequest + Send, + if self + .services + .server + .config + .forbidden_remote_server_names + .contains(dest) { - let url = request.url().clone(); - let method = request.method().clone(); - - debug!(?method, ?url, "Sending request"); - match client.execute(request).await { - | Ok(response) => handle_response::(dest, actual, &method, &url, response).await, - | Err(error) => - Err(handle_error(actual, &method, &url, error).expect_err("always returns error")), - } + return Err!(Request(Forbidden(debug_warn!("Federation with {dest} is not allowed.")))); } - fn prepare(&self, dest: &ServerName, mut request: http::Request>) -> Result { - self.sign_request(&mut request, dest); - - let request = Request::try_from(request)?; - self.validate_url(request.url())?; - self.server.check_running()?; + let actual = self.services.resolver.get_actual_dest(dest).await?; + let request = into_http_request::(&actual, request)?; + let request = self.prepare(dest, request)?; + self.perform::(dest, &actual, request, client).await +} - Ok(request) +#[implement(super::Service)] +async fn perform( + &self, + dest: &ServerName, + actual: &ActualDest, + request: Request, + client: &Client, +) -> Result +where + T: OutgoingRequest + Send, +{ + let url = request.url().clone(); + let method = request.method().clone(); + + debug!(?method, ?url, "Sending request"); + match client.execute(request).await { + | Ok(response) => handle_response::(dest, actual, &method, &url, response).await, + | Err(error) => + Err(handle_error(actual, &method, &url, error).expect_err("always returns error")), } +} - fn validate_url(&self, url: &Url) -> Result<()> { - if let Some(url_host) = url.host_str() { - if let Ok(ip) = IPAddress::parse(url_host) { - trace!("Checking request URL IP {ip:?}"); - self.services.resolver.validate_ip(&ip)?; - } - } +#[implement(super::Service)] +fn prepare(&self, dest: &ServerName, mut request: http::Request>) -> Result { + self.sign_request(&mut request, dest); + + let request = Request::try_from(request)?; + self.validate_url(request.url())?; + self.services.server.check_running()?; - Ok(()) + Ok(request) +} + +#[implement(super::Service)] +fn validate_url(&self, url: &Url) -> Result<()> { + if let Some(url_host) = url.host_str() { + if let Ok(ip) = IPAddress::parse(url_host) { + trace!("Checking request URL IP {ip:?}"); + self.services.resolver.validate_ip(&ip)?; + } } + + Ok(()) } async fn handle_response( @@ -195,7 +222,7 @@ fn sign_request(&self, http_request: &mut http::Request>, dest: &ServerN type Value = CanonicalJsonValue; type Object = CanonicalJsonObject; - let origin = self.services.globals.server_name(); + let origin = &self.services.server.name; let body = http_request.body(); let uri = http_request .uri() diff --git a/src/service/federation/mod.rs b/src/service/federation/mod.rs new file mode 100644 index 000000000..dacdb20e0 --- /dev/null +++ b/src/service/federation/mod.rs @@ -0,0 +1,33 @@ +mod execute; + +use std::sync::Arc; + +use conduwuit::{Result, Server}; + +use crate::{client, resolver, server_keys, Dep}; + +pub struct Service { + services: Services, +} + +struct Services { + server: Arc, + client: Dep, + resolver: Dep, + server_keys: Dep, +} + +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + services: Services { + server: args.server.clone(), + client: args.depend::("client"), + resolver: args.depend::("resolver"), + server_keys: args.depend::("server_keys"), + }, + })) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} diff --git a/src/service/mod.rs b/src/service/mod.rs index 789994d30..2102921fc 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -10,6 +10,7 @@ pub mod admin; pub mod appservice; pub mod client; pub mod emergency; +pub mod federation; pub mod globals; pub mod key_backups; pub mod media; diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index e52bfb256..80bca1127 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -1,7 +1,6 @@ mod appservice; mod data; mod dest; -mod send; mod sender; use std::{ @@ -30,8 +29,8 @@ pub use self::{ sender::{EDU_LIMIT, PDU_LIMIT}, }; use crate::{ - account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId, - server_keys, users, Dep, + account_data, client, federation, globals, presence, pusher, rooms, + rooms::timeline::RawPduId, users, Dep, }; pub struct Service { @@ -44,7 +43,6 @@ pub struct Service { struct Services { client: Dep, globals: Dep, - resolver: Dep, state: Dep, state_cache: Dep, user: Dep, @@ -55,7 +53,7 @@ struct Services { account_data: Dep, appservice: Dep, pusher: Dep, - server_keys: Dep, + federation: Dep, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -83,7 +81,6 @@ impl crate::Service for Service { services: Services { client: args.depend::("client"), globals: args.depend::("globals"), - resolver: args.depend::("resolver"), state: args.depend::("rooms::state"), state_cache: args.depend::("rooms::state_cache"), user: args.depend::("rooms::user"), @@ -94,7 +91,7 @@ impl crate::Service for Service { account_data: args.depend::("account_data"), appservice: args.depend::("appservice"), pusher: args.depend::("pusher"), - server_keys: args.depend::("server_keys"), + federation: args.depend::("federation"), }, channels: (0..num_senders).map(|_| loole::unbounded()).collect(), })) @@ -277,7 +274,7 @@ impl Service { } /// Sends a request to a federation server - #[tracing::instrument(skip_all, name = "request", level = "debug")] + #[inline] pub async fn send_federation_request( &self, dest: &ServerName, @@ -286,12 +283,11 @@ impl Service { where T: OutgoingRequest + Debug + Send, { - let client = &self.services.client.federation; - self.send(client, dest, request).await + self.services.federation.execute(dest, request).await } /// Like send_federation_request() but with a very large timeout - #[tracing::instrument(skip_all, name = "synapse", level = "debug")] + #[inline] pub async fn send_synapse_request( &self, dest: &ServerName, @@ -300,8 +296,10 @@ impl Service { where T: OutgoingRequest + Debug + Send, { - let client = &self.services.client.synapse; - self.send(client, dest, request).await + self.services + .federation + .execute_synapse(dest, request) + .await } /// Sends a request to an appservice diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 122e75c5a..c91e1d311 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -858,7 +858,7 @@ impl Service { }; let client = &self.services.client.sender; - self.send(client, &server, request) + self.services.federation.execute_on(client, &server, request) .await .inspect(|response| { response diff --git a/src/service/services.rs b/src/service/services.rs index 1aa87f58b..cb5cc12f1 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -10,7 +10,7 @@ use database::Database; use tokio::sync::Mutex; use crate::{ - account_data, admin, appservice, client, emergency, globals, key_backups, + account_data, admin, appservice, client, emergency, federation, globals, key_backups, manager::Manager, media, presence, pusher, resolver, rooms, sending, server_keys, service, service::{Args, Map, Service}, @@ -30,6 +30,7 @@ pub struct Services { pub pusher: Arc, pub resolver: Arc, pub rooms: rooms::Service, + pub federation: Arc, pub sending: Arc, pub server_keys: Arc, pub sync: Arc, @@ -95,6 +96,7 @@ impl Services { typing: build!(rooms::typing::Service), user: build!(rooms::user::Service), }, + federation: build!(federation::Service), sending: build!(sending::Service), server_keys: build!(server_keys::Service), sync: build!(sync::Service),