From 5cc0a518fed96866c90ccacca57d84912e448626 Mon Sep 17 00:00:00 2001 From: Adam Cooke Date: Tue, 12 Mar 2024 09:01:51 +0000 Subject: [PATCH] feat: automatically remove queued messages with stale locks --- app/controllers/servers_controller.rb | 2 +- app/lib/worker/process.rb | 3 +- app/models/concerns/has_message.rb | 6 +- app/models/queued_message.rb | 11 +- .../tidy_queued_messages_task.rb | 18 ++ app/views/messages/_list.html.haml | 64 ++++-- doc/config/environment-variables.md | 1 + doc/config/yaml.yml | 2 + lib/postal/config_schema.rb | 5 + spec/factories/ip_pool_factory.rb | 6 + spec/models/queued_message_spec.rb | 197 ++++++++++++++++++ .../tidy_queued_messages_task_spec.rb | 30 +++ 12 files changed, 320 insertions(+), 25 deletions(-) create mode 100644 app/scheduled_tasks/tidy_queued_messages_task.rb create mode 100644 spec/models/queued_message_spec.rb create mode 100644 spec/scheduled_tasks/tidy_queued_messages_task_spec.rb diff --git a/app/controllers/servers_controller.rb b/app/controllers/servers_controller.rb index 1058b53b0..c5b40ee5e 100644 --- a/app/controllers/servers_controller.rb +++ b/app/controllers/servers_controller.rb @@ -73,7 +73,7 @@ def destroy end def queue - @messages = @server.queued_messages.order(id: :desc).page(params[:page]) + @messages = @server.queued_messages.order(id: :desc).page(params[:page]).includes(:ip_address) @messages_with_message = @messages.include_message end diff --git a/app/lib/worker/process.rb b/app/lib/worker/process.rb index 11167e5b5..5a5d0a343 100644 --- a/app/lib/worker/process.rb +++ b/app/lib/worker/process.rb @@ -40,7 +40,8 @@ class Process ProcessMessageRetentionScheduledTask, PruneSuppressionListsScheduledTask, PruneWebhookRequestsScheduledTask, - SendNotificationsScheduledTask + SendNotificationsScheduledTask, + TidyQueuedMessagesTask ].freeze # @param [Integer] thread_count The number of worker threads to run in this process diff --git a/app/models/concerns/has_message.rb b/app/models/concerns/has_message.rb index 99d4897f0..89b15d8e6 100644 --- a/app/models/concerns/has_message.rb +++ b/app/models/concerns/has_message.rb @@ -7,7 +7,11 @@ def self.included(base) end def message - @message ||= server.message_db.message(message_id) + return @message if instance_variable_defined?("@message") + + @message = server.message_db.message(message_id) + rescue Postal::MessageDB::Message::NotFound + @message = nil end def message=(message) diff --git a/app/models/queued_message.rb b/app/models/queued_message.rb index 69988de2a..11e9da014 100644 --- a/app/models/queued_message.rb +++ b/app/models/queued_message.rb @@ -33,14 +33,14 @@ class QueuedMessage < ApplicationRecord belongs_to :server belongs_to :ip_address, optional: true - belongs_to :user, optional: true before_create :allocate_ip_address scope :ready_with_delayed_retry, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) } + scope :with_stale_lock, -> { where("locked_at IS NOT NULL AND locked_at < ?", Postal::Config.postal.queued_message_lock_stale_days.days.ago) } def retry_now - update(retry_after: nil) + update!(retry_after: nil) end def send_bounce @@ -50,9 +50,14 @@ def send_bounce end def allocate_ip_address - return unless Postal.ip_pools? && message && pool = server.ip_pool_for_message(message) + return unless Postal.ip_pools? + + pool = server.ip_pool_for_message(message) + return if pool.nil? self.ip_address = pool.ip_addresses.select_by_priority + rescue Postal::MessageDB::Message::NotFound + nil end def batchable_messages(limit = 10) diff --git a/app/scheduled_tasks/tidy_queued_messages_task.rb b/app/scheduled_tasks/tidy_queued_messages_task.rb new file mode 100644 index 000000000..66b356f6f --- /dev/null +++ b/app/scheduled_tasks/tidy_queued_messages_task.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class TidyQueuedMessagesTask < ApplicationScheduledTask + + def call + QueuedMessage.with_stale_lock.in_batches do |messages| + messages.each do |message| + logger.info "removing queued message #{message.id} (locked at #{message.locked_at} by #{message.locked_by})" + message.destroy + end + end + end + + def self.next_run_after + quarter_to_each_hour + end + +end diff --git a/app/views/messages/_list.html.haml b/app/views/messages/_list.html.haml index 4d1a4ad76..3d7e08994 100644 --- a/app/views/messages/_list.html.haml +++ b/app/views/messages/_list.html.haml @@ -1,24 +1,50 @@ %ul.messageList - for message in messages + - if message.is_a?(QueuedMessage) + - queued_message = message - message = message.message - %li.messageList__message - = link_to organization_server_message_path(organization, @server, message.id), :class => 'messageList__link' do - .messageList__details{:class => 'messageList__details--' + message.scope} - %p.messageList__subject= message.subject || "No subject" - %dl.messageList__addresses - %dt To - %dd - - if message.rcpt_to_return_path? - %span.returnPathTag Return Path - - else - = message.rcpt_to || "none" - %dt From - %dd= message.mail_from || "none" + + + - if message.nil? && queued_message + %li.messageList__message + .messageList__link + .messageList__details + %p.messageList__subject Deleted message ##{queued_message.message_id} + %dl.messageList__addresses + %dt Domain + %dd= queued_message.domain + %dt Locked + %dd= queued_message.locked? ? "Yes" : "No" + .messageList__meta + %p.messageList__timestamp= queued_message.created_at.in_time_zone.to_fs(:long) + %p.messageList__status + %span.label{:class => "label--messageStatus-deleted"} Deleted - .messageList__meta - %p.messageList__timestamp= message.timestamp.in_time_zone.to_fs(:long) - %p.messageList__status - - if message.read? - %span.label.label--purple Opened - %span.label{:class => "label--messageStatus-#{message.status.underscore}"}= message.status.underscore.humanize + + - else + %li.messageList__message + = link_to organization_server_message_path(organization, @server, message.id), :class => 'messageList__link' do + .messageList__details{:class => 'messageList__details--' + message.scope} + %p.messageList__subject= message.subject || "No subject" + %dl.messageList__addresses + %dt To + %dd + - if message.rcpt_to_return_path? + %span.returnPathTag Return Path + - else + = message.rcpt_to || "none" + %dt From + %dd= message.mail_from || "none" + - if queued_message + %dt Attempts + %dd= queued_message.attempts + %dt Retry after + %dd= queued_message.retry_after&.to_fs(:short) || "ASAP" + + .messageList__meta + %p.messageList__timestamp= message.timestamp.in_time_zone.to_fs(:long) + %p.messageList__status + - if message.read? + %span.label.label--purple Opened + %span.label{:class => "label--messageStatus-#{message.status.underscore}"}= message.status.underscore.humanize diff --git a/doc/config/environment-variables.md b/doc/config/environment-variables.md index 054822c84..37b9a4e01 100644 --- a/doc/config/environment-variables.md +++ b/doc/config/environment-variables.md @@ -18,6 +18,7 @@ This document contains all the environment variables which are available for thi | `POSTAL_SIGNING_KEY_PATH` | String | Path to the private key used for signing | $config-file-root/signing.key | | `POSTAL_SMTP_RELAYS` | Array of strings | An array of SMTP relays in the format of smtp://host:port | [] | | `POSTAL_TRUSTED_PROXIES` | Array of strings | An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) | [] | +| `POSTAL_QUEUED_MESSAGE_LOCK_STALE_DAYS` | Integer | The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried. | 1 | | `WEB_SERVER_DEFAULT_PORT` | Integer | The default port the web server should listen on unless overriden by the PORT environment variable | 5000 | | `WEB_SERVER_DEFAULT_BIND_ADDRESS` | String | The default bind address the web server should listen on unless overriden by the BIND_ADDRESS environment variable | 127.0.0.1 | | `WEB_SERVER_MAX_THREADS` | Integer | The maximum number of threads which can be used by the web server | 5 | diff --git a/doc/config/yaml.yml b/doc/config/yaml.yml index 23edd0e5c..b9d91d783 100644 --- a/doc/config/yaml.yml +++ b/doc/config/yaml.yml @@ -29,6 +29,8 @@ postal: smtp_relays: [] # An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) trusted_proxies: [] + # The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried. + queued_message_lock_stale_days: 1 web_server: # The default port the web server should listen on unless overriden by the PORT environment variable diff --git a/lib/postal/config_schema.rb b/lib/postal/config_schema.rb index 45a550865..8e9976273 100644 --- a/lib/postal/config_schema.rb +++ b/lib/postal/config_schema.rb @@ -91,6 +91,11 @@ module Postal description "An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)" transform { |ip| IPAddr.new(ip) } end + + integer :queued_message_lock_stale_days do + description "The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried." + default 1 + end end group :web_server do diff --git a/spec/factories/ip_pool_factory.rb b/spec/factories/ip_pool_factory.rb index 2870204e4..d0de39fd1 100644 --- a/spec/factories/ip_pool_factory.rb +++ b/spec/factories/ip_pool_factory.rb @@ -19,5 +19,11 @@ factory :ip_pool do name { "Default Pool" } default { true } + + trait :with_ip_address do + after(:create) do |ip_pool| + ip_pool.ip_addresses << create(:ip_address, ip_pool: ip_pool) + end + end end end diff --git a/spec/models/queued_message_spec.rb b/spec/models/queued_message_spec.rb new file mode 100644 index 000000000..88c77e5e4 --- /dev/null +++ b/spec/models/queued_message_spec.rb @@ -0,0 +1,197 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe QueuedMessage do + subject(:queued_message) { build(:queued_message) } + + describe "relationships" do + it { is_expected.to belong_to(:server) } + it { is_expected.to belong_to(:ip_address).optional } + end + + describe ".ready_with_delayed_retry" do + it "returns messages where retry after is null" do + message = create(:queued_message, retry_after: nil) + expect(described_class.ready_with_delayed_retry).to eq [message] + end + + it "returns messages where retry after is less than 30 seconds from now" do + Timecop.freeze do + message1 = create(:queued_message, retry_after: 45.seconds.ago) + message2 = create(:queued_message, retry_after: 5.minutes.ago) + create(:queued_message, retry_after: Time.now) + create(:queued_message, retry_after: 1.minute.from_now) + expect(described_class.ready_with_delayed_retry).to eq [message1, message2] + end + end + end + + describe ".with_stale_lock" do + it "returns messages where lock time is less than the configured number of stale days" do + allow(Postal::Config.postal).to receive(:queued_message_lock_stale_days).and_return(2) + message1 = create(:queued_message, locked_at: 3.days.ago, locked_by: "test") + message2 = create(:queued_message, locked_at: 2.days.ago, locked_by: "test") + create(:queued_message, locked_at: 1.days.ago, locked_by: "test") + create(:queued_message) + expect(described_class.with_stale_lock).to eq [message1, message2] + end + end + + describe "#retry_now" do + it "removes the retry time" do + message = create(:queued_message, retry_after: 2.minutes.from_now) + expect { message.retry_now }.to change { message.reload.retry_after }.from(kind_of(Time)).to(nil) + end + + it "raises an error if invalid" do + message = create(:queued_message, retry_after: 2.minutes.from_now) + message.update_columns(server_id: nil) # unlikely to actually happen + expect { message.retry_now }.to raise_error(ActiveRecord::RecordInvalid) + end + end + + describe "#send_bounce" do + let(:server) { create(:server) } + let(:message) { MessageFactory.incoming(server) } + + subject(:queued_message) { create(:queued_message, message: message) } + + context "when the message is eligiable for bounces" do + it "queues a bounce message for sending" do + expect(BounceMessage).to receive(:new).with(server, kind_of(Postal::MessageDB::Message)).and_wrap_original do |original, *args| + bounce = original.call(*args) + expect(bounce).to receive(:queue) + bounce + end + queued_message.send_bounce + end + end + + context "when the message is not eligible for bounces" do + it "returns nil" do + message.update(bounce: true) + expect(queued_message.send_bounce).to be nil + end + + it "does not queue a bounce message for sending" do + message.update(bounce: true) + expect(BounceMessage).not_to receive(:new) + queued_message.send_bounce + end + end + end + + describe "#allocate_ip_address" do + subject(:queued_message) { create(:queued_message) } + + context "when ip pools is disabled" do + it "returns nil" do + expect(queued_message.allocate_ip_address).to be nil + end + + it "does not allocate an IP address" do + expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address) + end + end + + context "when IP pools is enabled" do + before do + allow(Postal::Config.postal).to receive(:use_ip_pools?).and_return(true) + end + + context "when there is no backend message" do + it "returns nil" do + expect(queued_message.allocate_ip_address).to be nil + end + + it "does not allocate an IP address" do + expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address) + end + end + + context "when no IP pool can be determined for the message" do + let(:server) { create(:server) } + let(:message) { MessageFactory.outgoing(server) } + + subject(:queued_message) { create(:queued_message, message: message) } + + it "returns nil" do + expect(queued_message.allocate_ip_address).to be nil + end + + it "does not allocate an IP address" do + expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address) + end + end + + context "when an IP pool can be determined for the message" do + let(:ip_pool) { create(:ip_pool, :with_ip_address) } + let(:server) { create(:server, ip_pool: ip_pool) } + let(:message) { MessageFactory.outgoing(server) } + + subject(:queued_message) { create(:queued_message, message: message) } + + it "returns an IP address" do + expect(queued_message.allocate_ip_address).to be_a IPAddress + end + + it "allocates an IP address to the queued message" do + queued_message.update(ip_address: nil) + expect { queued_message.allocate_ip_address }.to change(queued_message, :ip_address).from(nil).to(ip_pool.ip_addresses.first) + end + end + end + end + + describe "#batchable_messages" do + context "when the message is not locked" do + subject(:queued_message) { build(:queued_message) } + + it "raises an error" do + expect { queued_message.batchable_messages }.to raise_error(Postal::Error, /must lock current message before locking any friends/i) + end + end + + context "when the message is locked" do + let(:batch_key) { nil } + subject(:queued_message) { build(:queued_message, :locked, batch_key: batch_key) } + + context "when there is no batch key on the queued message" do + it "returns an empty array" do + expect(queued_message.batch_key).to be nil + expect(queued_message.batchable_messages).to eq [] + end + end + + context "when there is a batch key" do + let(:batch_key) { "1234" } + + it "finds and locks messages with the same batch key and IP address up to the limit specified" do + other_message1 = create(:queued_message, batch_key: batch_key, ip_address: nil) + other_message2 = create(:queued_message, batch_key: batch_key, ip_address: nil) + create(:queued_message, batch_key: batch_key, ip_address: nil) + + messages = queued_message.batchable_messages(2) + expect(messages).to eq [other_message1, other_message2] + expect(messages).to all be_locked + end + + it "does not find messages with a different batch key" do + create(:queued_message, batch_key: "5678", ip_address: nil) + expect(queued_message.batchable_messages).to eq [] + end + + it "does not find messages that are not queued for sending yet" do + create(:queued_message, batch_key: batch_key, ip_address: nil, retry_after: 1.minute.from_now) + expect(queued_message.batchable_messages).to eq [] + end + + it "does not find messages that are for a different IP address" do + create(:queued_message, batch_key: batch_key, ip_address: create(:ip_address)) + expect(queued_message.batchable_messages).to eq [] + end + end + end + end +end diff --git a/spec/scheduled_tasks/tidy_queued_messages_task_spec.rb b/spec/scheduled_tasks/tidy_queued_messages_task_spec.rb new file mode 100644 index 000000000..87da1146c --- /dev/null +++ b/spec/scheduled_tasks/tidy_queued_messages_task_spec.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe TidyQueuedMessagesTask do + let(:logger) { TestLogger.new } + + subject(:task) { described_class.new(logger: logger) } + + describe "#call" do + it "destroys queued messages with stale locks" do + stale_message = create(:queued_message, locked_at: 2.days.ago, locked_by: "test") + task.call + expect { stale_message.reload }.to raise_error(ActiveRecord::RecordNotFound) + expect(logger).to have_logged(/removing queued message \d+/) + end + + it "does not destroy messages which are not logged" do + message = create(:queued_message) + task.call + expect { message.reload }.not_to raise_error + end + + it "does not destroy messages which where were locked less then the number of stale days" do + message = create(:queued_message, locked_at: 10.minutes.ago, locked_by: "test") + task.call + expect { message.reload }.not_to raise_error + end + end +end