Skip to content

Commit

Permalink
Merge branch 'expose-good-job-labels-in-dashboard' of github.com:BCla…
Browse files Browse the repository at this point in the history
…rk88/good_job into expose-good-job-labels-in-dashboard
  • Loading branch information
BClark88 committed Dec 13, 2024
2 parents 171363a + 8e579d1 commit 0f0b9db
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 6 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## [v4.6.0](https://github.com/bensheldon/good_job/tree/v4.6.0) (2024-12-12)

[Full Changelog](https://github.com/bensheldon/good_job/compare/v4.5.1...v4.6.0)

**Implemented enhancements:**

- Set job execution thread priority to `-3` when in async mode [\#1560](https://github.com/bensheldon/good_job/pull/1560) ([bensheldon](https://github.com/bensheldon))

**Closed issues:**

- Attaching metadata to jobs [\#1558](https://github.com/bensheldon/good_job/issues/1558)
- Lower Ruby Thread priority for jobs by default when running in Async mode [\#1554](https://github.com/bensheldon/good_job/issues/1554)
- NoMethodError: undefined method `\<' for nil \(process.rb:125 in stale?\) [\#1363](https://github.com/bensheldon/good_job/issues/1363)
- Install PgHero on the Demo app [\#1166](https://github.com/bensheldon/good_job/issues/1166)

**Merged pull requests:**

- Bump rails-html-sanitizer from 1.6.0 to 1.6.1 [\#1557](https://github.com/bensheldon/good_job/pull/1557) ([dependabot[bot]](https://github.com/apps/dependabot))
- Add PGHero to the demo app [\#1294](https://github.com/bensheldon/good_job/pull/1294) ([mec](https://github.com/mec))

## [v4.5.1](https://github.com/bensheldon/good_job/tree/v4.5.1) (2024-11-29)

[Full Changelog](https://github.com/bensheldon/good_job/compare/v4.5.0...v4.5.1)
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
good_job (4.5.1)
good_job (4.6.0)
activejob (>= 6.1.0)
activerecord (>= 6.1.0)
concurrent-ruby (>= 1.3.1)
Expand Down
1 change: 1 addition & 0 deletions checksums/good_job-4.6.0.gem.sha256
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a5287e96f2d756199107954a1621e2c1ab3fd6cc6f6a7021ae8cd5e9773d264a
1 change: 1 addition & 0 deletions checksums/good_job-4.6.0.gem.sha512
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5057a9bf9bc45c8462e4ff90a14c13e6e15fedf553ba53ef3964de96fdf65061cbe57a269c359268d9f53f0ceafceaeba2a6c2623ff10fc98b8b0db5f93b31c4
1 change: 1 addition & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def start_async
return unless execute_async?

@capsule.start
@capsule.lower_thread_priority = true if GoodJob.configuration.lower_thread_priority.in?([true, nil])
@_async_started = true
end

Expand Down
10 changes: 9 additions & 1 deletion lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def initialize(configuration: nil)

@shared_executor = GoodJob::SharedExecutor.new
@tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor)
@lower_thread_priority = nil

self.class.instances << self
end
Expand All @@ -38,7 +39,9 @@ def start(force: false)

@notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor)
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true).tap do |multischeduler|
multischeduler.lower_thread_priority = @lower_thread_priority unless @lower_thread_priority.nil?
end
@notifier.recipients.push([@multi_scheduler, :create_thread])
@poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) })

Expand Down Expand Up @@ -110,6 +113,11 @@ def process_id
@tracker.process_id
end

def lower_thread_priority=(value)
@lower_thread_priority = value
@multi_scheduler&.lower_thread_priority = value
end

private

def configuration
Expand Down
8 changes: 8 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ def in_webserver?
end || false
end

def lower_thread_priority
return options[:lower_thread_priority] unless options[:lower_thread_priority].nil?
return rails_config[:lower_thread_priority] unless rails_config[:lower_thread_priority].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_LOWER_THREAD_PRIORITY']) unless env['GOOD_JOB_LOWER_THREAD_PRIORITY'].nil?

nil
end

# Whether to take an advisory lock on the process record in the notifier reactor.
# @return [Boolean]
def advisory_lock_heartbeat
Expand Down
9 changes: 8 additions & 1 deletion lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_
max_cache: configuration.max_cache,
warm_cache_on_initialize: warm_cache_on_initialize,
cleanup_interval_seconds: configuration.cleanup_interval_seconds,
cleanup_interval_jobs: configuration.cleanup_interval_jobs
cleanup_interval_jobs: configuration.cleanup_interval_jobs,
lower_thread_priority: configuration.lower_thread_priority
)
end

Expand Down Expand Up @@ -85,6 +86,12 @@ def create_thread(state = nil)
end
end

def lower_thread_priority=(value)
schedulers.each do |scheduler|
scheduler.lower_thread_priority = value
end
end

def stats
scheduler_stats = schedulers.map(&:stats)

Expand Down
13 changes: 12 additions & 1 deletion lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Scheduler
fallback_policy: :discard,
}.freeze

# In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ).
LOW_THREAD_PRIORITY = -3

# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
Expand All @@ -39,13 +42,18 @@ class Scheduler
# @return [String]
attr_reader :name

# Whether to lower the thread priority to a fixed value
# @return [Boolean]
attr_accessor :lower_thread_priority

# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
# @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records
# @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
# @param lower_thread_priority [Boolean] whether to lower the thread priority of execution threads
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
Expand All @@ -62,6 +70,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
@executor_options[:name] = name

self.lower_thread_priority = lower_thread_priority

create_executor
warm_cache if warm_cache_on_initialize
self.class.instances << self
Expand Down Expand Up @@ -271,6 +281,7 @@ def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Thread.current[:good_job_scheduler] = thr_scheduler
Thread.current.priority = -3 if thr_scheduler.lower_thread_priority

Rails.application.reloader.wrap do
thr_performer.next do |found|
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module GoodJob
# GoodJob gem version.
VERSION = '4.5.1'
VERSION = '4.6.0'

# GoodJob version as Gem::Version object
GEM_VERSION = Gem::Version.new(VERSION)
Expand Down
12 changes: 11 additions & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def perform(succeed: true)
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)
allow(GoodJob::Notifier).to receive(:notify)

capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil)
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

Expand All @@ -99,6 +99,16 @@ def perform(succeed: true)
expect(capsule).to have_received(:create_thread)
expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' })
end

it 'lowers the thread priority of the capsule' do
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

described_class.new(execution_mode: :async_all)

expect(capsule).to have_received(:lower_thread_priority=).with(true)
end
end
end

Expand Down

0 comments on commit 0f0b9db

Please sign in to comment.