diff --git a/Gemfile.activerecord-6.1.lock b/Gemfile.activerecord-6.1.lock index 02d7f87..7fb7ddf 100644 --- a/Gemfile.activerecord-6.1.lock +++ b/Gemfile.activerecord-6.1.lock @@ -1,11 +1,11 @@ PATH remote: . specs: - switchman-inst-jobs (4.0.15) + switchman-inst-jobs (4.0.16) inst-jobs (>= 2.4.9, < 4.0) parallel (>= 1.19) railties (>= 6.1, < 7.2) - switchman (~> 3.1) + switchman (~> 3.1, >= 3.5.14) GEM remote: https://rubygems.org/ @@ -82,8 +82,12 @@ GEM crass (~> 1.0.2) nokogiri (>= 1.12.0) method_source (1.0.0) + mini_portile2 (2.8.4) minitest (5.20.0) newrelic_rpm (9.2.2) + nokogiri (1.15.4) + mini_portile2 (~> 2.8.2) + racc (~> 1.4) nokogiri (1.15.4-aarch64-linux) racc (~> 1.4) nokogiri (1.15.4-arm64-darwin) @@ -185,7 +189,7 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.12.3) simplecov_json_formatter (0.1.4) - switchman (3.5.13) + switchman (3.5.14) activerecord (>= 6.1.4, < 7.2) guardrail (~> 3.0.1) parallel (~> 1.22) @@ -199,6 +203,7 @@ GEM PLATFORMS aarch64-linux arm64-darwin + ruby x86_64-darwin x86_64-linux diff --git a/Gemfile.activerecord-7.0.lock b/Gemfile.activerecord-7.0.lock index af6e79a..a71d0ac 100644 --- a/Gemfile.activerecord-7.0.lock +++ b/Gemfile.activerecord-7.0.lock @@ -1,11 +1,11 @@ PATH remote: . specs: - switchman-inst-jobs (4.0.15) + switchman-inst-jobs (4.0.16) inst-jobs (>= 2.4.9, < 4.0) parallel (>= 1.19) railties (>= 6.1, < 7.2) - switchman (~> 3.1) + switchman (~> 3.1, >= 3.5.14) GEM remote: https://rubygems.org/ @@ -81,8 +81,12 @@ GEM crass (~> 1.0.2) nokogiri (>= 1.12.0) method_source (1.0.0) + mini_portile2 (2.8.4) minitest (5.20.0) newrelic_rpm (9.2.2) + nokogiri (1.15.4) + mini_portile2 (~> 2.8.2) + racc (~> 1.4) nokogiri (1.15.4-aarch64-linux) racc (~> 1.4) nokogiri (1.15.4-arm64-darwin) @@ -185,7 +189,7 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.12.3) simplecov_json_formatter (0.1.4) - switchman (3.5.13) + switchman (3.5.14) activerecord (>= 6.1.4, < 7.2) guardrail (~> 3.0.1) parallel (~> 1.22) @@ -199,6 +203,7 @@ GEM PLATFORMS aarch64-linux arm64-darwin + ruby x86_64-darwin x86_64-linux diff --git a/Gemfile.lock b/Gemfile.lock index 549634e..4a6da6e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,11 +1,11 @@ PATH remote: . specs: - switchman-inst-jobs (4.0.15) + switchman-inst-jobs (4.0.16) inst-jobs (>= 2.4.9, < 4.0) parallel (>= 1.19) railties (>= 6.1, < 7.2) - switchman (~> 3.1) + switchman (~> 3.1, >= 3.5.14) GEM remote: https://rubygems.org/ @@ -97,9 +97,13 @@ GEM crass (~> 1.0.2) nokogiri (>= 1.12.0) method_source (1.0.0) + mini_portile2 (2.8.4) minitest (5.20.0) mutex_m (0.1.2) newrelic_rpm (9.2.2) + nokogiri (1.15.4) + mini_portile2 (~> 2.8.2) + racc (~> 1.4) nokogiri (1.15.4-aarch64-linux) racc (~> 1.4) nokogiri (1.15.4-arm64-darwin) @@ -215,7 +219,7 @@ GEM simplecov-html (0.12.3) simplecov_json_formatter (0.1.4) stringio (3.0.8) - switchman (3.5.13) + switchman (3.5.14) activerecord (>= 6.1.4, < 7.2) guardrail (~> 3.0.1) parallel (~> 1.22) @@ -231,6 +235,7 @@ GEM PLATFORMS aarch64-linux arm64-darwin + ruby x86_64-darwin x86_64-linux diff --git a/lib/switchman_inst_jobs/delayed/pool.rb b/lib/switchman_inst_jobs/delayed/pool.rb index aa4dd64..74349b7 100644 --- a/lib/switchman_inst_jobs/delayed/pool.rb +++ b/lib/switchman_inst_jobs/delayed/pool.rb @@ -3,8 +3,17 @@ module SwitchmanInstJobs module Delayed module Pool + def initialize(*) + super + + raise "Cannot run jobs cross-region" unless shards.all?(&:in_current_region?) + end + def unlock_orphaned_jobs(worker = nil, pid = nil) if worker + # this is just a failsafe; it shouldn't be possible + return unless worker.shard.in_current_region? + shards = [worker.shard] else # Since we're not unlocking for a specific worker, look through @@ -19,13 +28,18 @@ def unlock_orphaned_jobs(worker = nil, pid = nil) # We purposely don't .compact to remove nils here, since if any # workers are on the default jobs shard we want to unlock against # that shard too. - shard_ids = @config[:workers].pluck(:shard).uniq - shards = shard_ids.map { |shard_id| ::Delayed::Worker.shard(shard_id) } + + shards = self.shards.select(&:in_current_region?) end ::Switchman::Shard.with_each_shard(shards, [::Delayed::Backend::ActiveRecord::AbstractJob]) do super end end + + def shards + shard_ids = @config[:workers].pluck(:shard).uniq + shard_ids.map { |shard_id| ::Delayed::Worker.shard(shard_id) } + end end end end diff --git a/lib/switchman_inst_jobs/delayed/worker.rb b/lib/switchman_inst_jobs/delayed/worker.rb index 5201ce5..674acc2 100644 --- a/lib/switchman_inst_jobs/delayed/worker.rb +++ b/lib/switchman_inst_jobs/delayed/worker.rb @@ -10,6 +10,9 @@ def self.prepended(base) def initialize(options = {}) # have to initialize this first, so #shard works @config = options + # this shouldn't be possible because of the pool check, but just in case + raise "Cannot run jobs cross-region" unless shard.in_current_region? + ::Delayed::Worker::HealthCheck.munge_service_name(shard) do super # ensure we get our own copy of the munged config diff --git a/lib/switchman_inst_jobs/version.rb b/lib/switchman_inst_jobs/version.rb index ff566c3..b5c60f8 100644 --- a/lib/switchman_inst_jobs/version.rb +++ b/lib/switchman_inst_jobs/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module SwitchmanInstJobs - VERSION = "4.0.15" + VERSION = "4.0.16" end diff --git a/spec/lib/switchman_inst_jobs/delayed/pool_spec.rb b/spec/lib/switchman_inst_jobs/delayed/pool_spec.rb index c0a2b5b..88c6e4f 100644 --- a/spec/lib/switchman_inst_jobs/delayed/pool_spec.rb +++ b/spec/lib/switchman_inst_jobs/delayed/pool_spec.rb @@ -7,13 +7,20 @@ let(:worker) { Delayed::Worker.new(worker_max_job_count: 1, shard: shard.id) } describe "pools" do + it "prevents creation of pools with workers in other regions" do + expect(Switchman::Shard).to receive(:lookup).with(shard.id).and_return(shard) + expect(shard).to receive(:in_current_region?).and_return(false) + + expect { Delayed::Pool.new({ workers: [{ shard: shard.id }] }) }.to raise_error("Cannot run jobs cross-region") + end + it "should unlock against the worker's shard" do allow(Delayed::Job).to receive(:unlock_orphaned_jobs) do expect(Switchman::Shard.current(Delayed::Backend::ActiveRecord::AbstractJob)) .to eq Switchman::Shard.default.delayed_jobs_shard 0 end - Delayed::Pool.new({}).send( + Delayed::Pool.new({ workers: [] }).send( :unlock_orphaned_jobs, Delayed::Worker.new, 1234 ) @@ -21,11 +28,11 @@ expect(Switchman::Shard.current(Delayed::Backend::ActiveRecord::AbstractJob)).to eq shard 0 end - Delayed::Pool.new({}).send(:unlock_orphaned_jobs, worker, 1234) + Delayed::Pool.new({ workers: [] }).send(:unlock_orphaned_jobs, worker, 1234) end it "should unlock against all configured shards" do - pool = Delayed::Pool.new({}) + pool = Delayed::Pool.new({ workers: [] }) pool.instance_variable_set( :@config, workers: [ diff --git a/spec/lib/switchman_inst_jobs/delayed/worker_spec.rb b/spec/lib/switchman_inst_jobs/delayed/worker_spec.rb index bf3dc90..3f0a8f3 100644 --- a/spec/lib/switchman_inst_jobs/delayed/worker_spec.rb +++ b/spec/lib/switchman_inst_jobs/delayed/worker_spec.rb @@ -7,6 +7,12 @@ let(:worker) { Delayed::Worker.new(worker_max_job_count: 1, shard: shard.id) } describe "workers" do + it "doesn't allow workers to be created for shards in other regions" do + expect(Switchman::Shard).to receive(:lookup).with(shard.id).and_return(shard) + expect(shard).to receive(:in_current_region?).and_return(false) + expect { worker }.to raise_error("Cannot run jobs cross-region") + end + it "should activate the jobs shard when calling run" do expect(Delayed::Job).to receive(:get_and_lock_next_available).once do expect(Switchman::Shard.current(Delayed::Backend::ActiveRecord::AbstractJob)).to eq shard diff --git a/switchman-inst-jobs.gemspec b/switchman-inst-jobs.gemspec index a90db12..49e162f 100644 --- a/switchman-inst-jobs.gemspec +++ b/switchman-inst-jobs.gemspec @@ -26,7 +26,7 @@ Gem::Specification.new do |s| s.add_dependency "inst-jobs", ">= 2.4.9", "< 4.0" s.add_dependency "parallel", ">= 1.19" s.add_dependency "railties", ">= 6.1", "< 7.2" - s.add_dependency "switchman", "~> 3.1" + s.add_dependency "switchman", "~> 3.1", ">= 3.5.14" s.add_development_dependency "bundler" s.add_development_dependency "byebug"