Skip to content

Commit

Permalink
feat: automatically remove queued messages with stale locks
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcooke committed Mar 12, 2024
1 parent 5d8213a commit 84e2850
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 4 deletions.
3 changes: 2 additions & 1 deletion app/lib/worker/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class Process
ProcessMessageRetentionScheduledTask,
PruneSuppressionListsScheduledTask,
PruneWebhookRequestsScheduledTask,
SendNotificationsScheduledTask
SendNotificationsScheduledTask,
TidyQueuedMessagesTask
].freeze

# @param [Integer] thread_count The number of worker threads to run in this process
Expand Down
8 changes: 5 additions & 3 deletions app/models/queued_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class QueuedMessage < ApplicationRecord

belongs_to :server
belongs_to :ip_address, optional: true
belongs_to :user, optional: true

before_create :allocate_ip_address

scope :ready_with_delayed_retry, -> { where("retry_after IS NULL OR retry_after < ?", 30.seconds.ago) }
scope :with_stale_lock, -> { where("locked_at IS NOT NULL AND locked_at < ?", Postal::Config.postal.queued_message_lock_stale_days.days.ago) }

def retry_now
update(retry_after: nil)
update!(retry_after: nil)
end

def send_bounce
Expand All @@ -50,9 +50,11 @@ def send_bounce
end

def allocate_ip_address
return unless Postal.ip_pools? && message && pool = server.ip_pool_for_message(message)
return unless Postal.ip_pools? && pool = server.ip_pool_for_message(message)

self.ip_address = pool.ip_addresses.select_by_priority
rescue Postal::MessageDB::Message::NotFound
nil
end

def batchable_messages(limit = 10)
Expand Down
18 changes: 18 additions & 0 deletions app/scheduled_tasks/tidy_queued_messages_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

class TidyQueuedMessagesTask < ApplicationScheduledTask

def call
QueuedMessage.with_stale_lock.in_batches do |messages|
messages.each do |message|
logger.info "removing queued message #{message.id} (locked at #{message.locked_at} by #{message.locked_by})"
message.destroy
end
end
end

def self.next_run_after
quarter_to_each_hour
end

end
1 change: 1 addition & 0 deletions doc/config/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This document contains all the environment variables which are available for thi
| `POSTAL_SIGNING_KEY_PATH` | String | Path to the private key used for signing | $config-file-root/signing.key |
| `POSTAL_SMTP_RELAYS` | Array of strings | An array of SMTP relays in the format of smtp://host:port | [] |
| `POSTAL_TRUSTED_PROXIES` | Array of strings | An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) | [] |
| `POSTAL_QUEUED_MESSAGE_LOCK_STALE_DAYS` | Integer | The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried. | 1 |
| `WEB_SERVER_DEFAULT_PORT` | Integer | The default port the web server should listen on unless overriden by the PORT environment variable | 5000 |
| `WEB_SERVER_DEFAULT_BIND_ADDRESS` | String | The default bind address the web server should listen on unless overriden by the BIND_ADDRESS environment variable | 127.0.0.1 |
| `WEB_SERVER_MAX_THREADS` | Integer | The maximum number of threads which can be used by the web server | 5 |
Expand Down
2 changes: 2 additions & 0 deletions doc/config/yaml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ postal:
smtp_relays: []
# An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)
trusted_proxies: []
# The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried.
queued_message_lock_stale_days: 1

web_server:
# The default port the web server should listen on unless overriden by the PORT environment variable
Expand Down
5 changes: 5 additions & 0 deletions lib/postal/config_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ module Postal
description "An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses)"
transform { |ip| IPAddr.new(ip) }
end

integer :queued_message_lock_stale_days do
description "The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried."
default 1
end
end

group :web_server do
Expand Down
6 changes: 6 additions & 0 deletions spec/factories/ip_pool_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,11 @@
factory :ip_pool do
name { "Default Pool" }
default { true }

trait :with_ip_address do
after(:create) do |ip_pool|
ip_pool.ip_addresses << create(:ip_address, ip_pool: ip_pool)
end
end
end
end
197 changes: 197 additions & 0 deletions spec/models/queued_message_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# frozen_string_literal: true

require "rails_helper"

RSpec.describe QueuedMessage do
subject(:queued_message) { build(:queued_message) }

describe "relationships" do
it { is_expected.to belong_to(:server) }
it { is_expected.to belong_to(:ip_address).optional }
end

describe ".ready_with_delayed_retry" do
it "returns messages where retry after is null" do
message = create(:queued_message, retry_after: nil)
expect(described_class.ready_with_delayed_retry).to eq [message]
end

it "returns messages where retry after is less than 30 seconds from now" do
Timecop.freeze do
message1 = create(:queued_message, retry_after: 45.seconds.ago)
message2 = create(:queued_message, retry_after: 5.minutes.ago)
create(:queued_message, retry_after: Time.now)
create(:queued_message, retry_after: 1.minute.from_now)
expect(described_class.ready_with_delayed_retry).to eq [message1, message2]
end
end
end

describe ".with_stale_lock" do
it "returns messages where lock time is less than the configured number of stale days" do
allow(Postal::Config.postal).to receive(:queued_message_lock_stale_days).and_return(2)
message1 = create(:queued_message, locked_at: 3.days.ago, locked_by: "test")
message2 = create(:queued_message, locked_at: 2.days.ago, locked_by: "test")
create(:queued_message, locked_at: 1.days.ago, locked_by: "test")
create(:queued_message)
expect(described_class.with_stale_lock).to eq [message1, message2]
end
end

