Skip to content

Commit

Permalink
Bayes filtering per account (#819) + Delivered-To header (#916)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Dec 24, 2024
1 parent 2cfe467 commit 6520917
Show file tree
Hide file tree
Showing 58 changed files with 1,841 additions and 588 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
resolver = "2"
members = [
"crates/main",
# "crates/jmap",
# "crates/jmap-proto",
# "crates/imap",
# "crates/imap-proto",
# "crates/smtp",
# "crates/managesieve",
# "crates/pop3",
# "crates/spam-filter",
"crates/jmap",
"crates/jmap-proto",
"crates/imap",
"crates/imap-proto",
"crates/smtp",
"crates/managesieve",
"crates/pop3",
"crates/spam-filter",
"crates/nlp",
"crates/store",
"crates/directory",
Expand Down
13 changes: 12 additions & 1 deletion crates/common/src/config/smtp/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub enum AddressMapping {
#[derive(Clone)]
pub struct Data {
pub script: IfBlock,
pub spam_filter: IfBlock,

// Limits
pub max_messages: IfBlock,
Expand All @@ -143,6 +144,7 @@ pub struct Data {
pub add_auth_results: IfBlock,
pub add_message_id: IfBlock,
pub add_date: IfBlock,
pub add_delivered_to: bool,
}

#[derive(Clone)]
Expand Down Expand Up @@ -411,6 +413,11 @@ impl SessionConfig {
"session.data.limits.received-headers",
&has_rcpt_vars,
),
(
&mut session.data.spam_filter,
"session.data.spam-filter",
&has_rcpt_vars,
),
(
&mut session.data.add_received,
"session.data.add-headers.received",
Expand Down Expand Up @@ -446,7 +453,9 @@ impl SessionConfig {
*value = if_block;
}
}

session.data.add_delivered_to = config
.property_or_default("session.data.add-headers.delivered-to", "true")
.unwrap_or(true);
session
}
}
Expand Down Expand Up @@ -773,6 +782,7 @@ impl Default for SessionConfig {
},
data: Data {
script: IfBlock::empty("session.data.script"),
spam_filter: IfBlock::new::<()>("session.data.spam-filter", [], "true"),
max_messages: IfBlock::new::<()>("session.data.limits.messages", [], "10"),
max_message_size: IfBlock::new::<()>("session.data.limits.size", [], "104857600"),
max_received_headers: IfBlock::new::<()>(
Expand Down Expand Up @@ -810,6 +820,7 @@ impl Default for SessionConfig {
[("local_port == 25", "true")],
"false",
),
add_delivered_to: false,
},
extensions: Extensions {
pipelining: IfBlock::new::<()>("session.extensions.pipelining", [], "true"),
Expand Down
17 changes: 14 additions & 3 deletions crates/common/src/config/spamfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct SpamFilterConfig {
pub struct SpamFilterHeaderConfig {
pub status: Option<String>,
pub result: Option<String>,
pub bayes_result: Option<String>,
pub llm: Option<String>,
}

Expand Down Expand Up @@ -81,7 +82,9 @@ pub struct BayesConfig {
pub auto_learn_ham_threshold: f64,
pub score_spam: f64,
pub score_ham: f64,
pub enabled_account: bool,
pub account_score_spam: f64,
pub account_score_ham: f64,
pub account_classify: bool,
}

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -319,6 +322,7 @@ impl SpamFilterHeaderConfig {
("status", &mut header.status),
("result", &mut header.result),
("llm", &mut header.llm),
("bayes", &mut header.bayes_result),
] {
if config
.property_or_default(("spam-filter.header", typ, "enable"), "true")
Expand Down Expand Up @@ -549,9 +553,15 @@ impl BayesConfig {
score_ham: config
.property_or_default("spam-filter.bayes.score.ham", "0.5")
.unwrap_or(0.5),
enabled_account: config
.property_or_default("spam-filter.bayes.enable-account", "false")
account_classify: config
.property_or_default("spam-filter.bayes.account.enable", "false")
.unwrap_or(false),
account_score_spam: config
.property_or_default("spam-filter.bayes.account.score.spam", "0.7")
.unwrap_or(0.7),
account_score_ham: config
.property_or_default("spam-filter.bayes.account.score.ham", "0.5")
.unwrap_or(0.5),
}
.into()
}
Expand Down Expand Up @@ -633,6 +643,7 @@ impl Default for SpamFilterHeaderConfig {
SpamFilterHeaderConfig {
status: "X-Spam-Status".to_string().into(),
result: "X-Spam-Result".to_string().into(),
bayes_result: "X-Spam-Bayes".to_string().into(),
llm: "X-Spam-LLM".to_string().into(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub const KV_TRUSTED_REPLY: u8 = 19;
pub const KV_LOCK_PURGE_ACCOUNT: u8 = 20;
pub const KV_LOCK_QUEUE_MESSAGE: u8 = 21;
pub const KV_LOCK_QUEUE_REPORT: u8 = 22;
pub const KV_LOCK_FTS: u8 = 23;
pub const KV_LOCK_EMAIL_TASK: u8 = 23;
pub const KV_LOCK_HOUSEKEEPER: u8 = 24;

#[derive(Clone)]
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/manager/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use store::{
roaring::RoaringBitmap,
write::{
key::DeserializeBigEndian, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass,
FtsQueueClass, LookupClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue,
LookupClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue, TaskQueueClass,
ValueClass,
},
BlobStore, Serialize, Store, U32_LEN,
Expand Down Expand Up @@ -145,7 +145,7 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
if account_id != u32::MAX && document_id != u32::MAX {
if reader.version == 1 && collection == email_collection {
batch.set(
ValueClass::FtsQueue(FtsQueueClass {
ValueClass::TaskQueue(TaskQueueClass::IndexEmail {
seq,
hash: hash.clone(),
}),
Expand Down
7 changes: 4 additions & 3 deletions crates/directory/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ impl Permission {
Permission::Undelete => "Restore deleted items",
Permission::DkimSignatureCreate => "Create DKIM signatures for email authentication",
Permission::DkimSignatureGet => "Retrieve DKIM signature information",
Permission::UpdateSpamFilter => "Modify spam filter settings",
Permission::UpdateWebadmin => "Modify web admin interface settings",
Permission::SpamFilterUpdate => "Modify spam filter settings",
Permission::WebadminUpdate => "Modify web admin interface settings",
Permission::LogsView => "Access system logs",
Permission::SieveRun => "Execute Sieve scripts from the REST API",
Permission::SpamFilterTrain => "Train the spam filter",
Permission::SpamFilterClassify => "Classify emails with the spam filter",
Permission::Restart => "Restart the email server",
Permission::TracingList => "View stored traces",
Permission::TracingGet => "Retrieve specific trace information",
Expand Down
2 changes: 2 additions & 0 deletions crates/directory/src/core/principal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,8 @@ impl Permission {
| Permission::SieveRenameScript
| Permission::SieveCheckScript
| Permission::SieveHaveSpace
| Permission::SpamFilterClassify
| Permission::SpamFilterTrain
)
}

Expand Down
10 changes: 6 additions & 4 deletions crates/directory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ pub enum Permission {
Undelete,
DkimSignatureCreate,
DkimSignatureGet,
UpdateSpamFilter,
UpdateWebadmin,
SpamFilterUpdate,
WebadminUpdate,
LogsView,
SieveRun,
SpamFilterTrain,
Restart,
TracingList,
TracingGet,
Expand Down Expand Up @@ -264,7 +264,9 @@ pub enum Permission {
OauthClientOverride,

AiModelInteract,
Troubleshoot, // WARNING: add new ids at the end (TODO: use static ids)
Troubleshoot,
SpamFilterClassify,
// WARNING: add new ids at the end (TODO: use static ids)
}

pub type Permissions = Bitset<{ Permission::COUNT.div_ceil(std::mem::size_of::<usize>()) }>;
Expand Down
15 changes: 10 additions & 5 deletions crates/imap/src/op/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use crate::{
};
use common::{listener::SessionStream, MailboxId};
use jmap::{
email::ingest::{EmailIngest, IngestEmail, IngestSource},
email::{
bayes::EmailBayesTrain,
ingest::{EmailIngest, IngestEmail, IngestSource},
},
services::state::StateManager,
};
use jmap_proto::types::{acl::Acl, keyword::Keyword, state::StateChange, type_state::DataType};
Expand Down Expand Up @@ -91,12 +94,13 @@ impl<T: SessionStream> SessionData<T> {
}

// Obtain quota
let resource_token = self
let access_token = self
.server
.get_cached_access_token(mailbox.account_id)
.await
.imap_ctx(&arguments.tag, trc::location!())?
.as_resource_token();
.imap_ctx(&arguments.tag, trc::location!())?;
let resource_token = access_token.as_resource_token();
let spam_train = self.server.email_bayes_can_train(&access_token);

// Append messages
let mut response = StatusResponse::completed(Command::Append);
Expand All @@ -113,7 +117,8 @@ impl<T: SessionStream> SessionData<T> {
keywords: message.flags.into_iter().map(Keyword::from).collect(),
received_at: message.received_at.map(|d| d as u64),
source: IngestSource::Imap,
encrypt: self.server.core.jmap.encrypt && self.server.core.jmap.encrypt_append,
spam_classify: false,
spam_train,
session_id: self.session_id,
})
.await
Expand Down
62 changes: 51 additions & 11 deletions crates/imap/src/op/copy_move.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::{
use common::{listener::SessionStream, MailboxId};
use jmap::{
changes::write::ChangeLog,
email::{copy::EmailCopy, ingest::EmailIngest, set::TagManager},
mailbox::UidMailbox,
services::state::StateManager,
email::{bayes::EmailBayesTrain, copy::EmailCopy, ingest::EmailIngest, set::TagManager},
mailbox::{UidMailbox, JUNK_ID},
services::{index::Indexer, state::StateManager},
JmapMethods,
};
use jmap_proto::{
Expand All @@ -33,7 +33,7 @@ use jmap_proto::{
};
use store::{
roaring::RoaringBitmap,
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE},
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, ValueClass, F_VALUE},
};

use super::ImapContext;
Expand Down Expand Up @@ -171,10 +171,19 @@ impl<T: SessionStream> SessionData<T> {
let mut changelog = ChangeLogBuilder::new();
let mut did_move = false;
let mut copied_ids = Vec::with_capacity(ids.len());
let access_token = self
.server
.get_cached_access_token(dest_mailbox.account_id)
.await
.imap_ctx(&arguments.tag, trc::location!())?;

if src_mailbox.id.account_id == dest_mailbox.account_id {
// Mailboxes are in the same account
let account_id = src_mailbox.id.account_id;
let dest_mailbox_id = UidMailbox::new_unassigned(dest_mailbox_id);
let can_spam_train = self.server.email_bayes_can_train(&access_token);
let mut has_spam_train_tasks = false;

for (id, imap_id) in ids {
// Obtain mailbox tags
let (mut mailboxes, thread_id) = if let Some(result) = self
Expand Down Expand Up @@ -216,7 +225,7 @@ impl<T: SessionStream> SessionData<T> {
}
}

// Write changes
// Perepare write batch
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
Expand All @@ -231,28 +240,59 @@ impl<T: SessionStream> SessionData<T> {
.imap_ctx(&arguments.tag, trc::location!())?;
}
batch.value(Property::Cid, changelog.change_id, F_VALUE);

// Add bayes train task
if can_spam_train {
if dest_mailbox_id.mailbox_id == JUNK_ID {
batch.set(
ValueClass::TaskQueue(
self.server
.email_bayes_queue_task_build(account_id, id, true)
.await
.imap_ctx(&arguments.tag, trc::location!())?,
),
vec![],
);
has_spam_train_tasks = true;
} else if src_mailbox.id.mailbox_id == JUNK_ID {
batch.set(
ValueClass::TaskQueue(
self.server
.email_bayes_queue_task_build(account_id, id, false)
.await
.imap_ctx(&arguments.tag, trc::location!())?,
),
vec![],
);
has_spam_train_tasks = true;
}
}

// Write changes
self.server
.write_batch(batch)
.await
.imap_ctx(&arguments.tag, trc::location!())?;

// Update changelog
changelog.log_update(Collection::Email, Id::from_parts(thread_id, id));
changelog.log_child_update(Collection::Mailbox, dest_mailbox_id.mailbox_id);
if is_move {
changelog.log_child_update(Collection::Mailbox, src_mailbox.id.mailbox_id);
did_move = true;
}
}

// Trigger Bayes training
if has_spam_train_tasks {
self.server.notify_task_queue();
}
} else {
// Obtain quota for target account
let src_account_id = src_mailbox.id.account_id;
let mut dest_change_id = None;
let dest_account_id = dest_mailbox.account_id;
let resource_token = self
.server
.get_cached_access_token(dest_account_id)
.await
.imap_ctx(&arguments.tag, trc::location!())?
.as_resource_token();
let resource_token = access_token.as_resource_token();
let mut destroy_ids = RoaringBitmap::new();
for (id, imap_id) in ids {
match self
Expand Down
Loading

0 comments on commit 6520917

Please sign in to comment.