Skip to content

Commit

Permalink
Pin async function evaluation in expressions and reduce trc::Result size
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 6, 2025
1 parent 043b53f commit 9e69a32
Show file tree
Hide file tree
Showing 24 changed files with 242 additions and 143 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ All notable changes to this project will be documented in this file. This projec

## [0.11.0] - 2025-01-06

This version includes breaking changes to the configuration file. Please read [UPGRADING.md](UPGRADING.md) for details.
This version includes breaking changes to the configuration file, please read [UPGRADING.md](UPGRADING.md) for details.
To upgrade replace the `stalwart-mail` binary and then upgrade to the latest web-admin.

### Added
- Spam filter rewritten in Rust for a significant performance improvement.
Expand Down
2 changes: 1 addition & 1 deletion UPGRADING.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Upgrading from `v0.10.x` to `v0.11.0`
------------------------------------

Version `0.11.0` introduces breaking changes to the spam filter configuration. No data migration is required but, if changes were made to the previous spam filter, the configuration of the new spam filter should be reviewed. In particular:
Version `0.11.0` introduces breaking changes to the spam filter configuration. Although no data migration is required, if changes were made to the previous spam filter, the configuration of the new spam filter should be reviewed. In particular:

- `lookup.spam-*` settings are no longer used, these have been replaced by `spam-filter.*` settings. Review the [updated documentation](http://stalw.art/docs/spamfilter/overview).
- Previous `spam-filter` and `track-replies` Sieve scripts cannot be used with the new version. They have been replaced by a built-in spam filter written in Rust.
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/auth/oauth/introspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Server {
}),
Err(err)
if matches!(
err.inner,
err.event_type(),
EventType::Auth(AuthEvent::Error) | EventType::Auth(AuthEvent::TokenExpired)
) =>
{
Expand Down
118 changes: 77 additions & 41 deletions crates/common/src/expr/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ impl Server {
return None;
}

match if_block.eval(resolver, self, session_id).await {
match (EvalContext {
resolver,
core: self,
expr: if_block,
captures: Vec::new(),
session_id,
})
.eval()
.await
{
Ok(result) => {
trc::event!(
Eval(EvalEvent::Result),
Expand Down Expand Up @@ -82,7 +91,16 @@ impl Server {
return None;
}

match expr.eval(resolver, self, &mut Vec::new(), session_id).await {
match (EvalContext {
resolver,
core: self,
expr,
captures: &mut Vec::new(),
session_id,
})
.eval()
.await
{
Ok(result) => {
trc::event!(
Eval(EvalEvent::Result),
Expand Down Expand Up @@ -119,60 +137,71 @@ impl Server {
}
}

impl IfBlock {
pub async fn eval<'x, V: ResolveVariable>(
&'x self,
resolver: &'x V,
core: &Server,
session_id: u64,
) -> trc::Result<Variable<'x>> {
let mut captures = Vec::new();

for if_then in &self.if_then {
if if_then
.expr
.eval(resolver, core, &mut captures, session_id)
.await?
.to_bool()
struct EvalContext<'x, 'y, V: ResolveVariable, T, C> {
resolver: &'x V,
core: &'y Server,
expr: &'x T,
captures: C,
session_id: u64,
}

impl<'x, 'y, V: ResolveVariable> EvalContext<'x, 'y, V, IfBlock, Vec<String>> {
async fn eval(&mut self) -> trc::Result<Variable<'x>> {
for if_then in &self.expr.if_then {
if (EvalContext {
resolver: self.resolver,
core: self.core,
expr: &if_then.expr,
captures: &mut self.captures,
session_id: self.session_id,
})
.eval()
.await?
.to_bool()
{
return if_then
.then
.eval(resolver, core, &mut captures, session_id)
.await;
return (EvalContext {
resolver: self.resolver,
core: self.core,
expr: &if_then.then,
captures: &mut self.captures,
session_id: self.session_id,
})
.eval()
.await;
}
}

self.default
.eval(resolver, core, &mut captures, session_id)
.await
(EvalContext {
resolver: self.resolver,
core: self.core,
expr: &self.expr.default,
captures: &mut self.captures,
session_id: self.session_id,
})
.eval()
.await
}
}

impl Expression {
async fn eval<'x, 'y, V: ResolveVariable>(
&'x self,
resolver: &'x V,
core: &Server,
captures: &'y mut Vec<String>,
session_id: u64,
) -> trc::Result<Variable<'x>> {
impl<'x, 'y, V: ResolveVariable> EvalContext<'x, 'y, V, Expression, &mut Vec<String>> {
async fn eval(&mut self) -> trc::Result<Variable<'x>> {
let mut stack = Vec::new();
let mut exprs = self.items.iter();
let mut exprs = self.expr.items.iter();

while let Some(expr) = exprs.next() {
match expr {
ExpressionItem::Variable(v) => {
stack.push(resolver.resolve_variable(*v));
stack.push(self.resolver.resolve_variable(*v));
}
ExpressionItem::Global(v) => {
stack.push(resolver.resolve_global(v));
stack.push(self.resolver.resolve_global(v));
}
ExpressionItem::Constant(val) => {
stack.push(Variable::from(val));
}
ExpressionItem::Capture(v) => {
stack.push(Variable::String(Cow::Owned(
captures
self.captures
.get(*v as usize)
.map(|v| v.as_str())
.unwrap_or_default()
Expand Down Expand Up @@ -216,8 +245,12 @@ impl Expression {
let result = if let Some((_, fnc, _)) = FUNCTIONS.get(*id as usize) {
(fnc)(arguments)
} else {
core.eval_fnc(*id - FUNCTIONS.len() as u32, arguments, session_id)
.await?
Box::pin(self.core.eval_fnc(
*id - FUNCTIONS.len() as u32,
arguments,
self.session_id,
))
.await?
};

stack.push(result);
Expand Down Expand Up @@ -247,23 +280,26 @@ impl Expression {
stack.push(Variable::Array(items));
}
ExpressionItem::Regex(regex) => {
captures.clear();
self.captures.clear();
let value = stack.pop().unwrap_or_default().into_string();

if let Some(captures_) = regex.captures(value.as_ref()) {
for capture in captures_.iter() {
captures.push(capture.map_or("", |m| m.as_str()).to_string());
self.captures
.push(capture.map_or("", |m| m.as_str()).to_string());
}
}

stack.push(Variable::Integer(!captures.is_empty() as i64));
stack.push(Variable::Integer(!self.captures.is_empty() as i64));
}
}
}

Ok(stack.pop().unwrap_or_default())
}
}

impl Expression {
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/telemetry/tracers/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ fn build_any_value(value: &trc::Value) -> AnyValue {
trc::Value::Event(v) => AnyValue::Map(Box::new(
[(
Key::from_static_str("eventName"),
AnyValue::String(v.inner.name().into()),
AnyValue::String(v.event_type().name().into()),
)]
.into_iter()
.chain(
v.keys
v.keys()
.iter()
.map(|(k, v)| (build_key(k), build_any_value(v))),
)
Expand Down
4 changes: 2 additions & 2 deletions crates/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ tokio = { version = "1.23", features = ["full"] }
jemallocator = "0.5.0"

[features]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"]
#default = ["rocks", "enterprise"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"]
default = ["rocks", "enterprise"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
Expand Down
10 changes: 5 additions & 5 deletions crates/smtp/src/inbound/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<T: SessionStream> Session<T> {
}),
SpanId = self.data.session_id,
Strict = strict,
Result = dkim_output.iter().map(trc::Event::from).collect::<Vec<_>>(),
Result = dkim_output.iter().map(trc::Error::from).collect::<Vec<_>>(),
Elapsed = time.elapsed(),
);

Expand Down Expand Up @@ -194,7 +194,7 @@ impl<T: SessionStream> Session<T> {
}),
SpanId = self.data.session_id,
Strict = strict,
Result = trc::Event::from(arc_output.result()),
Result = trc::Error::from(arc_output.result()),
Elapsed = time.elapsed(),
);

Expand Down Expand Up @@ -295,7 +295,7 @@ impl<T: SessionStream> Session<T> {
Strict = strict,
Domain = dmarc_output.domain().to_string(),
Policy = dmarc_policy.to_string(),
Result = trc::Event::from(&dmarc_result),
Result = trc::Error::from(&dmarc_result),
Elapsed = time.elapsed(),
);

Expand Down Expand Up @@ -417,7 +417,7 @@ impl<T: SessionStream> Session<T> {
set.write_header(&mut headers);
}
Err(err) => {
trc::error!(trc::Event::from(err)
trc::error!(trc::Error::from(err)
.span_id(self.data.session_id)
.details("Failed to ARC seal message"));
}
Expand Down Expand Up @@ -649,7 +649,7 @@ impl<T: SessionStream> Session<T> {
signature.write_header(&mut headers);
}
Err(err) => {
trc::error!(trc::Event::from(err)
trc::error!(trc::Error::from(err)
.span_id(self.data.session_id)
.details("Failed to DKIM sign message"));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/smtp/src/inbound/ehlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<T: SessionStream> Session<T> {
}),
SpanId = self.data.session_id,
Domain = self.data.helo_domain.clone(),
Result = trc::Event::from(&spf_output),
Result = trc::Error::from(&spf_output),
Elapsed = time.elapsed(),
);

Expand Down
4 changes: 2 additions & 2 deletions crates/smtp/src/inbound/mail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<T: SessionStream> Session<T> {
}),
SpanId = self.data.session_id,
Domain = self.data.helo_domain.clone(),
Result = trc::Event::from(&iprev),
Result = trc::Error::from(&iprev),
Elapsed = time.elapsed(),
);

Expand Down Expand Up @@ -482,7 +482,7 @@ impl<T: SessionStream> Session<T> {
"<>"
}
.to_string(),
Result = trc::Event::from(&spf_output),
Result = trc::Error::from(&spf_output),
Elapsed = time.elapsed(),
);

Expand Down
8 changes: 4 additions & 4 deletions crates/smtp/src/outbound/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ impl DeliveryAttempt {
TlsRpt(TlsRptEvent::RecordFetchError),
SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err),
CausedBy = trc::Error::from(err),
Elapsed = time.elapsed(),
);
None
Expand Down Expand Up @@ -466,7 +466,7 @@ impl DeliveryAttempt {
MtaSts(MtaStsEvent::PolicyFetchError),
SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err.clone()),
CausedBy = trc::Error::from(err.clone()),
Strict = strict,
Elapsed = time.elapsed(),
);
Expand Down Expand Up @@ -532,7 +532,7 @@ impl DeliveryAttempt {
Delivery(DeliveryEvent::MxLookupFailed),
SpanId = message.span_id,
Domain = domain.domain.clone(),
CausedBy = trc::Event::from(err.clone()),
CausedBy = trc::Error::from(err.clone()),
Elapsed = time.elapsed(),
);

Expand Down Expand Up @@ -820,7 +820,7 @@ impl DeliveryAttempt {
SpanId = message.span_id,
Domain = domain.domain.clone(),
Hostname = envelope.mx.to_string(),
CausedBy = trc::Event::from(err.clone()),
CausedBy = trc::Error::from(err.clone()),
Strict = strict,
Elapsed = time.elapsed(),
);
Expand Down
2 changes: 1 addition & 1 deletion crates/smtp/src/reporting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl SmtpReporting for Server {
signature.write_header(&mut headers);
}
Err(err) => {
trc::error!(trc::Event::from(err)
trc::error!(trc::Error::from(err)
.span_id(message.span_id)
.details("Failed to sign message")
.caused_by(trc::location!()));
Expand Down
2 changes: 1 addition & 1 deletion crates/smtp/src/scripts/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl RunScript for Server {
signature.write_header(&mut headers);
}
Err(err) => {
trc::error!(trc::Event::from(err)
trc::error!(trc::Error::from(err)
.span_id(session_id)
.caused_by(trc::location!())
.details("DKIM sign failed"));
Expand Down
2 changes: 1 addition & 1 deletion crates/trc/event-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub fn event(input: TokenStream) -> TokenStream {
(trc::Key::#key, trc::Value::from(#value))
}
});
// This avoid having to evaluate expensive values when we know we are not interested in the event
// This avoids having to evaluate expensive values when we know we are not interested in the event
let key_value_metric_tokens = key_values.iter().filter_map(|(key, value)| {
if key.is_metric_key() {
Some(quote! {
Expand Down
Loading

0 comments on commit 9e69a32

Please sign in to comment.