Skip to content

Commit

Permalink
Optimize PG reservation query for PG >=9.5
Browse files Browse the repository at this point in the history
What does this optimization do?

It handles an issue where two DJ servers "workers" would try to pull the
top `delayed_jobs` record at the same time. One would succeed and the
second would block.

"SKIP LOCKED", introduced in PostgreSQL 9.5, allows the second query to
skip records that are already locked by another transaction. This
results in significant improvement in performance when there are
multiple worker servers all trying to get the next available job row at
the same time.

What about older PostgreSQL versions?

ActiveRecord requires PG>=9.3, so we cannot rely on the built-in checks.
But we can use the same mechanisms that AR provides to check the current
version and enable an optimization in that case. Meaning newer versions
of AR (at least 5.0+) will inspect the current PG version and make the
optimization if they can. We're not going to worry about older ARs as
everything that old is EOL'd anyhow. And if they're running a Rails that
old, the PG is likely older too.
  • Loading branch information
stevenharman committed Aug 10, 2023
1 parent 97f26a3 commit 38613db
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,17 @@ def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
# Note: active_record would attempt to generate UPDATE...LIMIT like
# SQL for Postgres if we use a .limit() filter, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts
# On PostgreSQL >= 9.5 we leverage SKIP LOCK to avoid multiple workers blocking each other
# when attempting to get the next available job
# https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
skip_locked = ""
if connection.respond_to?(:postgresql_version) && connection.postgresql_version >= 90500
skip_locked = " SKIP LOCKED"
end

quoted_name = connection.quote_table_name(table_name)
subquery = ready_scope.limit(1).lock(true).select("id").to_sql
sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *"
sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}#{skip_locked}) RETURNING *"
reserved = find_by_sql([sql, now, worker.name])
reserved[0]
end
Expand Down

0 comments on commit 38613db

Please sign in to comment.