describe "#retry_now" do
it "removes the retry time" do
message = create(:queued_message, retry_after: 2.minutes.from_now)
expect { message.retry_now }.to change { message.reload.retry_after }.from(kind_of(Time)).to(nil)
end

it "raises an error if invalid" do
message = create(:queued_message, retry_after: 2.minutes.from_now)
message.update_columns(server_id: nil) # unlikely to actually happen
expect { message.retry_now }.to raise_error(ActiveRecord::RecordInvalid)
end
end

describe "#send_bounce" do
let(:server) { create(:server) }
let(:message) { MessageFactory.incoming(server) }

subject(:queued_message) { create(:queued_message, message: message) }

context "when the message is eligiable for bounces" do
it "queues a bounce message for sending" do
expect(BounceMessage).to receive(:new).with(server, kind_of(Postal::MessageDB::Message)).and_wrap_original do |original, *args|
bounce = original.call(*args)
expect(bounce).to receive(:queue)
bounce
end
queued_message.send_bounce
end
end

context "when the message is not eligible for bounces" do
it "returns nil" do
message.update(bounce: true)
expect(queued_message.send_bounce).to be nil
end

it "does not queue a bounce message for sending" do
message.update(bounce: true)
expect(BounceMessage).not_to receive(:new)
queued_message.send_bounce
end
end
end

describe "#allocate_ip_address" do
subject(:queued_message) { create(:queued_message) }

context "when ip pools is disabled" do
it "returns nil" do
expect(queued_message.allocate_ip_address).to be nil
end

it "does not allocate an IP address" do
expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address)
end
end

context "when IP pools is enabled" do
before do
allow(Postal::Config.postal).to receive(:use_ip_pools?).and_return(true)
end

context "when there is no backend message" do
it "returns nil" do
expect(queued_message.allocate_ip_address).to be nil
end

it "does not allocate an IP address" do
expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address)
end
end

context "when no IP pool can be determined for the message" do
let(:server) { create(:server) }
let(:message) { MessageFactory.outgoing(server) }

subject(:queued_message) { create(:queued_message, message: message) }

it "returns nil" do
expect(queued_message.allocate_ip_address).to be nil
end

it "does not allocate an IP address" do
expect { queued_message.allocate_ip_address }.not_to change(queued_message, :ip_address)
end
end

context "when an IP pool can be determined for the message" do
let(:ip_pool) { create(:ip_pool, :with_ip_address) }
let(:server) { create(:server, ip_pool: ip_pool) }
let(:message) { MessageFactory.outgoing(server) }

subject(:queued_message) { create(:queued_message, message: message) }

it "returns an IP address" do
expect(queued_message.allocate_ip_address).to be_a IPAddress
end

it "allocates an IP address to the queued message" do
queued_message.update(ip_address: nil)
expect { queued_message.allocate_ip_address }.to change(queued_message, :ip_address).from(nil).to(ip_pool.ip_addresses.first)
end
end
end
end

describe "#batchable_messages" do
context "when the message is not locked" do
subject(:queued_message) { build(:queued_message) }

it "raises an error" do
expect { queued_message.batchable_messages }.to raise_error(Postal::Error, /must lock current message before locking any friends/i)
end
end

context "when the message is locked" do
let(:batch_key) { nil }
subject(:queued_message) { build(:queued_message, :locked, batch_key: batch_key) }

context "when there is no batch key on the queued message" do
it "returns an empty array" do
expect(queued_message.batch_key).to be nil
expect(queued_message.batchable_messages).to eq []
end
end

context "when there is a batch key" do
let(:batch_key) { "1234" }

it "finds and locks messages with the same batch key and IP address up to the limit specified" do
other_message1 = create(:queued_message, batch_key: batch_key, ip_address: nil)
other_message2 = create(:queued_message, batch_key: batch_key, ip_address: nil)
create(:queued_message, batch_key: batch_key, ip_address: nil)

messages = queued_message.batchable_messages(2)
expect(messages).to eq [other_message1, other_message2]
expect(messages).to all be_locked
end

it "does not find messages with a different batch key" do
create(:queued_message, batch_key: "5678", ip_address: nil)
expect(queued_message.batchable_messages).to eq []
end

it "does not find messages that are not queued for sending yet" do
create(:queued_message, batch_key: batch_key, ip_address: nil, retry_after: 1.minute.from_now)
expect(queued_message.batchable_messages).to eq []
end

it "does not find messages that are for a different IP address" do
create(:queued_message, batch_key: batch_key, ip_address: create(:ip_address))
expect(queued_message.batchable_messages).to eq []
end
end
end
end
end
30 changes: 30 additions & 0 deletions spec/scheduled_tasks/tidy_queued_messages_task_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

require "rails_helper"

RSpec.describe TidyQueuedMessagesTask do
let(:logger) { TestLogger.new }

subject(:task) { described_class.new(logger: logger) }

describe "#call" do
it "destroys queued messages with stale locks" do
stale_message = create(:queued_message, locked_at: 2.days.ago, locked_by: "test")
task.call
expect { stale_message.reload }.to raise_error(ActiveRecord::RecordNotFound)
expect(logger).to have_logged(/removing queued message \d+/)
end

it "does not destroy messages which are not logged" do
message = create(:queued_message)
task.call
expect { message.reload }.not_to raise_error
end

it "does not destroy messages which where were locked less then the number of stale days" do
message = create(:queued_message, locked_at: 10.minutes.ago, locked_by: "test")
task.call
expect { message.reload }.not_to raise_error
end
end
end

0 comments on commit 84e2850

Please sign in to comment.