From 125be6d0ea86919cfc70894aa034f6f1c82db3bf Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Thu, 12 Dec 2024 07:04:56 -0800 Subject: [PATCH 1/2] Set job execution thread priority to `-3` when in async mode (#1560) --- lib/good_job/adapter.rb | 1 + lib/good_job/capsule.rb | 10 +++++++++- lib/good_job/configuration.rb | 8 ++++++++ lib/good_job/multi_scheduler.rb | 9 ++++++++- lib/good_job/scheduler.rb | 13 ++++++++++++- spec/lib/good_job/adapter_spec.rb | 12 +++++++++++- 6 files changed, 49 insertions(+), 4 deletions(-) diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 9f4cb47ad..bd593f5c3 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -216,6 +216,7 @@ def start_async return unless execute_async? @capsule.start + @capsule.lower_thread_priority = true if GoodJob.configuration.lower_thread_priority.in?([true, nil]) @_async_started = true end diff --git a/lib/good_job/capsule.rb b/lib/good_job/capsule.rb index afa82a2bb..86302a983 100644 --- a/lib/good_job/capsule.rb +++ b/lib/good_job/capsule.rb @@ -24,6 +24,7 @@ def initialize(configuration: nil) @shared_executor = GoodJob::SharedExecutor.new @tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor) + @lower_thread_priority = nil self.class.instances << self end @@ -38,7 +39,9 @@ def start(force: false) @notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor) @poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval) - @multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true) + @multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true).tap do |multischeduler| + multischeduler.lower_thread_priority = @lower_thread_priority unless @lower_thread_priority.nil? + end @notifier.recipients.push([@multi_scheduler, :create_thread]) @poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) }) @@ -110,6 +113,11 @@ def process_id @tracker.process_id end + def lower_thread_priority=(value) + @lower_thread_priority = value + @multi_scheduler&.lower_thread_priority = value + end + private def configuration diff --git a/lib/good_job/configuration.rb b/lib/good_job/configuration.rb index a0019c875..65ca7d666 100644 --- a/lib/good_job/configuration.rb +++ b/lib/good_job/configuration.rb @@ -383,6 +383,14 @@ def in_webserver? end || false end + def lower_thread_priority + return options[:lower_thread_priority] unless options[:lower_thread_priority].nil? + return rails_config[:lower_thread_priority] unless rails_config[:lower_thread_priority].nil? + return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_LOWER_THREAD_PRIORITY']) unless env['GOOD_JOB_LOWER_THREAD_PRIORITY'].nil? + + nil + end + # Whether to take an advisory lock on the process record in the notifier reactor. # @return [Boolean] def advisory_lock_heartbeat diff --git a/lib/good_job/multi_scheduler.rb b/lib/good_job/multi_scheduler.rb index f0815c10e..a5440587c 100644 --- a/lib/good_job/multi_scheduler.rb +++ b/lib/good_job/multi_scheduler.rb @@ -19,7 +19,8 @@ def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_ max_cache: configuration.max_cache, warm_cache_on_initialize: warm_cache_on_initialize, cleanup_interval_seconds: configuration.cleanup_interval_seconds, - cleanup_interval_jobs: configuration.cleanup_interval_jobs + cleanup_interval_jobs: configuration.cleanup_interval_jobs, + lower_thread_priority: configuration.lower_thread_priority ) end @@ -85,6 +86,12 @@ def create_thread(state = nil) end end + def lower_thread_priority=(value) + schedulers.each do |scheduler| + scheduler.lower_thread_priority = value + end + end + def stats scheduler_stats = schedulers.map(&:stats) diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 9af6b1ca3..32ad6d13e 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -29,6 +29,9 @@ class Scheduler fallback_policy: :discard, }.freeze + # In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ). + LOW_THREAD_PRIORITY = -3 + # @!attribute [r] instances # @!scope class # List of all instantiated Schedulers in the current process. @@ -39,13 +42,18 @@ class Scheduler # @return [String] attr_reader :name + # Whether to lower the thread priority to a fixed value + # @return [Boolean] + attr_accessor :lower_thread_priority + # @param performer [GoodJob::JobPerformer] # @param max_threads [Numeric, nil] number of seconds between polls for jobs # @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory # @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+ # @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records # @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records - def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil) + # @param lower_thread_priority [Boolean] whether to lower the thread priority of execution threads + def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) @performer = performer @@ -62,6 +70,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs) @executor_options[:name] = name + self.lower_thread_priority = lower_thread_priority + create_executor warm_cache if warm_cache_on_initialize self.class.instances << self @@ -271,6 +281,7 @@ def create_task(delay = 0, fanout: false) future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer| Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name Thread.current[:good_job_scheduler] = thr_scheduler + Thread.current.priority = -3 if thr_scheduler.lower_thread_priority Rails.application.reloader.wrap do thr_performer.next do |found| diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb index 6d3929493..a0312d836 100644 --- a/spec/lib/good_job/adapter_spec.rb +++ b/spec/lib/good_job/adapter_spec.rb @@ -88,7 +88,7 @@ def perform(succeed: true) allow(GoodJob::Job).to receive(:enqueue).and_return(good_job) allow(GoodJob::Notifier).to receive(:notify) - capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil) + capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil) allow(GoodJob).to receive(:capsule).and_return(capsule) allow(capsule).to receive(:start) @@ -99,6 +99,16 @@ def perform(succeed: true) expect(capsule).to have_received(:create_thread) expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' }) end + + it 'lowers the thread priority of the capsule' do + capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil) + allow(GoodJob).to receive(:capsule).and_return(capsule) + allow(capsule).to receive(:start) + + described_class.new(execution_mode: :async_all) + + expect(capsule).to have_received(:lower_thread_priority=).with(true) + end end end From 07ef18565844d6aa2a28e5d7d602f75d3550355d Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Thu, 12 Dec 2024 07:07:07 -0800 Subject: [PATCH 2/2] Release good_job v4.6.0 --- CHANGELOG.md | 20 ++++++++++++++++++++ Gemfile.lock | 2 +- checksums/good_job-4.6.0.gem.sha256 | 1 + checksums/good_job-4.6.0.gem.sha512 | 1 + lib/good_job/version.rb | 2 +- 5 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 checksums/good_job-4.6.0.gem.sha256 create mode 100644 checksums/good_job-4.6.0.gem.sha512 diff --git a/CHANGELOG.md b/CHANGELOG.md index 14f8c930b..15b1f1c40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ # Changelog +## [v4.6.0](https://github.com/bensheldon/good_job/tree/v4.6.0) (2024-12-12) + +[Full Changelog](https://github.com/bensheldon/good_job/compare/v4.5.1...v4.6.0) + +**Implemented enhancements:** + +- Set job execution thread priority to `-3` when in async mode [\#1560](https://github.com/bensheldon/good_job/pull/1560) ([bensheldon](https://github.com/bensheldon)) + +**Closed issues:** + +- Attaching metadata to jobs [\#1558](https://github.com/bensheldon/good_job/issues/1558) +- Lower Ruby Thread priority for jobs by default when running in Async mode [\#1554](https://github.com/bensheldon/good_job/issues/1554) +- NoMethodError: undefined method `\<' for nil \(process.rb:125 in stale?\) [\#1363](https://github.com/bensheldon/good_job/issues/1363) +- Install PgHero on the Demo app [\#1166](https://github.com/bensheldon/good_job/issues/1166) + +**Merged pull requests:** + +- Bump rails-html-sanitizer from 1.6.0 to 1.6.1 [\#1557](https://github.com/bensheldon/good_job/pull/1557) ([dependabot[bot]](https://github.com/apps/dependabot)) +- Add PGHero to the demo app [\#1294](https://github.com/bensheldon/good_job/pull/1294) ([mec](https://github.com/mec)) + ## [v4.5.1](https://github.com/bensheldon/good_job/tree/v4.5.1) (2024-11-29) [Full Changelog](https://github.com/bensheldon/good_job/compare/v4.5.0...v4.5.1) diff --git a/Gemfile.lock b/Gemfile.lock index ff0d62b91..4e029d215 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - good_job (4.5.1) + good_job (4.6.0) activejob (>= 6.1.0) activerecord (>= 6.1.0) concurrent-ruby (>= 1.3.1) diff --git a/checksums/good_job-4.6.0.gem.sha256 b/checksums/good_job-4.6.0.gem.sha256 new file mode 100644 index 000000000..b9a9f6a93 --- /dev/null +++ b/checksums/good_job-4.6.0.gem.sha256 @@ -0,0 +1 @@ +a5287e96f2d756199107954a1621e2c1ab3fd6cc6f6a7021ae8cd5e9773d264a diff --git a/checksums/good_job-4.6.0.gem.sha512 b/checksums/good_job-4.6.0.gem.sha512 new file mode 100644 index 000000000..f8fab8029 --- /dev/null +++ b/checksums/good_job-4.6.0.gem.sha512 @@ -0,0 +1 @@ +5057a9bf9bc45c8462e4ff90a14c13e6e15fedf553ba53ef3964de96fdf65061cbe57a269c359268d9f53f0ceafceaeba2a6c2623ff10fc98b8b0db5f93b31c4 diff --git a/lib/good_job/version.rb b/lib/good_job/version.rb index 3a110dee0..1b90335be 100644 --- a/lib/good_job/version.rb +++ b/lib/good_job/version.rb @@ -2,7 +2,7 @@ module GoodJob # GoodJob gem version. - VERSION = '4.5.1' + VERSION = '4.6.0' # GoodJob version as Gem::Version object GEM_VERSION = Gem::Version.new(VERSION)