Skip to content

Commit

Permalink
split federation request from sending service
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Jan 26, 2025
1 parent e094169 commit 7456ea6
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 84 deletions.
6 changes: 2 additions & 4 deletions src/admin/debug/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,7 @@ pub(super) async fn get_signing_keys(
notary: Option<Box<ServerName>>,
query: bool,
) -> Result<RoomMessageEventContent> {
let server_name =
server_name.unwrap_or_else(|| self.services.server.name.clone().into());
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());

if let Some(notary) = notary {
let signing_keys = self
Expand Down Expand Up @@ -793,8 +792,7 @@ pub(super) async fn get_verify_keys(
&self,
server_name: Option<Box<ServerName>>,
) -> Result<RoomMessageEventContent> {
let server_name =
server_name.unwrap_or_else(|| self.services.server.name.clone().into());
let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone().into());

let keys = self
.services
Expand Down
157 changes: 92 additions & 65 deletions src/service/sending/send.rs → src/service/federation/execute.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::mem;
use std::{fmt::Debug, mem};

use bytes::Bytes;
use conduwuit::{
Expand All @@ -20,82 +20,109 @@ use ruma::{

use crate::resolver::actual::ActualDest;

impl super::Service {
#[tracing::instrument(
/// Sends a request to a federation server
#[implement(super::Service)]
#[tracing::instrument(skip_all, name = "request", level = "debug")]
pub async fn execute<T>(&self, dest: &ServerName, request: T) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.federation;
self.execute_on(client, dest, request).await
}

/// Like execute() but with a very large timeout
#[implement(super::Service)]
#[tracing::instrument(skip_all, name = "synapse", level = "debug")]
pub async fn execute_synapse<T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.synapse;
self.execute_on(client, dest, request).await
}

#[implement(super::Service)]
#[tracing::instrument(
level = "debug"
skip(self, client, request),
)]
pub async fn send<T>(
&self,
client: &Client,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
if !self.server.config.allow_federation {
return Err!(Config("allow_federation", "Federation is disabled."));
}

if self
.server
.config
.forbidden_remote_server_names
.contains(dest)
{
return Err!(Request(Forbidden(debug_warn!(
"Federation with {dest} is not allowed."
))));
}

let actual = self.services.resolver.get_actual_dest(dest).await?;
let request = into_http_request::<T>(&actual, request)?;
let request = self.prepare(dest, request)?;
self.execute::<T>(dest, &actual, request, client).await
pub async fn execute_on<T>(
&self,
client: &Client,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
if !self.services.server.config.allow_federation {
return Err!(Config("allow_federation", "Federation is disabled."));
}

async fn execute<T>(
&self,
dest: &ServerName,
actual: &ActualDest,
request: Request,
client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
if self
.services
.server
.config
.forbidden_remote_server_names
.contains(dest)
{
let url = request.url().clone();
let method = request.method().clone();

debug!(?method, ?url, "Sending request");
match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
}
return Err!(Request(Forbidden(debug_warn!("Federation with {dest} is not allowed."))));
}

fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
self.sign_request(&mut request, dest);

let request = Request::try_from(request)?;
self.validate_url(request.url())?;
self.server.check_running()?;
let actual = self.services.resolver.get_actual_dest(dest).await?;
let request = into_http_request::<T>(&actual, request)?;
let request = self.prepare(dest, request)?;
self.perform::<T>(dest, &actual, request, client).await
}

Ok(request)
#[implement(super::Service)]
async fn perform<T>(
&self,
dest: &ServerName,
actual: &ActualDest,
request: Request,
client: &Client,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Send,
{
let url = request.url().clone();
let method = request.method().clone();

debug!(?method, ?url, "Sending request");
match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await,
| Err(error) =>
Err(handle_error(actual, &method, &url, error).expect_err("always returns error")),
}
}

fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {
if let Ok(ip) = IPAddress::parse(url_host) {
trace!("Checking request URL IP {ip:?}");
self.services.resolver.validate_ip(&ip)?;
}
}
#[implement(super::Service)]
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> {
self.sign_request(&mut request, dest);

let request = Request::try_from(request)?;
self.validate_url(request.url())?;
self.services.server.check_running()?;

Ok(())
Ok(request)
}

#[implement(super::Service)]
fn validate_url(&self, url: &Url) -> Result<()> {
if let Some(url_host) = url.host_str() {
if let Ok(ip) = IPAddress::parse(url_host) {
trace!("Checking request URL IP {ip:?}");
self.services.resolver.validate_ip(&ip)?;
}
}

Ok(())
}

async fn handle_response<T>(
Expand Down Expand Up @@ -195,7 +222,7 @@ fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerN
type Value = CanonicalJsonValue;
type Object = CanonicalJsonObject;

let origin = self.services.globals.server_name();
let origin = &self.services.server.name;
let body = http_request.body();
let uri = http_request
.uri()
Expand Down
33 changes: 33 additions & 0 deletions src/service/federation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
mod execute;

use std::sync::Arc;

use conduwuit::{Result, Server};

use crate::{client, resolver, server_keys, Dep};

pub struct Service {
services: Services,
}

struct Services {
server: Arc<Server>,
client: Dep<client::Service>,
resolver: Dep<resolver::Service>,
server_keys: Dep<server_keys::Service>,
}

impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
services: Services {
server: args.server.clone(),
client: args.depend::<client::Service>("client"),
resolver: args.depend::<resolver::Service>("resolver"),
server_keys: args.depend::<server_keys::Service>("server_keys"),
},
}))
}

fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
1 change: 1 addition & 0 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod admin;
pub mod appservice;
pub mod client;
pub mod emergency;
pub mod federation;
pub mod globals;
pub mod key_backups;
pub mod media;
Expand Down
24 changes: 11 additions & 13 deletions src/service/sending/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod appservice;
mod data;
mod dest;
mod send;
mod sender;

use std::{
Expand Down Expand Up @@ -30,8 +29,8 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{
account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId,
server_keys, users, Dep,
account_data, client, federation, globals, presence, pusher, rooms,
rooms::timeline::RawPduId, users, Dep,
};

pub struct Service {
Expand All @@ -44,7 +43,6 @@ pub struct Service {
struct Services {
client: Dep<client::Service>,
globals: Dep<globals::Service>,
resolver: Dep<resolver::Service>,
state: Dep<rooms::state::Service>,
state_cache: Dep<rooms::state_cache::Service>,
user: Dep<rooms::user::Service>,
Expand All @@ -55,7 +53,7 @@ struct Services {
account_data: Dep<account_data::Service>,
appservice: Dep<crate::appservice::Service>,
pusher: Dep<pusher::Service>,
server_keys: Dep<server_keys::Service>,
federation: Dep<federation::Service>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -83,7 +81,6 @@ impl crate::Service for Service {
services: Services {
client: args.depend::<client::Service>("client"),
globals: args.depend::<globals::Service>("globals"),
resolver: args.depend::<resolver::Service>("resolver"),
state: args.depend::<rooms::state::Service>("rooms::state"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
user: args.depend::<rooms::user::Service>("rooms::user"),
Expand All @@ -94,7 +91,7 @@ impl crate::Service for Service {
account_data: args.depend::<account_data::Service>("account_data"),
appservice: args.depend::<crate::appservice::Service>("appservice"),
pusher: args.depend::<pusher::Service>("pusher"),
server_keys: args.depend::<server_keys::Service>("server_keys"),
federation: args.depend::<federation::Service>("federation"),
},
channels: (0..num_senders).map(|_| loole::unbounded()).collect(),
}))
Expand Down Expand Up @@ -277,7 +274,7 @@ impl Service {
}

/// Sends a request to a federation server
#[tracing::instrument(skip_all, name = "request", level = "debug")]
#[inline]
pub async fn send_federation_request<T>(
&self,
dest: &ServerName,
Expand All @@ -286,12 +283,11 @@ impl Service {
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.federation;
self.send(client, dest, request).await
self.services.federation.execute(dest, request).await
}

/// Like send_federation_request() but with a very large timeout
#[tracing::instrument(skip_all, name = "synapse", level = "debug")]
#[inline]
pub async fn send_synapse_request<T>(
&self,
dest: &ServerName,
Expand All @@ -300,8 +296,10 @@ impl Service {
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.synapse;
self.send(client, dest, request).await
self.services
.federation
.execute_synapse(dest, request)
.await
}

/// Sends a request to an appservice
Expand Down
2 changes: 1 addition & 1 deletion src/service/sending/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ impl Service {
};

let client = &self.services.client.sender;
self.send(client, &server, request)
self.services.federation.execute_on(client, &server, request)
.await
.inspect(|response| {
response
Expand Down
4 changes: 3 additions & 1 deletion src/service/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use database::Database;
use tokio::sync::Mutex;

use crate::{
account_data, admin, appservice, client, emergency, globals, key_backups,
account_data, admin, appservice, client, emergency, federation, globals, key_backups,
manager::Manager,
media, presence, pusher, resolver, rooms, sending, server_keys, service,
service::{Args, Map, Service},
Expand All @@ -30,6 +30,7 @@ pub struct Services {
pub pusher: Arc<pusher::Service>,
pub resolver: Arc<resolver::Service>,
pub rooms: rooms::Service,
pub federation: Arc<federation::Service>,
pub sending: Arc<sending::Service>,
pub server_keys: Arc<server_keys::Service>,
pub sync: Arc<sync::Service>,
Expand Down Expand Up @@ -95,6 +96,7 @@ impl Services {
typing: build!(rooms::typing::Service),
user: build!(rooms::user::Service),
},
federation: build!(federation::Service),
sending: build!(sending::Service),
server_keys: build!(server_keys::Service),
sync: build!(sync::Service),
Expand Down

0 comments on commit 7456ea6

Please sign in to comment.