From 8aa1e67bd57706311741c153a4f292cdf9ae8af6 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Wed, 12 Feb 2014 01:40:32 -0500 Subject: [PATCH 01/12] Collapsing lib/resque_scheduler into lib/resque/scheduler plus oodles of other changes related to the move, wheeeee! --- .gitignore | 13 +- README.md | 12 +- Rakefile | 3 + bin/resque-scheduler | 4 +- examples/Rakefile | 2 +- examples/config/initializers/resque-web.rb | 4 +- .../dynamic-scheduling/lib/tasks/resque.rake | 4 +- lib/resque-scheduler.rb | 4 +- lib/resque/scheduler.rb | 25 +- lib/resque/scheduler/cli.rb | 164 +++++++ lib/resque/scheduler/extension.rb | 397 +++++++++++++++++ lib/resque/scheduler/lock/base.rb | 2 +- lib/resque/scheduler/lock/basic.rb | 4 +- lib/resque/scheduler/lock/resilient.rb | 4 +- .../locking.rb} | 58 +-- lib/resque/scheduler/logger_builder.rb | 70 +++ lib/resque/scheduler/plugin.rb | 31 ++ lib/resque/scheduler/server.rb | 197 +++++++++ .../scheduler}/server/views/delayed.erb | 0 .../server/views/delayed_schedules.erb | 0 .../server/views/delayed_timestamp.erb | 0 .../server/views/requeue-params.erb | 0 .../scheduler}/server/views/scheduler.erb | 0 .../scheduler}/server/views/search.erb | 0 .../scheduler}/server/views/search_form.erb | 0 .../scheduler}/tasks.rb | 4 +- lib/resque/scheduler/util.rb | 41 ++ lib/resque/scheduler/version.rb | 7 + lib/resque_scheduler.rb | 403 ------------------ lib/resque_scheduler/cli.rb | 160 ------- lib/resque_scheduler/logger_builder.rb | 70 --- lib/resque_scheduler/plugin.rb | 29 -- lib/resque_scheduler/server.rb | 195 --------- lib/resque_scheduler/util.rb | 36 -- lib/resque_scheduler/version.rb | 5 - resque-scheduler.gemspec | 6 +- tasks/resque_scheduler.rake | 2 +- test/cli_test.rb | 4 +- test/scheduler_args_test.rb | 7 +- test/scheduler_locking_test.rb | 12 +- test/scheduler_task_test.rb | 7 +- test/scheduler_test.rb | 2 +- test/test_helper.rb | 4 +- test/util_test.rb | 4 +- 44 files changed, 1012 insertions(+), 984 deletions(-) create mode 100644 lib/resque/scheduler/cli.rb create mode 100644 lib/resque/scheduler/extension.rb rename lib/resque/{scheduler_locking.rb => scheduler/locking.rb} (77%) create mode 100644 lib/resque/scheduler/logger_builder.rb create mode 100644 lib/resque/scheduler/plugin.rb create mode 100644 lib/resque/scheduler/server.rb rename lib/{resque_scheduler => resque/scheduler}/server/views/delayed.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/server/views/delayed_schedules.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/server/views/delayed_timestamp.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/server/views/requeue-params.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/server/views/scheduler.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/server/views/search.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/server/views/search_form.erb (100%) rename lib/{resque_scheduler => resque/scheduler}/tasks.rb (85%) create mode 100644 lib/resque/scheduler/util.rb create mode 100644 lib/resque/scheduler/version.rb delete mode 100644 lib/resque_scheduler.rb delete mode 100644 lib/resque_scheduler/cli.rb delete mode 100644 lib/resque_scheduler/logger_builder.rb delete mode 100644 lib/resque_scheduler/plugin.rb delete mode 100644 lib/resque_scheduler/server.rb delete mode 100644 lib/resque_scheduler/util.rb delete mode 100644 lib/resque_scheduler/version.rb diff --git a/.gitignore b/.gitignore index 7d8cb014..e29957fa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,12 @@ -.bundle/ -.idea/ +/.bundle/ +/.idea/ +/.yardoc/ -doc/ -pkg +/doc/ +/pkg/ nbproject -Gemfile.lock -.rvmrc +/Gemfile.lock +/.rvmrc *.swp /coverage/ diff --git a/README.md b/README.md index 97da4e3d..caeec3a3 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ gem 'resque-scheduler' Adding the resque:scheduler rake task: ```ruby -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' ``` ### Rake integration @@ -66,12 +66,12 @@ everything `resque` needs to know. ```ruby # Resque tasks require 'resque/tasks' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' namespace :resque do task :setup do require 'resque' - require 'resque_scheduler' + require 'resque-scheduler' # you probably already have this somewhere Resque.redis = 'localhost:6379' @@ -448,7 +448,7 @@ redis instance and schedule. The scheduler processes will use redis to elect a master process and detect failover when the master dies. Precautions are taken to prevent jobs from potentially being queued twice during failover even when the clocks of the scheduler machines are slightly out of sync (or load affects -scheduled job firing time). If you want the gory details, look at Resque::SchedulerLocking. +scheduled job firing time). If you want the gory details, look at Resque::Scheduler::Locking. If the scheduler process(es) goes down for whatever reason, the delayed items that should have fired during the outage will fire once the scheduler process @@ -495,8 +495,8 @@ Now, you want to add the following: ```ruby # This will make the tabs show up. -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' ``` That should make the scheduler tabs show up in `resque-web`. diff --git a/Rakefile b/Rakefile index 75283ee1..93a3b210 100644 --- a/Rakefile +++ b/Rakefile @@ -2,6 +2,7 @@ require 'bundler/gem_tasks' require 'rake/testtask' require 'rubocop/rake_task' +require 'yard' task default: [:rubocop, :test] @@ -15,3 +16,5 @@ Rake::TestTask.new do |t| o << '--verbose ' if ENV['VERBOSE'] end end + +YARD::Rake::YardocTask.new diff --git a/bin/resque-scheduler b/bin/resque-scheduler index 124b9063..f26121d1 100755 --- a/bin/resque-scheduler +++ b/bin/resque-scheduler @@ -1,5 +1,5 @@ #!/usr/bin/env ruby # vim:fileencoding=utf-8 -require 'resque_scheduler' -ResqueScheduler::Cli.run! +require 'resque-scheduler' +Resque::Scheduler::Cli.run! diff --git a/examples/Rakefile b/examples/Rakefile index bf314d02..bf31d618 100644 --- a/examples/Rakefile +++ b/examples/Rakefile @@ -1,2 +1,2 @@ # vim:fileencoding=utf-8 -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' diff --git a/examples/config/initializers/resque-web.rb b/examples/config/initializers/resque-web.rb index 9a2ed792..20e1a141 100644 --- a/examples/config/initializers/resque-web.rb +++ b/examples/config/initializers/resque-web.rb @@ -7,8 +7,8 @@ redis_env_var = ENV['REDIS_PROVIDER'] || 'REDIS_URL' Resque.redis = ENV[redis_env_var] || 'localhost:6379' -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' schedule_yml = ENV['RESQUE_SCHEDULE_YML'] if schedule_yml diff --git a/examples/dynamic-scheduling/lib/tasks/resque.rake b/examples/dynamic-scheduling/lib/tasks/resque.rake index 76b8dbc1..5e7e8a07 100644 --- a/examples/dynamic-scheduling/lib/tasks/resque.rake +++ b/examples/dynamic-scheduling/lib/tasks/resque.rake @@ -1,13 +1,13 @@ # vim:fileencoding=utf-8 require 'resque/tasks' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' require 'yaml' namespace :resque do task :setup do require 'resque' - require 'resque_scheduler' + require 'resque-scheduler' rails_root = ENV['RAILS_ROOT'] || File.expand_path('../../../', __FILE__) rails_env = ENV['RAILS_ENV'] || 'development' diff --git a/lib/resque-scheduler.rb b/lib/resque-scheduler.rb index b1e9d337..b770cd91 100644 --- a/lib/resque-scheduler.rb +++ b/lib/resque-scheduler.rb @@ -1,2 +1,4 @@ # vim:fileencoding=utf-8 -require File.expand_path('../resque_scheduler', __FILE__) +require_relative 'resque/scheduler' + +Resque.extend Resque::Scheduler::Extension diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index f08af92b..d207f4f5 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -1,11 +1,14 @@ # vim:fileencoding=utf-8 require 'rufus/scheduler' -require 'resque/scheduler_locking' -require 'resque_scheduler/logger_builder' +require_relative 'scheduler/locking' +require_relative 'scheduler/logger_builder' module Resque - class Scheduler - extend Resque::SchedulerLocking + module Scheduler + autoload :Cli, 'resque/scheduler/cli' + autoload :Extension, 'resque/scheduler/extension' + + extend Resque::Scheduler::Locking class << self # Allows for block-style configuration @@ -83,7 +86,7 @@ def poll_sleep_amount attr_writer :logger def logger - @logger ||= ResqueScheduler::LoggerBuilder.new( + @logger ||= Resque::Scheduler::LoggerBuilder.new( mute: mute, verbose: verbose, log_dev: logfile, @@ -202,8 +205,8 @@ def optionizate_interval_value(value) args end - # Loads a job schedule into the Rufus::Scheduler and stores it in - # @scheduled_jobs + # Loads a job schedule into the Rufus::Scheduler and stores it + # in @scheduled_jobs def load_schedule_job(name, config) # If `rails_env` or `env` is set in the config, load jobs only if they # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or @@ -306,7 +309,7 @@ def enqueue_from_config(job_config) klass_name = job_config['class'] || job_config[:class] begin - klass = ResqueScheduler::Util.constantize(klass_name) + klass = Resque::Scheduler::Util.constantize(klass_name) rescue NameError klass = klass_name end @@ -324,7 +327,7 @@ def enqueue_from_config(job_config) # from the web perhaps), fall back to enqueing normally via # Resque::Job.create. begin - ResqueScheduler::Util.constantize(job_klass).scheduled( + Resque::Scheduler::Util.constantize(job_klass).scheduled( queue, klass_name, *params ) rescue NameError @@ -337,7 +340,7 @@ def enqueue_from_config(job_config) # for non-existent classes (for example: running scheduler in # one app that schedules for another. if Class === klass - ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks( + Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks( klass, *params ) @@ -474,7 +477,7 @@ def build_procline(string) end def internal_name - "resque-scheduler-#{ResqueScheduler::VERSION}" + "resque-scheduler-#{Resque::Scheduler::VERSION}" end end end diff --git a/lib/resque/scheduler/cli.rb b/lib/resque/scheduler/cli.rb new file mode 100644 index 00000000..9915d3b4 --- /dev/null +++ b/lib/resque/scheduler/cli.rb @@ -0,0 +1,164 @@ +# vim:fileencoding=utf-8 + +require 'optparse' + +module Resque + module Scheduler + class Cli + BANNER = <<-EOF.gsub(/ {6}/, '') + Usage: resque-scheduler [options] + + Runs a resque scheduler process directly (rather than via rake). + + EOF + OPTIONS = [ + { + args: ['-n', '--app-name [APP_NAME]', + 'Application name for procline'], + callback: ->(options) { ->(n) { options[:app_name] = n } } + }, + { + args: ['-B', '--background', 'Run in the background [BACKGROUND]'], + callback: ->(options) { ->(b) { options[:background] = b } } + }, + { + args: ['-D', '--dynamic-schedule', + 'Enable dynamic scheduling [DYNAMIC_SCHEDULE]'], + callback: ->(options) { ->(d) { options[:dynamic] = d } } + }, + { + args: ['-E', '--environment [RAILS_ENV]', 'Environment name'], + callback: ->(options) { ->(e) { options[:env] = e } } + }, + { + args: ['-I', '--initializer-path [INITIALIZER_PATH]', + 'Path to optional initializer ruby file'], + callback: ->(options) { ->(i) { options[:initializer_path] = i } } + }, + { + args: ['-i', '--interval [RESQUE_SCHEDULER_INTERVAL]', + 'Interval for checking if a scheduled job must run'], + callback: ->(options) { ->(i) { options[:poll_sleep_amount] = i } } + }, + { + args: ['-l', '--logfile [LOGFILE]', 'Log file name'], + callback: ->(options) { ->(l) { options[:logfile] = l } } + }, + { + args: ['-F', '--logformat [LOGFORMAT]', 'Log output format'], + callback: ->(options) { ->(f) { options[:logformat] = f } } + }, + { + args: ['-P', '--pidfile [PIDFILE]', 'PID file name'], + callback: ->(options) { ->(p) { options[:pidfile] = p } } + }, + { + args: ['-q', '--quiet', + 'Run with minimal output [QUIET] (or [MUTE])'], + callback: ->(options) { ->(q) { options[:mute] = q } } + }, + { + args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], + callback: ->(options) { ->(v) { options[:verbose] = v } } + } + ].freeze + + def self.run!(argv = ARGV, env = ENV) + new(argv, env).run! + end + + def initialize(argv = ARGV, env = ENV) + @argv = argv + @env = env + end + + def run! + pre_run + run_forever + end + + def pre_run + parse_options + pre_setup + setup_env + end + + def parse_options + OptionParser.new do |opts| + opts.banner = BANNER + OPTIONS.each do |opt| + opts.on(*opt[:args], &(opt[:callback].call(options))) + end + end.parse!(argv.dup) + end + + def pre_setup + if options[:initializer_path] + load options[:initializer_path].to_s.strip + else + false + end + end + + def setup_env + require 'resque' + require 'resque/scheduler' + + # Need to set this here for conditional Process.daemon redirect of + # stderr/stdout to /dev/null + Resque::Scheduler.mute = !!options[:mute] + + if options[:background] + unless Process.respond_to?('daemon') + abort 'background option is set, which requires ruby >= 1.9' + end + + Process.daemon(true, !Resque::Scheduler.mute) + Resque.redis.client.reconnect + end + + File.open(options[:pidfile], 'w') do |f| + f.puts $PROCESS_ID + end if options[:pidfile] + + Resque::Scheduler.configure do |c| + # These settings are somewhat redundant given the defaults present + # in the attr reader methods. They are left here for clarity and + # to serve as an example of how to use `.configure`. + + c.app_name = options[:app_name] + c.dynamic = !!options[:dynamic] + c.env = options[:env] + c.logfile = options[:logfile] + c.logformat = options[:logformat] + c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') + c.verbose = !!options[:verbose] + end + end + + def run_forever + Resque::Scheduler.run + end + + private + + attr_reader :argv, :env + + def options + @options ||= { + app_name: env['APP_NAME'], + background: env['BACKGROUND'], + dynamic: env['DYNAMIC_SCHEDULE'], + env: env['RAILS_ENV'], + initializer_path: env['INITIALIZER_PATH'], + logfile: env['LOGFILE'], + logformat: env['LOGFORMAT'], + mute: env['MUTE'] || env['QUIET'], + pidfile: env['PIDFILE'], + poll_sleep_amount: env['RESQUE_SCHEDULER_INTERVAL'], + verbose: env['VERBOSE'] + } + end + end + end +end diff --git a/lib/resque/scheduler/extension.rb b/lib/resque/scheduler/extension.rb new file mode 100644 index 00000000..6564252c --- /dev/null +++ b/lib/resque/scheduler/extension.rb @@ -0,0 +1,397 @@ +# vim:fileencoding=utf-8 +require 'rubygems' +require 'resque' +require_relative 'version' +require_relative 'util' +require_relative '../scheduler' +require_relative 'plugin' + +module Resque + module Scheduler + module Extension + # Accepts a new schedule configuration of the form: + # + # { + # "MakeTea" => { + # "every" => "1m" }, + # "some_name" => { + # "cron" => "5/* * * *", + # "class" => "DoSomeWork", + # "args" => "work on this string", + # "description" => "this thing works it"s butter off" }, + # ... + # } + # + # Hash keys can be anything and are used to describe and reference + # the scheduled job. If the "class" argument is missing, the key + # is used implicitly as "class" argument - in the "MakeTea" example, + # "MakeTea" is used both as job name and resque worker class. + # + # Any jobs that were in the old schedule, but are not + # present in the new schedule, will be removed. + # + # :cron can be any cron scheduling string + # + # :every can be used in lieu of :cron. see rufus-scheduler's 'every' + # usage for valid syntax. If :cron is present it will take precedence + # over :every. + # + # :class must be a resque worker class. If it is missing, the job name + # (hash key) will be used as :class. + # + # :args can be any yaml which will be converted to a ruby literal and + # passed in a params. (optional) + # + # :rails_envs is the list of envs where the job gets loaded. Envs are + # comma separated (optional) + # + # :description is just that, a description of the job (optional). If + # params is an array, each element in the array is passed as a separate + # param, otherwise params is passed in as the only parameter to perform. + def schedule=(schedule_hash) + # clean the schedules as it exists in redis + clean_schedules + + schedule_hash = prepare_schedule(schedule_hash) + + # store all schedules in redis, so we can retrieve them back + # everywhere. + schedule_hash.each do |name, job_spec| + set_schedule(name, job_spec) + end + + # ensure only return the successfully saved data! + reload_schedule! + end + + # Returns the schedule hash + def schedule + @schedule ||= all_schedules + @schedule || {} + end + + # reloads the schedule from redis + def reload_schedule! + @schedule = all_schedules + end + + # gets the schedules as it exists in redis + def all_schedules + return nil unless redis.exists(:schedules) + + redis.hgetall(:schedules).tap do |h| + h.each do |name, config| + h[name] = decode(config) + end + end + end + + # clean the schedules as it exists in redis, useful for first setup? + def clean_schedules + if redis.exists(:schedules) + redis.hkeys(:schedules).each do |key| + remove_schedule(key) unless schedule_persisted?(key) + end + end + @schedule = nil + true + end + + # Create or update a schedule with the provided name and configuration. + # + # Note: values for class and custom_job_class need to be strings, + # not constants. + # + # Resque.set_schedule('some_job', {:class => 'SomeJob', + # :every => '15mins', + # :queue => 'high', + # :args => '/tmp/poop'}) + def set_schedule(name, config) + existing_config = fetch_schedule(name) + persist = config.delete(:persist) || config.delete('persist') + unless existing_config && existing_config == config + redis.pipelined do + redis.hset(:schedules, name, encode(config)) + redis.sadd(:schedules_changed, name) + redis.sadd(:persisted_schedules, name) if persist + end + end + config + end + + # retrive the schedule configuration for the given name + def fetch_schedule(name) + decode(redis.hget(:schedules, name)) + end + + def schedule_persisted?(name) + redis.sismember(:persisted_schedules, name) + end + + # remove a given schedule by name + def remove_schedule(name) + redis.pipelined do + redis.hdel(:schedules, name) + redis.srem(:persisted_schedules, name) + redis.sadd(:schedules_changed, name) + end + end + + # This method is nearly identical to +enqueue+ only it also + # takes a timestamp which will be used to schedule the job + # for queueing. Until timestamp is in the past, the job will + # sit in the schedule list. + def enqueue_at(timestamp, klass, *args) + validate(klass) + enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) + end + + # Identical to +enqueue_at+, except you can also specify + # a queue in which the job will be placed after the + # timestamp has passed. It respects Resque.inline option, by + # creating the job right away instead of adding to the queue. + def enqueue_at_with_queue(queue, timestamp, klass, *args) + return false unless Plugin.run_before_schedule_hooks(klass, *args) + + if Resque.inline? || timestamp.to_i < Time.now.to_i + # Just create the job and let resque perform it right away with + # inline. If the class is a custom job class, call self#scheduled on + # it. This allows you to do things like Resque.enqueue_at(timestamp, + # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. + if klass.respond_to?(:scheduled) + klass.scheduled(queue, klass.to_s, *args) + else + Resque::Job.create(queue, klass, *args) + end + else + delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) + end + + Plugin.run_after_schedule_hooks(klass, *args) + end + + # Identical to enqueue_at but takes number_of_seconds_from_now + # instead of a timestamp. + def enqueue_in(number_of_seconds_from_now, klass, *args) + enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + + # Identical to +enqueue_in+, except you can also specify + # a queue in which the job will be placed after the + # number of seconds has passed. + def enqueue_in_with_queue(queue, number_of_seconds_from_now, + klass, *args) + enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, + klass, *args) + end + + # Used internally to stuff the item into the schedule sorted list. + # +timestamp+ can be either in seconds or a datetime object Insertion if + # O(log(n)). Returns true if it's the first job to be scheduled at that + # time, else false + def delayed_push(timestamp, item) + # First add this item to the list for this timestamp + redis.rpush("delayed:#{timestamp.to_i}", encode(item)) + + # Store the timestamps at with this item occurs + redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + + # Now, add this timestamp to the zsets. The score and the value are + # the same since we'll be querying by timestamp, and we don't have + # anything else to store. + redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + end + + # Returns an array of timestamps based on start and count + def delayed_queue_peek(start, count) + result = redis.zrange(:delayed_queue_schedule, start, + start + count - 1) + Array(result).map(&:to_i) + end + + # Returns the size of the delayed queue schedule + def delayed_queue_schedule_size + redis.zcard :delayed_queue_schedule + end + + # Returns the number of jobs for a given timestamp in the delayed queue + # schedule + def delayed_timestamp_size(timestamp) + redis.llen("delayed:#{timestamp.to_i}").to_i + end + + # Returns an array of delayed items for the given timestamp + def delayed_timestamp_peek(timestamp, start, count) + if 1 == count + r = list_range "delayed:#{timestamp.to_i}", start, count + r.nil? ? [] : [r] + else + list_range "delayed:#{timestamp.to_i}", start, count + end + end + + # Returns the next delayed queue timestamp + # (don't call directly) + def next_delayed_timestamp(at_time = nil) + items = redis.zrangebyscore( + :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, + limit: [0, 1] + ) + timestamp = items.nil? ? nil : Array(items).first + timestamp.to_i unless timestamp.nil? + end + + # Returns the next item to be processed for a given timestamp, nil if + # done. (don't call directly) + # +timestamp+ can either be in seconds or a datetime + def next_item_for_timestamp(timestamp) + key = "delayed:#{timestamp.to_i}" + + encoded_item = redis.lpop(key) + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + + # If the list is empty, remove it. + clean_up_timestamp(key, timestamp) + item + end + + # Clears all jobs created with enqueue_at or enqueue_in + def reset_delayed_queue + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| + key = "delayed:#{item}" + items = redis.lrange(key, 0, -1) + redis.pipelined do + items.each { |ts_item| redis.del("timestamps:#{ts_item}") } + end + redis.del key + end + + redis.del :delayed_queue_schedule + end + + # Given an encoded item, remove it from the delayed_queue + def remove_delayed(klass, *args) + search = encode(job_to_hash(klass, args)) + timestamps = redis.smembers("timestamps:#{search}") + + replies = redis.pipelined do + timestamps.each do |key| + redis.lrem(key, 0, search) + redis.srem("timestamps:#{search}", key) + end + end + + return 0 if replies.nil? || replies.empty? + replies.each_slice(2).map(&:first).inject(:+) + end + + # Given an encoded item, enqueue it now + def enqueue_delayed(klass, *args) + hash = job_to_hash(klass, args) + remove_delayed(klass, *args).times do + Resque::Scheduler.enqueue_from_config(hash) + end + end + + # Given a block, remove jobs that return true from a block + # + # This allows for removal of delayed jobs that have arguments matching + # certain criteria + def remove_delayed_selection + fail ArgumentError, 'Please supply a block' unless block_given? + + destroyed = 0 + # There is no way to search Redis list entries for a partial match, so + # we query for all delayed job tasks and do our matching after decoding + # the payload data + jobs = Resque.redis.keys('delayed:*') + jobs.each do |job| + index = Resque.redis.llen(job) - 1 + while index >= 0 + payload = Resque.redis.lindex(job, index) + decoded_payload = decode(payload) + if yield(decoded_payload['args']) + removed = redis.lrem job, 0, payload + destroyed += removed + index -= removed + else + index -= 1 + end + end + end + destroyed + end + + # Given a timestamp and job (klass + args) it removes all instances and + # returns the count of jobs removed. + # + # O(N) where N is the number of jobs scheduled to fire at the given + # timestamp + def remove_delayed_job_from_timestamp(timestamp, klass, *args) + key = "delayed:#{timestamp.to_i}" + encoded_job = encode(job_to_hash(klass, args)) + + redis.srem("timestamps:#{encoded_job}", key) + count = redis.lrem(key, 0, encoded_job) + clean_up_timestamp(key, timestamp) + + count + end + + def count_all_scheduled_jobs + total_jobs = 0 + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| + total_jobs += redis.llen("delayed:#{timestamp}").to_i + end + total_jobs + end + + # Returns delayed jobs schedule timestamp for +klass+, +args+. + def scheduled_at(klass, *args) + search = encode(job_to_hash(klass, args)) + redis.smembers("timestamps:#{search}").map do |key| + key.tr('delayed:', '').to_i + end + end + + private + + def job_to_hash(klass, args) + { class: klass.to_s, args: args, queue: queue_from_class(klass) } + end + + def job_to_hash_with_queue(queue, klass, args) + { class: klass.to_s, args: args, queue: queue } + end + + def clean_up_timestamp(key, timestamp) + # If the list is empty, remove it. + + # Use a watch here to ensure nobody adds jobs to this delayed + # queue while we're removing it. + redis.watch key + if 0 == redis.llen(key).to_i + redis.multi do + redis.del key + redis.zrem :delayed_queue_schedule, timestamp.to_i + end + else + redis.unwatch + end + end + + def prepare_schedule(schedule_hash) + prepared_hash = {} + schedule_hash.each do |name, job_spec| + job_spec = job_spec.dup + unless job_spec.key?('class') || job_spec.key?(:class) + job_spec['class'] = name + end + prepared_hash[name] = job_spec + end + prepared_hash + end + end + end +end diff --git a/lib/resque/scheduler/lock/base.rb b/lib/resque/scheduler/lock/base.rb index 52aa43fc..fe33eb4d 100644 --- a/lib/resque/scheduler/lock/base.rb +++ b/lib/resque/scheduler/lock/base.rb @@ -1,7 +1,7 @@ # vim:fileencoding=utf-8 module Resque - class Scheduler + module Scheduler module Lock class Base attr_reader :key diff --git a/lib/resque/scheduler/lock/basic.rb b/lib/resque/scheduler/lock/basic.rb index 10cecd77..b2a030f1 100644 --- a/lib/resque/scheduler/lock/basic.rb +++ b/lib/resque/scheduler/lock/basic.rb @@ -1,8 +1,8 @@ # vim:fileencoding=utf-8 -require 'resque/scheduler/lock/base' +require_relative 'base' module Resque - class Scheduler + module Scheduler module Lock class Basic < Base def acquire! diff --git a/lib/resque/scheduler/lock/resilient.rb b/lib/resque/scheduler/lock/resilient.rb index 6c7eba00..906a8a91 100644 --- a/lib/resque/scheduler/lock/resilient.rb +++ b/lib/resque/scheduler/lock/resilient.rb @@ -1,8 +1,8 @@ # vim:fileencoding=utf-8 -require 'resque/scheduler/lock/base' +require_relative 'base' module Resque - class Scheduler + module Scheduler module Lock class Resilient < Base def acquire! diff --git a/lib/resque/scheduler_locking.rb b/lib/resque/scheduler/locking.rb similarity index 77% rename from lib/resque/scheduler_locking.rb rename to lib/resque/scheduler/locking.rb index 2a09bcd7..5a100172 100644 --- a/lib/resque/scheduler_locking.rb +++ b/lib/resque/scheduler/locking.rb @@ -49,44 +49,46 @@ # you stop cron, no jobs fire while it's stopped and it doesn't fire jobs that # were missed when it starts up again. -require 'resque/scheduler/lock' +require_relative 'lock' module Resque - module SchedulerLocking - def master_lock - @master_lock ||= build_master_lock - end + module Scheduler + module Locking + def master_lock + @master_lock ||= build_master_lock + end - def supports_lua? - redis_master_version >= 2.5 - end + def supports_lua? + redis_master_version >= 2.5 + end - def master? - master_lock.acquire! || master_lock.locked? - end + def master? + master_lock.acquire! || master_lock.locked? + end - def release_master_lock! - master_lock.release! - end + def release_master_lock! + master_lock.release! + end - private + private - def build_master_lock - if supports_lua? - Resque::Scheduler::Lock::Resilient.new(master_lock_key) - else - Resque::Scheduler::Lock::Basic.new(master_lock_key) + def build_master_lock + if supports_lua? + Resque::Scheduler::Lock::Resilient.new(master_lock_key) + else + Resque::Scheduler::Lock::Basic.new(master_lock_key) + end end - end - def master_lock_key - lock_prefix = ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] || '' - lock_prefix += ':' if lock_prefix != '' - "#{Resque.redis.namespace}:#{lock_prefix}resque_scheduler_master_lock" - end + def master_lock_key + lock_prefix = ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] || '' + lock_prefix += ':' if lock_prefix != '' + "#{Resque.redis.namespace}:#{lock_prefix}resque_scheduler_master_lock" + end - def redis_master_version - Resque.redis.info['redis_version'].to_f + def redis_master_version + Resque.redis.info['redis_version'].to_f + end end end end diff --git a/lib/resque/scheduler/logger_builder.rb b/lib/resque/scheduler/logger_builder.rb new file mode 100644 index 00000000..d11981a4 --- /dev/null +++ b/lib/resque/scheduler/logger_builder.rb @@ -0,0 +1,70 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + # Just builds a logger, with specified verbosity and destination. + # The simplest example: + # + # Resque::Scheduler::LoggerBuilder.new.build + class LoggerBuilder + # Initializes new instance of the builder + # + # Pass :opts Hash with + # - :mute if logger needs to be silent for all levels. Default - false + # - :verbose if there is a need in debug messages. Default - false + # - :log_dev to output logs into a desired file. Default - STDOUT + # - :format log format, either 'text' or 'json'. Default - 'text' + # + # Example: + # + # LoggerBuilder.new( + # :mute => false, :verbose => true, :log_dev => 'log/scheduler.log' + # ) + def initialize(opts = {}) + @muted = !!opts[:mute] + @verbose = !!opts[:verbose] + @log_dev = opts[:log_dev] || $stdout + @format = opts[:format] || 'text' + end + + # Returns an instance of Logger + def build + logger = Logger.new(@log_dev) + logger.level = level + logger.formatter = send(:"#{@format}_formatter") + logger + end + + private + + def level + if @verbose && !@muted + Logger::DEBUG + elsif !@muted + Logger::INFO + else + Logger::FATAL + end + end + + def text_formatter + proc do |severity, datetime, progname, msg| + "resque-scheduler: [#{severity}] #{datetime.iso8601}: #{msg}\n" + end + end + + def json_formatter + proc do |severity, datetime, progname, msg| + require 'json' + JSON.dump( + name: 'resque-scheduler', + progname: progname, + level: severity, + timestamp: datetime.iso8601, + msg: msg + ) + "\n" + end + end + end + end +end diff --git a/lib/resque/scheduler/plugin.rb b/lib/resque/scheduler/plugin.rb new file mode 100644 index 00000000..bab898b4 --- /dev/null +++ b/lib/resque/scheduler/plugin.rb @@ -0,0 +1,31 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module Plugin + def self.hooks(job, pattern) + job.methods.grep(/^#{pattern}/).sort + end + + def self.run_hooks(job, pattern, *args) + results = hooks(job, pattern).map do |hook| + job.send(hook, *args) + end + + results.all? { |result| result != false } + end + + def self.run_before_delayed_enqueue_hooks(klass, *args) + run_hooks(klass, 'before_delayed_enqueue', *args) + end + + def self.run_before_schedule_hooks(klass, *args) + run_hooks(klass, 'before_schedule', *args) + end + + def self.run_after_schedule_hooks(klass, *args) + run_hooks(klass, 'after_schedule', *args) + end + end + end +end diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb new file mode 100644 index 00000000..4d9f9954 --- /dev/null +++ b/lib/resque/scheduler/server.rb @@ -0,0 +1,197 @@ +# vim:fileencoding=utf-8 +require 'resque-scheduler' +require 'resque/server' +require 'json' + +# Extend Resque::Server to add tabs +module Resque + module Scheduler + module Server + def self.included(base) + base.class_eval do + helpers do + def format_time(t) + t.strftime('%Y-%m-%d %H:%M:%S %z') + end + + def queue_from_class_name(class_name) + Resque.queue_from_class( + Resque::Scheduler::Util.constantize(class_name) + ) + end + + def find_job(worker) + worker = worker.downcase + results = Array.new + + # Check working jobs + working = [*Resque.working] + work = working.select do |w| + w.job && w.job['payload'] && + w.job['payload']['class'].downcase.include?(worker) + end + work.each do |w| + results += [ + w.job['payload'].merge( + 'queue' => w.job['queue'], 'where_at' => 'working' + ) + ] + end + + # Check delayed Jobs + dels = Array.new + schedule_size = Resque.delayed_queue_schedule_size + Resque.delayed_queue_peek(0, schedule_size).each do |d| + Resque.delayed_timestamp_peek( + d, 0, Resque.delayed_timestamp_size(d)).each do |j| + dels << j.merge!('timestamp' => d) + end + end + results += dels.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('where_at' => 'delayed') + end + + # Check Queues + Resque.queues.each do |queue| + queued = Resque.peek(queue, 0, Resque.size(queue)) + queued = [queued] unless queued.is_a?(Array) + results += queued.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('queue' => queue, 'where_at' => 'queued') + end + end + results + end + + def schedule_interval(config) + if config['every'] + schedule_interval_every(config['every']) + elsif config['cron'] + 'cron: ' + config['cron'].to_s + else + 'Not currently scheduled' + end + end + + def schedule_interval_every(every) + every = [*every] + s = 'every: ' << every.first + + return s unless every.length > 1 + + s << ' (' + meta = every.last.map do |key, value| + "#{key.to_s.gsub(/_/, ' ')} #{value}" + end + s << meta.join(', ') << ')' + end + + def schedule_class(config) + if config['class'].nil? && !config['custom_job_class'].nil? + config['custom_job_class'] + else + config['class'] + end + end + + def scheduler_template(name) + File.read( + File.expand_path("../server/views/#{name}.erb", __FILE__) + ) + end + end + + get '/schedule' do + Resque.reload_schedule! if Resque::Scheduler.dynamic + erb scheduler_template('scheduler') + end + + post '/schedule/requeue' do + @job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[@job_name] + @parameters = config['parameters'] || config[:parameters] + if @parameters + erb scheduler_template('requeue-params') + else + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + end + + post '/schedule/requeue_with_params' do + job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[job_name] + # Build args hash from post data (removing the job name) + submitted_args = params.reject do |key, value| + key == 'job_name' || key == :job_name + end + + # Merge constructed args hash with existing args hash for + # the job, if it exists + config_args = config['args'] || config[:args] || {} + config_args = config_args.merge(submitted_args) + + # Insert the args hash into config and queue the resque job + config = config.merge('args' => config_args) + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + + get '/delayed' do + erb scheduler_template('delayed') + end + + get '/delayed/jobs/:klass' do + begin + klass = Resque::Scheduler::Util.constantize(params[:klass]) + @args = JSON.load(URI.decode(params[:args])) + @timestamps = Resque.scheduled_at(klass, *@args) + rescue + @timestamps = [] + end + + erb scheduler_template('delayed_schedules') + end + + post '/delayed/search' do + @jobs = find_job(params[:search]) + erb scheduler_template('search') + end + + get '/delayed/:timestamp' do + erb scheduler_template('delayed_timestamp') + end + + post '/delayed/queue_now' do + timestamp = params['timestamp'].to_i + if timestamp > 0 + Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) + end + redirect u('/overview') + end + + post '/delayed/cancel_now' do + klass = Resque::Scheduler::Util.constantize(params['klass']) + timestamp = params['timestamp'] + args = Resque.decode params['args'] + Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) + redirect u('/delayed') + end + + post '/delayed/clear' do + Resque.reset_delayed_queue + redirect u('delayed') + end + end + end + end + end +end + +Resque::Server.tabs << 'Schedule' +Resque::Server.tabs << 'Delayed' + +Resque::Server.class_eval do + include Resque::Scheduler::Server +end diff --git a/lib/resque_scheduler/server/views/delayed.erb b/lib/resque/scheduler/server/views/delayed.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed.erb rename to lib/resque/scheduler/server/views/delayed.erb diff --git a/lib/resque_scheduler/server/views/delayed_schedules.erb b/lib/resque/scheduler/server/views/delayed_schedules.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed_schedules.erb rename to lib/resque/scheduler/server/views/delayed_schedules.erb diff --git a/lib/resque_scheduler/server/views/delayed_timestamp.erb b/lib/resque/scheduler/server/views/delayed_timestamp.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed_timestamp.erb rename to lib/resque/scheduler/server/views/delayed_timestamp.erb diff --git a/lib/resque_scheduler/server/views/requeue-params.erb b/lib/resque/scheduler/server/views/requeue-params.erb similarity index 100% rename from lib/resque_scheduler/server/views/requeue-params.erb rename to lib/resque/scheduler/server/views/requeue-params.erb diff --git a/lib/resque_scheduler/server/views/scheduler.erb b/lib/resque/scheduler/server/views/scheduler.erb similarity index 100% rename from lib/resque_scheduler/server/views/scheduler.erb rename to lib/resque/scheduler/server/views/scheduler.erb diff --git a/lib/resque_scheduler/server/views/search.erb b/lib/resque/scheduler/server/views/search.erb similarity index 100% rename from lib/resque_scheduler/server/views/search.erb rename to lib/resque/scheduler/server/views/search.erb diff --git a/lib/resque_scheduler/server/views/search_form.erb b/lib/resque/scheduler/server/views/search_form.erb similarity index 100% rename from lib/resque_scheduler/server/views/search_form.erb rename to lib/resque/scheduler/server/views/search_form.erb diff --git a/lib/resque_scheduler/tasks.rb b/lib/resque/scheduler/tasks.rb similarity index 85% rename from lib/resque_scheduler/tasks.rb rename to lib/resque/scheduler/tasks.rb index 6e6cfd1c..52eed1aa 100644 --- a/lib/resque_scheduler/tasks.rb +++ b/lib/resque/scheduler/tasks.rb @@ -2,13 +2,13 @@ require 'English' require 'resque/tasks' -require 'resque_scheduler' +require 'resque-scheduler' namespace :resque do task :setup def scheduler_cli - @scheduler_cli ||= ResqueScheduler::Cli.new( + @scheduler_cli ||= Resque::Scheduler::Cli.new( %W(#{ENV['RESQUE_SCHEDULER_OPTIONS']}) ) end diff --git a/lib/resque/scheduler/util.rb b/lib/resque/scheduler/util.rb new file mode 100644 index 00000000..ad983a9b --- /dev/null +++ b/lib/resque/scheduler/util.rb @@ -0,0 +1,41 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + class Util + # In order to upgrade to resque(1.25) which has deprecated following + # methods, we just added these usefull helpers back to use in Resque + # Scheduler. refer to: + # https://github.com/resque/resque-scheduler/pull/273 + + def self.constantize(camel_cased_word) + camel_cased_word = camel_cased_word.to_s + + if camel_cased_word.include?('-') + camel_cased_word = classify(camel_cased_word) + end + + names = camel_cased_word.split('::') + names.shift if names.empty? || names.first.empty? + + constant = Object + names.each do |name| + args = Module.method(:const_get).arity != 1 ? [false] : [] + + if constant.const_defined?(name, *args) + constant = constant.const_get(name) + else + constant = constant.const_missing(name) + end + end + constant + end + + def self.classify(dashed_word) + dashed_word.split('-').each do|part| + part[0] = part[0].chr.upcase + end.join + end + end + end +end diff --git a/lib/resque/scheduler/version.rb b/lib/resque/scheduler/version.rb new file mode 100644 index 00000000..6d55ff62 --- /dev/null +++ b/lib/resque/scheduler/version.rb @@ -0,0 +1,7 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + VERSION = '2.5.2' + end +end diff --git a/lib/resque_scheduler.rb b/lib/resque_scheduler.rb deleted file mode 100644 index 316105db..00000000 --- a/lib/resque_scheduler.rb +++ /dev/null @@ -1,403 +0,0 @@ -# vim:fileencoding=utf-8 -require 'rubygems' -require 'resque' -require 'resque_scheduler/version' -require 'resque_scheduler/util' -require 'resque/scheduler' -require 'resque_scheduler/plugin' - -module ResqueScheduler - autoload :Cli, 'resque_scheduler/cli' - - # - # Accepts a new schedule configuration of the form: - # - # { - # "MakeTea" => { - # "every" => "1m" }, - # "some_name" => { - # "cron" => "5/* * * *", - # "class" => "DoSomeWork", - # "args" => "work on this string", - # "description" => "this thing works it"s butter off" }, - # ... - # } - # - # Hash keys can be anything and are used to describe and reference - # the scheduled job. If the "class" argument is missing, the key - # is used implicitly as "class" argument - in the "MakeTea" example, - # "MakeTea" is used both as job name and resque worker class. - # - # Any jobs that were in the old schedule, but are not - # present in the new schedule, will be removed. - # - # :cron can be any cron scheduling string - # - # :every can be used in lieu of :cron. see rufus-scheduler's 'every' usage - # for valid syntax. If :cron is present it will take precedence over :every. - # - # :class must be a resque worker class. If it is missing, the job name (hash - # key) will be used as :class. - # - # :args can be any yaml which will be converted to a ruby literal and - # passed in a params. (optional) - # - # :rails_envs is the list of envs where the job gets loaded. Envs are - # comma separated (optional) - # - # :description is just that, a description of the job (optional). If - # params is an array, each element in the array is passed as a separate - # param, otherwise params is passed in as the only parameter to perform. - def schedule=(schedule_hash) - # clean the schedules as it exists in redis - clean_schedules - - schedule_hash = prepare_schedule(schedule_hash) - - # store all schedules in redis, so we can retrieve them back everywhere. - schedule_hash.each do |name, job_spec| - set_schedule(name, job_spec) - end - - # ensure only return the successfully saved data! - reload_schedule! - end - - # Returns the schedule hash - def schedule - @schedule ||= all_schedules - @schedule || {} - end - - # reloads the schedule from redis - def reload_schedule! - @schedule = all_schedules - end - - # gets the schedules as it exists in redis - def all_schedules - return nil unless redis.exists(:schedules) - - redis.hgetall(:schedules).tap do |h| - h.each do |name, config| - h[name] = decode(config) - end - end - end - - # clean the schedules as it exists in redis, useful for first setup? - def clean_schedules - if redis.exists(:schedules) - redis.hkeys(:schedules).each do |key| - remove_schedule(key) unless schedule_persisted?(key) - end - end - @schedule = nil - true - end - - # Create or update a schedule with the provided name and configuration. - # - # Note: values for class and custom_job_class need to be strings, - # not constants. - # - # Resque.set_schedule('some_job', {:class => 'SomeJob', - # :every => '15mins', - # :queue => 'high', - # :args => '/tmp/poop'}) - def set_schedule(name, config) - existing_config = fetch_schedule(name) - persist = config.delete(:persist) || config.delete('persist') - unless existing_config && existing_config == config - redis.pipelined do - redis.hset(:schedules, name, encode(config)) - redis.sadd(:schedules_changed, name) - redis.sadd(:persisted_schedules, name) if persist - end - end - config - end - - # retrive the schedule configuration for the given name - def fetch_schedule(name) - decode(redis.hget(:schedules, name)) - end - - def schedule_persisted?(name) - redis.sismember(:persisted_schedules, name) - end - - # remove a given schedule by name - def remove_schedule(name) - redis.pipelined do - redis.hdel(:schedules, name) - redis.srem(:persisted_schedules, name) - redis.sadd(:schedules_changed, name) - end - end - - # This method is nearly identical to +enqueue+ only it also - # takes a timestamp which will be used to schedule the job - # for queueing. Until timestamp is in the past, the job will - # sit in the schedule list. - def enqueue_at(timestamp, klass, *args) - validate(klass) - enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) - end - - # Identical to +enqueue_at+, except you can also specify - # a queue in which the job will be placed after the - # timestamp has passed. It respects Resque.inline option, by - # creating the job right away instead of adding to the queue. - def enqueue_at_with_queue(queue, timestamp, klass, *args) - return false unless Plugin.run_before_schedule_hooks(klass, *args) - - if Resque.inline? || timestamp.to_i < Time.now.to_i - # Just create the job and let resque perform it right away with inline. - # If the class is a custom job class, call self#scheduled on it. This - # allows you to do things like Resque.enqueue_at(timestamp, - # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. - if klass.respond_to?(:scheduled) - klass.scheduled(queue, klass.to_s, *args) - else - Resque::Job.create(queue, klass, *args) - end - else - delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) - end - - Plugin.run_after_schedule_hooks(klass, *args) - end - - # Identical to enqueue_at but takes number_of_seconds_from_now - # instead of a timestamp. - def enqueue_in(number_of_seconds_from_now, klass, *args) - enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) - end - - # Identical to +enqueue_in+, except you can also specify - # a queue in which the job will be placed after the - # number of seconds has passed. - def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) - enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, - klass, *args) - end - - # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object - # Insertion if O(log(n)). - # Returns true if it's the first job to be scheduled at that time, else false - def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", encode(item)) - - # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i - end - - # Returns an array of timestamps based on start and count - def delayed_queue_peek(start, count) - result = redis.zrange(:delayed_queue_schedule, start, start + count - 1) - Array(result).map(&:to_i) - end - - # Returns the size of the delayed queue schedule - def delayed_queue_schedule_size - redis.zcard :delayed_queue_schedule - end - - # Returns the number of jobs for a given timestamp in the delayed queue - # schedule - def delayed_timestamp_size(timestamp) - redis.llen("delayed:#{timestamp.to_i}").to_i - end - - # Returns an array of delayed items for the given timestamp - def delayed_timestamp_peek(timestamp, start, count) - if 1 == count - r = list_range "delayed:#{timestamp.to_i}", start, count - r.nil? ? [] : [r] - else - list_range "delayed:#{timestamp.to_i}", start, count - end - end - - # Returns the next delayed queue timestamp - # (don't call directly) - def next_delayed_timestamp(at_time = nil) - items = redis.zrangebyscore( - :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, - limit: [0, 1] - ) - timestamp = items.nil? ? nil : Array(items).first - timestamp.to_i unless timestamp.nil? - end - - # Returns the next item to be processed for a given timestamp, nil if - # done. (don't call directly) - # +timestamp+ can either be in seconds or a datetime - def next_item_for_timestamp(timestamp) - key = "delayed:#{timestamp.to_i}" - - encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) - - # If the list is empty, remove it. - clean_up_timestamp(key, timestamp) - item - end - - # Clears all jobs created with enqueue_at or enqueue_in - def reset_delayed_queue - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| - key = "delayed:#{item}" - items = redis.lrange(key, 0, -1) - redis.pipelined do - items.each { |ts_item| redis.del("timestamps:#{ts_item}") } - end - redis.del key - end - - redis.del :delayed_queue_schedule - end - - # Given an encoded item, remove it from the delayed_queue - def remove_delayed(klass, *args) - search = encode(job_to_hash(klass, args)) - timestamps = redis.smembers("timestamps:#{search}") - - replies = redis.pipelined do - timestamps.each do |key| - redis.lrem(key, 0, search) - redis.srem("timestamps:#{search}", key) - end - end - - return 0 if replies.nil? || replies.empty? - replies.each_slice(2).map(&:first).inject(:+) - end - - # Given an encoded item, enqueue it now - def enqueue_delayed(klass, *args) - hash = job_to_hash(klass, args) - remove_delayed(klass, *args).times do - Resque::Scheduler.enqueue_from_config(hash) - end - end - - # Given a block, remove jobs that return true from a block - # - # This allows for removal of delayed jobs that have arguments matching - # certain criteria - def remove_delayed_selection - fail ArgumentError, 'Please supply a block' unless block_given? - - destroyed = 0 - # There is no way to search Redis list entries for a partial match, so we - # query for all delayed job tasks and do our matching after decoding the - # payload data - jobs = Resque.redis.keys('delayed:*') - jobs.each do |job| - index = Resque.redis.llen(job) - 1 - while index >= 0 - payload = Resque.redis.lindex(job, index) - decoded_payload = decode(payload) - if yield(decoded_payload['args']) - removed = redis.lrem job, 0, payload - destroyed += removed - index -= removed - else - index -= 1 - end - end - end - destroyed - end - - # Given a timestamp and job (klass + args) it removes all instances and - # returns the count of jobs removed. - # - # O(N) where N is the number of jobs scheduled to fire at the given - # timestamp - def remove_delayed_job_from_timestamp(timestamp, klass, *args) - key = "delayed:#{timestamp.to_i}" - encoded_job = encode(job_to_hash(klass, args)) - - redis.srem("timestamps:#{encoded_job}", key) - count = redis.lrem(key, 0, encoded_job) - clean_up_timestamp(key, timestamp) - - count - end - - def count_all_scheduled_jobs - total_jobs = 0 - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| - total_jobs += redis.llen("delayed:#{timestamp}").to_i - end - total_jobs - end - - # Discover if a job hs been delayed. - # Examples - # Resque.delayed? MyJob - # Resque.delayed? MyJob, id: 1 - # Returns true if the job has been delayed - def delayed?(klass, *args) - !scheduled_at(klass, *args).empty? - end - - # Returns delayed jobs schedule timestamp for +klass+, +args+. - def scheduled_at(klass, *args) - search = encode(job_to_hash(klass, args)) - redis.smembers("timestamps:#{search}").map do |key| - key.tr('delayed:', '').to_i - end - end - - private - - def job_to_hash(klass, args) - { class: klass.to_s, args: args, queue: queue_from_class(klass) } - end - - def job_to_hash_with_queue(queue, klass, args) - { class: klass.to_s, args: args, queue: queue } - end - - def clean_up_timestamp(key, timestamp) - # If the list is empty, remove it. - - # Use a watch here to ensure nobody adds jobs to this delayed - # queue while we're removing it. - redis.watch key - if 0 == redis.llen(key).to_i - redis.multi do - redis.del key - redis.zrem :delayed_queue_schedule, timestamp.to_i - end - else - redis.unwatch - end - end - - def prepare_schedule(schedule_hash) - prepared_hash = {} - schedule_hash.each do |name, job_spec| - job_spec = job_spec.dup - unless job_spec.key?('class') || job_spec.key?(:class) - job_spec['class'] = name - end - prepared_hash[name] = job_spec - end - prepared_hash - end -end - -Resque.extend ResqueScheduler diff --git a/lib/resque_scheduler/cli.rb b/lib/resque_scheduler/cli.rb deleted file mode 100644 index 0bcc43aa..00000000 --- a/lib/resque_scheduler/cli.rb +++ /dev/null @@ -1,160 +0,0 @@ -# vim:fileencoding=utf-8 - -require 'optparse' - -module ResqueScheduler - class Cli - BANNER = <<-EOF.gsub(/ {6}/, '') - Usage: resque-scheduler [options] - - Runs a resque scheduler process directly (rather than via rake). - - EOF - OPTIONS = [ - { - args: ['-n', '--app-name [APP_NAME]', 'Application name for procline'], - callback: ->(options) { ->(n) { options[:app_name] = n } } - }, - { - args: ['-B', '--background', 'Run in the background [BACKGROUND]'], - callback: ->(options) { ->(b) { options[:background] = b } } - }, - { - args: ['-D', '--dynamic-schedule', - 'Enable dynamic scheduling [DYNAMIC_SCHEDULE]'], - callback: ->(options) { ->(d) { options[:dynamic] = d } } - }, - { - args: ['-E', '--environment [RAILS_ENV]', 'Environment name'], - callback: ->(options) { ->(e) { options[:env] = e } } - }, - { - args: ['-I', '--initializer-path [INITIALIZER_PATH]', - 'Path to optional initializer ruby file'], - callback: ->(options) { ->(i) { options[:initializer_path] = i } } - }, - { - args: ['-i', '--interval [RESQUE_SCHEDULER_INTERVAL]', - 'Interval for checking if a scheduled job must run'], - callback: ->(options) { ->(i) { options[:poll_sleep_amount] = i } } - }, - { - args: ['-l', '--logfile [LOGFILE]', 'Log file name'], - callback: ->(options) { ->(l) { options[:logfile] = l } } - }, - { - args: ['-F', '--logformat [LOGFORMAT]', 'Log output format'], - callback: ->(options) { ->(f) { options[:logformat] = f } } - }, - { - args: ['-P', '--pidfile [PIDFILE]', 'PID file name'], - callback: ->(options) { ->(p) { options[:pidfile] = p } } - }, - { - args: ['-q', '--quiet', 'Run with minimal output [QUIET] (or [MUTE])'], - callback: ->(options) { ->(q) { options[:mute] = q } } - }, - { - args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], - callback: ->(options) { ->(v) { options[:verbose] = v } } - } - ].freeze - - def self.run!(argv = ARGV, env = ENV) - new(argv, env).run! - end - - def initialize(argv = ARGV, env = ENV) - @argv = argv - @env = env - end - - def run! - pre_run - run_forever - end - - def pre_run - parse_options - pre_setup - setup_env - end - - def parse_options - OptionParser.new do |opts| - opts.banner = BANNER - OPTIONS.each do |opt| - opts.on(*opt[:args], &(opt[:callback].call(options))) - end - end.parse!(argv.dup) - end - - def pre_setup - if options[:initializer_path] - load options[:initializer_path].to_s.strip - else - false - end - end - - def setup_env - require 'resque' - require 'resque/scheduler' - - # Need to set this here for conditional Process.daemon redirect of - # stderr/stdout to /dev/null - Resque::Scheduler.mute = !!options[:mute] - - if options[:background] - unless Process.respond_to?('daemon') - abort 'background option is set, which requires ruby >= 1.9' - end - - Process.daemon(true, !Resque::Scheduler.mute) - Resque.redis.client.reconnect - end - - File.open(options[:pidfile], 'w') do |f| - f.puts $PROCESS_ID - end if options[:pidfile] - - Resque::Scheduler.configure do |c| - # These settings are somewhat redundant given the defaults present - # in the attr reader methods. They are left here for clarity and - # to serve as an example of how to use `.configure`. - - c.app_name = options[:app_name] - c.dynamic = !!options[:dynamic] - c.env = options[:env] - c.logfile = options[:logfile] - c.logformat = options[:logformat] - c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') - c.verbose = !!options[:verbose] - end - end - - def run_forever - Resque::Scheduler.run - end - - private - - attr_reader :argv, :env - - def options - @options ||= { - app_name: env['APP_NAME'], - background: env['BACKGROUND'], - dynamic: env['DYNAMIC_SCHEDULE'], - env: env['RAILS_ENV'], - initializer_path: env['INITIALIZER_PATH'], - logfile: env['LOGFILE'], - logformat: env['LOGFORMAT'], - mute: env['MUTE'] || env['QUIET'], - pidfile: env['PIDFILE'], - poll_sleep_amount: env['RESQUE_SCHEDULER_INTERVAL'], - verbose: env['VERBOSE'] - } - end - end -end diff --git a/lib/resque_scheduler/logger_builder.rb b/lib/resque_scheduler/logger_builder.rb deleted file mode 100644 index 85e2d41e..00000000 --- a/lib/resque_scheduler/logger_builder.rb +++ /dev/null @@ -1,70 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - # Just builds a logger, with specified verbosity and destination. - # The simplest example: - # - # ResqueScheduler::LoggerBuilder.new.build - class LoggerBuilder - # Initializes new instance of the builder - # - # Pass :opts Hash with - # - :mute if logger needs to be silent for all levels. Default - false - # - :verbose if there is a need in debug messages. Default - false - # - :log_dev to output logs into a desired file. Default - STDOUT - # - :format log format, either 'text' or 'json'. Default - 'text' - # - # Example: - # - # LoggerBuilder.new( - # :mute => false, :verbose => true, :log_dev => 'log/scheduler.log' - # ) - def initialize(opts = {}) - @muted = !!opts[:mute] - @verbose = !!opts[:verbose] - @log_dev = opts[:log_dev] || $stdout - @format = opts[:format] || 'text' - end - - # Returns an instance of Logger - def build - logger = Logger.new(@log_dev) - logger.level = level - logger.formatter = send(:"#{@format}_formatter") - logger - end - - private - - def level - if @verbose && !@muted - Logger::DEBUG - elsif !@muted - Logger::INFO - else - Logger::FATAL - end - end - - def text_formatter - proc do |severity, datetime, progname, msg| - "resque-scheduler: [#{severity}] #{datetime.iso8601}: #{msg}\n" - end - end - - def json_formatter - proc do |severity, datetime, progname, msg| - require 'json' - JSON.dump( - - name: 'resque-scheduler', - progname: progname, - level: severity, - timestamp: datetime.iso8601, - msg: msg - - ) + "\n" - end - end - end -end diff --git a/lib/resque_scheduler/plugin.rb b/lib/resque_scheduler/plugin.rb deleted file mode 100644 index db8f7fc2..00000000 --- a/lib/resque_scheduler/plugin.rb +++ /dev/null @@ -1,29 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - module Plugin - def self.hooks(job, pattern) - job.methods.grep(/^#{pattern}/).sort - end - - def self.run_hooks(job, pattern, *args) - results = hooks(job, pattern).map do |hook| - job.send(hook, *args) - end - - results.all? { |result| result != false } - end - - def self.run_before_delayed_enqueue_hooks(klass, *args) - run_hooks(klass, 'before_delayed_enqueue', *args) - end - - def self.run_before_schedule_hooks(klass, *args) - run_hooks(klass, 'before_schedule', *args) - end - - def self.run_after_schedule_hooks(klass, *args) - run_hooks(klass, 'after_schedule', *args) - end - end -end diff --git a/lib/resque_scheduler/server.rb b/lib/resque_scheduler/server.rb deleted file mode 100644 index 8b97f121..00000000 --- a/lib/resque_scheduler/server.rb +++ /dev/null @@ -1,195 +0,0 @@ -# vim:fileencoding=utf-8 -require 'resque_scheduler' -require 'resque/server' -require 'json' - -# Extend Resque::Server to add tabs -module ResqueScheduler - module Server - def self.included(base) - base.class_eval do - helpers do - def format_time(t) - t.strftime('%Y-%m-%d %H:%M:%S %z') - end - - def queue_from_class_name(class_name) - Resque.queue_from_class( - ResqueScheduler::Util.constantize(class_name) - ) - end - - def find_job(worker) - worker = worker.downcase - results = Array.new - - # Check working jobs - working = [*Resque.working] - work = working.select do |w| - w.job && w.job['payload'] && - w.job['payload']['class'].downcase.include?(worker) - end - work.each do |w| - results += [ - w.job['payload'].merge( - 'queue' => w.job['queue'], 'where_at' => 'working' - ) - ] - end - - # Check delayed Jobs - dels = Array.new - schedule_size = Resque.delayed_queue_schedule_size - Resque.delayed_queue_peek(0, schedule_size).each do |d| - Resque.delayed_timestamp_peek( - d, 0, Resque.delayed_timestamp_size(d)).each do |j| - dels << j.merge!('timestamp' => d) - end - end - results += dels.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('where_at' => 'delayed') - end - - # Check Queues - Resque.queues.each do |queue| - queued = Resque.peek(queue, 0, Resque.size(queue)) - queued = [queued] unless queued.is_a?(Array) - results += queued.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('queue' => queue, 'where_at' => 'queued') - end - end - results - end - - def schedule_interval(config) - if config['every'] - schedule_interval_every(config['every']) - elsif config['cron'] - 'cron: ' + config['cron'].to_s - else - 'Not currently scheduled' - end - end - - def schedule_interval_every(every) - every = [*every] - s = 'every: ' << every.first - - return s unless every.length > 1 - - s << ' (' - meta = every.last.map do |key, value| - "#{key.to_s.gsub(/_/, ' ')} #{value}" - end - s << meta.join(', ') << ')' - end - - def schedule_class(config) - if config['class'].nil? && !config['custom_job_class'].nil? - config['custom_job_class'] - else - config['class'] - end - end - - def scheduler_template(name) - File.read( - File.expand_path("../server/views/#{name}.erb", __FILE__) - ) - end - end - - get '/schedule' do - Resque.reload_schedule! if Resque::Scheduler.dynamic - erb scheduler_template('scheduler') - end - - post '/schedule/requeue' do - @job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[@job_name] - @parameters = config['parameters'] || config[:parameters] - if @parameters - erb scheduler_template('requeue-params') - else - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - end - - post '/schedule/requeue_with_params' do - job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[job_name] - # Build args hash from post data (removing the job name) - submitted_args = params.reject do |key, value| - key == 'job_name' || key == :job_name - end - - # Merge constructed args hash with existing args hash for - # the job, if it exists - config_args = config['args'] || config[:args] || {} - config_args = config_args.merge(submitted_args) - - # Insert the args hash into config and queue the resque job - config = config.merge('args' => config_args) - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - - get '/delayed' do - erb scheduler_template('delayed') - end - - get '/delayed/jobs/:klass' do - begin - klass = ResqueScheduler::Util.constantize(params[:klass]) - @args = JSON.load(URI.decode(params[:args])) - @timestamps = Resque.scheduled_at(klass, *@args) - rescue - @timestamps = [] - end - - erb scheduler_template('delayed_schedules') - end - - post '/delayed/search' do - @jobs = find_job(params[:search]) - erb scheduler_template('search') - end - - get '/delayed/:timestamp' do - erb scheduler_template('delayed_timestamp') - end - - post '/delayed/queue_now' do - timestamp = params['timestamp'].to_i - if timestamp > 0 - Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) - end - redirect u('/overview') - end - - post '/delayed/cancel_now' do - klass = ResqueScheduler::Util.constantize params['klass'] - timestamp = params['timestamp'] - args = Resque.decode params['args'] - Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) - redirect u('/delayed') - end - - post '/delayed/clear' do - Resque.reset_delayed_queue - redirect u('delayed') - end - end - end - end -end - -Resque::Server.tabs << 'Schedule' -Resque::Server.tabs << 'Delayed' - -Resque::Server.class_eval do - include ResqueScheduler::Server -end diff --git a/lib/resque_scheduler/util.rb b/lib/resque_scheduler/util.rb deleted file mode 100644 index fe81a177..00000000 --- a/lib/resque_scheduler/util.rb +++ /dev/null @@ -1,36 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - class Util - # In order to upgrade to resque(1.25) which has deprecated following - # methods, we just added these usefull helpers back to use in Resque - # Scheduler. refer to: https://github.com/resque/resque-scheduler/pull/273 - - def self.constantize(camel_cased_word) - camel_cased_word = camel_cased_word.to_s - - if camel_cased_word.include?('-') - camel_cased_word = classify(camel_cased_word) - end - - names = camel_cased_word.split('::') - names.shift if names.empty? || names.first.empty? - - constant = Object - names.each do |name| - args = Module.method(:const_get).arity != 1 ? [false] : [] - - if constant.const_defined?(name, *args) - constant = constant.const_get(name) - else - constant = constant.const_missing(name) - end - end - constant - end - - def self.classify(dashed_word) - dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join - end - end -end diff --git a/lib/resque_scheduler/version.rb b/lib/resque_scheduler/version.rb deleted file mode 100644 index 241c0f71..00000000 --- a/lib/resque_scheduler/version.rb +++ /dev/null @@ -1,5 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - VERSION = '2.5.4' -end diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index cb610d5a..e160d6e1 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -1,11 +1,11 @@ # vim:fileencoding=utf-8 lib = File.expand_path('../lib', __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) -require 'resque_scheduler/version' +require 'resque/scheduler/version' Gem::Specification.new do |spec| spec.name = 'resque-scheduler' - spec.version = ResqueScheduler::VERSION + spec.version = Resque::Scheduler::VERSION spec.authors = ['Ben VandenBos'] spec.email = ['bvandenbos@gmail.com'] spec.homepage = 'http://github.com/resque/resque-scheduler' @@ -26,8 +26,10 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'pry' spec.add_development_dependency 'rack-test' spec.add_development_dependency 'rake' + spec.add_development_dependency 'redcarpet' spec.add_development_dependency 'rubocop' spec.add_development_dependency 'simplecov' + spec.add_development_dependency 'yard' spec.add_runtime_dependency 'redis', '~> 3.0' spec.add_runtime_dependency 'resque', '~> 1.25' diff --git a/tasks/resque_scheduler.rake b/tasks/resque_scheduler.rake index a6f2b4b8..fb0a9300 100644 --- a/tasks/resque_scheduler.rake +++ b/tasks/resque_scheduler.rake @@ -1,2 +1,2 @@ $LOAD_PATH.unshift File.dirname(__FILE__) + '/../lib' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' diff --git a/test/cli_test.rb b/test/cli_test.rb index 5a0c8e06..d3d0c36e 100644 --- a/test/cli_test.rb +++ b/test/cli_test.rb @@ -3,7 +3,7 @@ context 'Cli' do def new_cli(argv = [], env = {}) - ResqueScheduler::Cli.new(argv, env) + Resque::Scheduler::Cli.new(argv, env) end test 'does not require any positional arguments' do @@ -248,6 +248,6 @@ def new_cli(argv = [], env = {}) test 'runs Resque::Scheduler' do Resque::Scheduler.expects(:run) - ResqueScheduler::Cli.run!([], {}) + Resque::Scheduler::Cli.run!([], {}) end end diff --git a/test/scheduler_args_test.rb b/test/scheduler_args_test.rb index f32702b9..600969ad 100644 --- a/test/scheduler_args_test.rb +++ b/test/scheduler_args_test.rb @@ -5,8 +5,11 @@ context 'scheduling jobs with arguments' do setup do Resque::Scheduler.clear_schedule! - Resque::Scheduler.dynamic = false - Resque::Scheduler.mute = true + Resque::Scheduler.configure do |c| + c.dynamic = false + c.mute = true + c.poll_sleep_amount = nil + end end test 'enqueue_from_config puts stuff in resque without class loaded' do diff --git a/test/scheduler_locking_test.rb b/test/scheduler_locking_test.rb index aefb5ca6..3bc8f7cd 100644 --- a/test/scheduler_locking_test.rb +++ b/test/scheduler_locking_test.rb @@ -9,7 +9,7 @@ def lock_is_not_held(lock) context '#master_lock_key' do setup do - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -25,7 +25,7 @@ def lock_is_not_held(lock) context 'with a prefix set via ENV' do setup do ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'my.prefix' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -43,7 +43,7 @@ def lock_is_not_held(lock) context 'with a namespace set for resque' do setup do Resque.redis.namespace = 'my.namespace' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -61,7 +61,7 @@ def lock_is_not_held(lock) setup do Resque.redis.namespace = 'my.namespace' ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'my.prefix' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -79,9 +79,9 @@ def lock_is_not_held(lock) end end -context 'Resque::SchedulerLocking' do +context 'Resque::Scheduler::Locking' do setup do - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do diff --git a/test/scheduler_task_test.rb b/test/scheduler_task_test.rb index c6545098..96dfd939 100644 --- a/test/scheduler_task_test.rb +++ b/test/scheduler_task_test.rb @@ -3,7 +3,10 @@ context 'Resque::Scheduler' do setup do - Resque::Scheduler.dynamic = false + Resque::Scheduler.configure do |c| + c.dynamic = false + c.poll_sleep_amount = 0.1 + end Resque.redis.flushall Resque::Scheduler.mute = true Resque::Scheduler.clear_schedule! @@ -21,7 +24,7 @@ test 'sending TERM to scheduler breaks out of poll_sleep' do Resque::Scheduler.expects(:release_master_lock!) fork do - sleep(0.5) + sleep(0.05) system("kill -TERM #{Process.ppid}") exit! end diff --git a/test/scheduler_test.rb b/test/scheduler_test.rb index e465b44b..6a753c2d 100644 --- a/test/scheduler_test.rb +++ b/test/scheduler_test.rb @@ -378,7 +378,7 @@ test 'adheres to lint' do assert_nothing_raised do Resque::Plugin.lint(Resque::Scheduler) - Resque::Plugin.lint(ResqueScheduler) + Resque::Plugin.lint(Resque::Scheduler::Extension) end end diff --git a/test/test_helper.rb b/test/test_helper.rb index aaf10cc0..6fe86d52 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -7,8 +7,8 @@ require 'resque' $LOAD_PATH.unshift File.dirname(File.expand_path(__FILE__)) + '/../lib' -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' unless ENV['RESQUE_SCHEDULER_DISABLE_TEST_REDIS_SERVER'] # Start our own Redis when the tests start. RedisInstance will take care of diff --git a/test/util_test.rb b/test/util_test.rb index f3782eee..8fdad2c9 100644 --- a/test/util_test.rb +++ b/test/util_test.rb @@ -2,10 +2,10 @@ context 'Util' do def util - ResqueScheduler::Util + Resque::Scheduler::Util end test 'constantizing' do - assert util.constantize('resque-scheduler') == ResqueScheduler + assert util.constantize('Resque::Scheduler') == Resque::Scheduler end end From 3525f407b72020577169e5b1ae780451633fbcbf Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Wed, 12 Feb 2014 02:05:00 -0500 Subject: [PATCH 02/12] Extracting some of Resque::Scheduler's functionality into modules although this is admittedly not an awesome solution. Needs more work. --- .rubocop.yml | 23 ++--- lib/resque/scheduler.rb | 125 ++++-------------------- lib/resque/scheduler/configuration.rb | 73 ++++++++++++++ lib/resque/scheduler/signal_handling.rb | 35 +++++++ test/scheduler_setup_test.rb | 18 ++-- test/test_helper.rb | 2 +- 6 files changed, 151 insertions(+), 125 deletions(-) create mode 100644 lib/resque/scheduler/configuration.rb create mode 100644 lib/resque/scheduler/signal_handling.rb diff --git a/.rubocop.yml b/.rubocop.yml index 0ddd6569..60b610bd 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2014-02-09 12:18:33 -0500 using RuboCop version 0.18.1. +# on 2014-02-12 01:55:22 -0500 using RuboCop version 0.18.1. # The point is for the user to remove these configuration records # one by one as the offences are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -9,30 +9,31 @@ AllCops: Includes: - Gemfile - Rakefile - - '*.gemspec' - - 'bin/*' + - resque-scheduler.gemspec + - bin/resque-scheduler # Offence count: 1 CaseEquality: Enabled: false -# Offence count: 1 +# Offence count: 2 # Configuration parameters: CountComments. ClassLength: - Max: 341 + Max: 130 -# Offence count: 4 +# Offence count: 3 CyclomaticComplexity: - Max: 24 + Max: 21 + +# Offence count: 29 +Documentation: + Enabled: false # Offence count: 17 # Configuration parameters: CountComments. MethodLength: - Max: 150 + Max: 145 # Offence count: 1 RescueException: Enabled: false - -Documentation: - Enabled: false diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index d207f4f5..75abae9b 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -1,99 +1,25 @@ # vim:fileencoding=utf-8 + require 'rufus/scheduler' +require_relative 'scheduler/configuration' require_relative 'scheduler/locking' require_relative 'scheduler/logger_builder' +require_relative 'scheduler/signal_handling' module Resque module Scheduler autoload :Cli, 'resque/scheduler/cli' autoload :Extension, 'resque/scheduler/extension' - extend Resque::Scheduler::Locking - - class << self - # Allows for block-style configuration - def configure - yield self - end - - attr_writer :signal_queue - - def signal_queue - @signal_queue ||= [] - end - - # Used in `#load_schedule_job` - attr_writer :env - - def env - return @env if @env - @env ||= Rails.env if defined?(Rails) - @env ||= ENV['RAILS_ENV'] - @env - end - - # If true, logs more stuff... - attr_writer :verbose - - def verbose - @verbose ||= !!ENV['VERBOSE'] - end - - # If set, produces no output - attr_writer :mute - - def mute - @mute ||= !!ENV['MUTE'] - end - - # If set, will write messages to the file - attr_writer :logfile - - def logfile - @logfile ||= ENV['LOGFILE'] - end - - # Sets whether to log in 'text' or 'json' - attr_writer :logformat - - def logformat - @logformat ||= ENV['LOGFORMAT'] - end - - # If set, will try to update the schedule in the loop - attr_writer :dynamic - - def dynamic - @dynamic ||= !!ENV['DYNAMIC_SCHEDULE'] - end - - # If set, will append the app name to procline - attr_writer :app_name - - def app_name - @app_name ||= ENV['APP_NAME'] - end - - # Amount of time in seconds to sleep between polls of the delayed - # queue. Defaults to 5 - attr_writer :poll_sleep_amount - - def poll_sleep_amount - @poll_sleep_amount ||= - Float(ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5')) - end + private - attr_writer :logger + extend Resque::Scheduler::Locking + extend Resque::Scheduler::Configuration + extend Resque::Scheduler::SignalHandling - def logger - @logger ||= Resque::Scheduler::LoggerBuilder.new( - mute: mute, - verbose: verbose, - log_dev: logfile, - format: logformat - ).build - end + public + class << self # the Rufus::Scheduler jobs that are scheduled attr_reader :scheduled_jobs @@ -139,28 +65,6 @@ def run end end - # For all signals, set the shutdown flag and wait for current - # poll/enqueing to finish (should be almost instant). In the - # case of sleeping, exit immediately. - def register_signal_handlers - %w(INT TERM USR1 USR2 QUIT).each do |sig| - trap(sig) { signal_queue << sig } - end - end - - def handle_signals - loop do - sig = signal_queue.shift - break unless sig - log! "Got #{sig} signal" - case sig - when 'INT', 'TERM', 'QUIT' then shutdown - when 'USR1' then print_schedule - when 'USR2' then reload_schedule! - end - end - end - def print_schedule if rufus_scheduler log! "Scheduling Info\tLast Run" @@ -464,6 +368,17 @@ def procline(string) private + attr_writer :logger + + def logger + @logger ||= Resque::Scheduler::LoggerBuilder.new( + mute: mute, + verbose: verbose, + log_dev: logfile, + format: logformat + ).build + end + def app_str app_name ? "[#{app_name}]" : '' end diff --git a/lib/resque/scheduler/configuration.rb b/lib/resque/scheduler/configuration.rb new file mode 100644 index 00000000..8543b55c --- /dev/null +++ b/lib/resque/scheduler/configuration.rb @@ -0,0 +1,73 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module Configuration + # Allows for block-style configuration + def configure + yield self + end + + # Used in `#load_schedule_job` + attr_writer :env + + def env + return @env if @env + @env ||= Rails.env if defined?(Rails) + @env ||= ENV['RAILS_ENV'] + @env + end + + # If true, logs more stuff... + attr_writer :verbose + + def verbose + @verbose ||= !!ENV['VERBOSE'] + end + + # If set, produces no output + attr_writer :mute + + def mute + @mute ||= !!ENV['MUTE'] + end + + # If set, will write messages to the file + attr_writer :logfile + + def logfile + @logfile ||= ENV['LOGFILE'] + end + + # Sets whether to log in 'text' or 'json' + attr_writer :logformat + + def logformat + @logformat ||= ENV['LOGFORMAT'] + end + + # If set, will try to update the schedule in the loop + attr_writer :dynamic + + def dynamic + @dynamic ||= !!ENV['DYNAMIC_SCHEDULE'] + end + + # If set, will append the app name to procline + attr_writer :app_name + + def app_name + @app_name ||= ENV['APP_NAME'] + end + + # Amount of time in seconds to sleep between polls of the delayed + # queue. Defaults to 5 + attr_writer :poll_sleep_amount + + def poll_sleep_amount + @poll_sleep_amount ||= + Float(ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5')) + end + end + end +end diff --git a/lib/resque/scheduler/signal_handling.rb b/lib/resque/scheduler/signal_handling.rb new file mode 100644 index 00000000..e779f022 --- /dev/null +++ b/lib/resque/scheduler/signal_handling.rb @@ -0,0 +1,35 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module SignalHandling + attr_writer :signal_queue + + def signal_queue + @signal_queue ||= [] + end + + # For all signals, set the shutdown flag and wait for current + # poll/enqueing to finish (should be almost instant). In the + # case of sleeping, exit immediately. + def register_signal_handlers + %w(INT TERM USR1 USR2 QUIT).each do |sig| + trap(sig) { signal_queue << sig } + end + end + + def handle_signals + loop do + sig = signal_queue.shift + break unless sig + log! "Got #{sig} signal" + case sig + when 'INT', 'TERM', 'QUIT' then shutdown + when 'USR1' then print_schedule + when 'USR2' then reload_schedule! + end + end + end + end + end +end diff --git a/test/scheduler_setup_test.rb b/test/scheduler_setup_test.rb index e757ed6f..bf7b6137 100644 --- a/test/scheduler_setup_test.rb +++ b/test/scheduler_setup_test.rb @@ -14,8 +14,8 @@ test 'set custom logger' do custom_logger = Logger.new('/dev/null') - Resque::Scheduler.logger = custom_logger - assert_equal(custom_logger, Resque::Scheduler.logger) + Resque::Scheduler.send(:logger=, custom_logger) + assert_equal(custom_logger, Resque::Scheduler.send(:logger)) end test 'configure block' do @@ -58,16 +58,17 @@ def wipe test 'uses STDOUT' do assert_equal( - Resque::Scheduler.logger.instance_variable_get(:@logdev).dev, $stdout + Resque::Scheduler.send(:logger) + .instance_variable_get(:@logdev).dev, $stdout ) end test 'not verbose' do - assert Resque::Scheduler.logger.level > Logger::DEBUG + assert Resque::Scheduler.send(:logger).level > Logger::DEBUG end test 'not muted' do - assert Resque::Scheduler.logger.level < Logger::FATAL + assert Resque::Scheduler.send(:logger).level < Logger::FATAL end end @@ -78,19 +79,20 @@ def wipe test 'uses logfile' do Resque::Scheduler.logfile = '/dev/null' assert_equal( - Resque::Scheduler.logger.instance_variable_get(:@logdev).filename, + Resque::Scheduler.send(:logger) + .instance_variable_get(:@logdev).filename, '/dev/null' ) end test 'set verbosity' do Resque::Scheduler.verbose = true - assert Resque::Scheduler.logger.level == Logger::DEBUG + assert Resque::Scheduler.send(:logger).level == Logger::DEBUG end test 'mute logger' do Resque::Scheduler.mute = true - assert Resque::Scheduler.logger.level == Logger::FATAL + assert Resque::Scheduler.send(:logger).level == Logger::FATAL end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 6fe86d52..77c034d4 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -108,7 +108,7 @@ def nullify_logger c.mute = nil c.verbose = nil c.logfile = nil - c.logger = nil + c.send(:logger=, nil) end ENV['LOGFILE'] = nil From faee8b454c5e78f57f3c9abf9a71f94129de2a62 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Wed, 12 Feb 2014 08:40:59 -0500 Subject: [PATCH 03/12] Updating ROADMAP post-rebase --- ROADMAP.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 2e102190..cf31f6a5 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -39,8 +39,12 @@ v2.6.0 TODO for v3.0.0 =============== -- Clean up all RuboCop offences, breaking the API if necessary -- Get to at least 95% test coverage +- [x] Clean up all RuboCop offences, breaking the API if necessary +- [x] Get to at least 95% test coverage +- [x] Collapse `lib/resque_scheduler` into `lib/resque/scheduler` +- [ ] Reduce public API on `Resque::Scheduler::Extension` to fewer than 10 methods +- [ ] Reduce public API on `Resque::Scheduler` to fewer than 10 methods +- [ ] Anything else? TODO for v3.1.0 =============== From 812b0ed29e0ccd303f32fc95427d08907933f972 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 17 Feb 2014 22:57:43 -0500 Subject: [PATCH 04/12] Correcting the forward-porting of #365 as it was (whoopsie!) lost in a rebase. --- lib/resque/scheduler/extension.rb | 9 +++++++++ test/delayed_queue_test.rb | 8 ++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/resque/scheduler/extension.rb b/lib/resque/scheduler/extension.rb index 6564252c..7f922b04 100644 --- a/lib/resque/scheduler/extension.rb +++ b/lib/resque/scheduler/extension.rb @@ -347,6 +347,15 @@ def count_all_scheduled_jobs total_jobs end + # Discover if a job has been delayed. + # Examples + # Resque.delayed?(MyJob) + # Resque.delayed?(MyJob, id: 1) + # Returns true if the job has been delayed + def delayed?(klass, *args) + !scheduled_at(klass, *args).empty? + end + # Returns delayed jobs schedule timestamp for +klass+, +args+. def scheduled_at(klass, *args) search = encode(job_to_hash(klass, args)) diff --git a/test/delayed_queue_test.rb b/test/delayed_queue_test.rb index d99f468b..cafde507 100644 --- a/test/delayed_queue_test.rb +++ b/test/delayed_queue_test.rb @@ -534,10 +534,10 @@ Resque.enqueue_at Time.now + 1, SomeIvarJob Resque.enqueue_at Time.now + 1, SomeIvarJob, id: 1 - assert(Resque.delayed?(SomeIvarJob, id: 1)) - assert(!Resque.delayed?(SomeIvarJob, id: 2)) - assert(Resque.delayed?(SomeIvarJob)) - assert(!Resque.delayed?(SomeJob)) + assert Resque.delayed?(SomeIvarJob, id: 1) + assert !Resque.delayed?(SomeIvarJob, id: 2) + assert Resque.delayed?(SomeIvarJob) + assert !Resque.delayed?(SomeJob) end From 4a76c72076690e1fb1b2840024a1558ef04eb9b1 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 17 Feb 2014 23:04:36 -0500 Subject: [PATCH 05/12] TRIVIAL removing some extra blank lines --- test/delayed_queue_test.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/delayed_queue_test.rb b/test/delayed_queue_test.rb index cafde507..e55146c1 100644 --- a/test/delayed_queue_test.rb +++ b/test/delayed_queue_test.rb @@ -538,7 +538,5 @@ assert !Resque.delayed?(SomeIvarJob, id: 2) assert Resque.delayed?(SomeIvarJob) assert !Resque.delayed?(SomeJob) - end - end From 05ac3617002579bcd9b8f3b3aa5c099f929d0b04 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Tue, 18 Feb 2014 00:08:24 -0500 Subject: [PATCH 06/12] Extracting helpers and server methods into modules both for complexity reasons and so that RuboCop can properly analyze the methods. --- lib/resque/scheduler/server.rb | 304 +++++++++++++++++---------------- 1 file changed, 161 insertions(+), 143 deletions(-) diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb index 4d9f9954..dd01b332 100644 --- a/lib/resque/scheduler/server.rb +++ b/lib/resque/scheduler/server.rb @@ -9,181 +9,199 @@ module Scheduler module Server def self.included(base) base.class_eval do - helpers do - def format_time(t) - t.strftime('%Y-%m-%d %H:%M:%S %z') - end + helpers { include HelperMethods } + include ServerMethods - def queue_from_class_name(class_name) - Resque.queue_from_class( - Resque::Scheduler::Util.constantize(class_name) - ) - end + get('/schedule') { schedule } + post('/schedule/requeue') { schedule_requeue } + post('/schedule/requeue_with_params') do + schedule_requeue_with_params + end + get('/delayed') { delayed } + get('/delayed/jobs/:klass') { delayed_jobs_klass } + post('/delayed/search') { delayed_search } + get('/delayed/:timestamp') { delayed_timestamp } + post('/delayed/queue_now') { delayed_queue_now } + post('/delayed/cancel_now') { delayed_cancel_now } + post('/delayed/clear') { delayed_clear } + end + end - def find_job(worker) - worker = worker.downcase - results = Array.new + module ServerMethods + def schedule + Resque.reload_schedule! if Resque::Scheduler.dynamic + erb scheduler_template('scheduler') + end - # Check working jobs - working = [*Resque.working] - work = working.select do |w| - w.job && w.job['payload'] && - w.job['payload']['class'].downcase.include?(worker) - end - work.each do |w| - results += [ - w.job['payload'].merge( - 'queue' => w.job['queue'], 'where_at' => 'working' - ) - ] - end + def schedule_requeue + @job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[@job_name] + @parameters = config['parameters'] || config[:parameters] + if @parameters + erb scheduler_template('requeue-params') + else + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + end - # Check delayed Jobs - dels = Array.new - schedule_size = Resque.delayed_queue_schedule_size - Resque.delayed_queue_peek(0, schedule_size).each do |d| - Resque.delayed_timestamp_peek( - d, 0, Resque.delayed_timestamp_size(d)).each do |j| - dels << j.merge!('timestamp' => d) - end - end - results += dels.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('where_at' => 'delayed') - end + def schedule_requeue_with_params + job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[job_name] + # Build args hash from post data (removing the job name) + submitted_args = params.reject do |key, value| + key == 'job_name' || key == :job_name + end - # Check Queues - Resque.queues.each do |queue| - queued = Resque.peek(queue, 0, Resque.size(queue)) - queued = [queued] unless queued.is_a?(Array) - results += queued.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('queue' => queue, 'where_at' => 'queued') - end - end - results - end + # Merge constructed args hash with existing args hash for + # the job, if it exists + config_args = config['args'] || config[:args] || {} + config_args = config_args.merge(submitted_args) - def schedule_interval(config) - if config['every'] - schedule_interval_every(config['every']) - elsif config['cron'] - 'cron: ' + config['cron'].to_s - else - 'Not currently scheduled' - end - end + # Insert the args hash into config and queue the resque job + config = config.merge('args' => config_args) + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end - def schedule_interval_every(every) - every = [*every] - s = 'every: ' << every.first + def delayed + erb scheduler_template('delayed') + end - return s unless every.length > 1 + def delayed_jobs_klass + begin + klass = Resque::Scheduler::Util.constantize(params[:klass]) + @args = JSON.load(URI.decode(params[:args])) + @timestamps = Resque.scheduled_at(klass, *@args) + rescue + @timestamps = [] + end - s << ' (' - meta = every.last.map do |key, value| - "#{key.to_s.gsub(/_/, ' ')} #{value}" - end - s << meta.join(', ') << ')' - end + erb scheduler_template('delayed_schedules') + end - def schedule_class(config) - if config['class'].nil? && !config['custom_job_class'].nil? - config['custom_job_class'] - else - config['class'] - end - end + def delayed_search + @jobs = find_job(params[:search]) + erb scheduler_template('search') + end - def scheduler_template(name) - File.read( - File.expand_path("../server/views/#{name}.erb", __FILE__) - ) - end - end + def delayed_timestamp + erb scheduler_template('delayed_timestamp') + end - get '/schedule' do - Resque.reload_schedule! if Resque::Scheduler.dynamic - erb scheduler_template('scheduler') + def delayed_queue_now + timestamp = params['timestamp'].to_i + if timestamp > 0 + Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) end + redirect u('/overview') + end - post '/schedule/requeue' do - @job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[@job_name] - @parameters = config['parameters'] || config[:parameters] - if @parameters - erb scheduler_template('requeue-params') - else - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - end + def delayed_cancel_now + klass = Resque::Scheduler::Util.constantize(params['klass']) + timestamp = params['timestamp'] + args = Resque.decode params['args'] + Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) + redirect u('/delayed') + end - post '/schedule/requeue_with_params' do - job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[job_name] - # Build args hash from post data (removing the job name) - submitted_args = params.reject do |key, value| - key == 'job_name' || key == :job_name - end + def delayed_clear + Resque.reset_delayed_queue + redirect u('delayed') + end + end - # Merge constructed args hash with existing args hash for - # the job, if it exists - config_args = config['args'] || config[:args] || {} - config_args = config_args.merge(submitted_args) + module HelperMethods + def format_time(t) + t.strftime('%Y-%m-%d %H:%M:%S %z') + end - # Insert the args hash into config and queue the resque job - config = config.merge('args' => config_args) - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') + def queue_from_class_name(class_name) + Resque.queue_from_class( + Resque::Scheduler::Util.constantize(class_name) + ) + end + + def find_job(worker) + worker = worker.downcase + results = [] + + # Check working jobs + working = [*Resque.working] + work = working.select do |w| + w.job && w.job['payload'] && + w.job['payload']['class'].downcase.include?(worker) + end + work.each do |w| + results += [ + w.job['payload'].merge( + 'queue' => w.job['queue'], 'where_at' => 'working' + ) + ] end - get '/delayed' do - erb scheduler_template('delayed') + # Check delayed Jobs + dels = [] + schedule_size = Resque.delayed_queue_schedule_size + Resque.delayed_queue_peek(0, schedule_size).each do |d| + Resque.delayed_timestamp_peek( + d, 0, Resque.delayed_timestamp_size(d)).each do |j| + dels << j.merge!('timestamp' => d) + end + end + results += dels.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('where_at' => 'delayed') end - get '/delayed/jobs/:klass' do - begin - klass = Resque::Scheduler::Util.constantize(params[:klass]) - @args = JSON.load(URI.decode(params[:args])) - @timestamps = Resque.scheduled_at(klass, *@args) - rescue - @timestamps = [] + # Check Queues + Resque.queues.each do |queue| + queued = Resque.peek(queue, 0, Resque.size(queue)) + queued = [queued] unless queued.is_a?(Array) + results += queued.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('queue' => queue, 'where_at' => 'queued') end - - erb scheduler_template('delayed_schedules') end + results + end - post '/delayed/search' do - @jobs = find_job(params[:search]) - erb scheduler_template('search') + def schedule_interval(config) + if config['every'] + schedule_interval_every(config['every']) + elsif config['cron'] + 'cron: ' + config['cron'].to_s + else + 'Not currently scheduled' end + end - get '/delayed/:timestamp' do - erb scheduler_template('delayed_timestamp') - end + def schedule_interval_every(every) + every = [*every] + s = 'every: ' << every.first - post '/delayed/queue_now' do - timestamp = params['timestamp'].to_i - if timestamp > 0 - Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) - end - redirect u('/overview') - end + return s unless every.length > 1 - post '/delayed/cancel_now' do - klass = Resque::Scheduler::Util.constantize(params['klass']) - timestamp = params['timestamp'] - args = Resque.decode params['args'] - Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) - redirect u('/delayed') + s << ' (' + meta = every.last.map do |key, value| + "#{key.to_s.gsub(/_/, ' ')} #{value}" end + s << meta.join(', ') << ')' + end - post '/delayed/clear' do - Resque.reset_delayed_queue - redirect u('delayed') + def schedule_class(config) + if config['class'].nil? && !config['custom_job_class'].nil? + config['custom_job_class'] + else + config['class'] end end + + def scheduler_template(name) + File.read( + File.expand_path("../server/views/#{name}.erb", __FILE__) + ) + end end end end From 528579929094c0f3533fff094255d17b1319812b Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Tue, 18 Feb 2014 00:26:33 -0500 Subject: [PATCH 07/12] Breaking up the `#find_job` helper method a bit --- lib/resque/scheduler/server.rb | 59 ++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb index dd01b332..f697464b 100644 --- a/lib/resque/scheduler/server.rb +++ b/lib/resque/scheduler/server.rb @@ -124,37 +124,14 @@ def queue_from_class_name(class_name) def find_job(worker) worker = worker.downcase - results = [] + results = working_jobs_for_worker(worker) - # Check working jobs - working = [*Resque.working] - work = working.select do |w| - w.job && w.job['payload'] && - w.job['payload']['class'].downcase.include?(worker) - end - work.each do |w| - results += [ - w.job['payload'].merge( - 'queue' => w.job['queue'], 'where_at' => 'working' - ) - ] - end - - # Check delayed Jobs - dels = [] - schedule_size = Resque.delayed_queue_schedule_size - Resque.delayed_queue_peek(0, schedule_size).each do |d| - Resque.delayed_timestamp_peek( - d, 0, Resque.delayed_timestamp_size(d)).each do |j| - dels << j.merge!('timestamp' => d) - end - end + dels = delayed_jobs_for_worker(worker) results += dels.select do |j| j['class'].downcase.include?(worker) && j.merge!('where_at' => 'delayed') end - # Check Queues Resque.queues.each do |queue| queued = Resque.peek(queue, 0, Resque.size(queue)) queued = [queued] unless queued.is_a?(Array) @@ -163,6 +140,7 @@ def find_job(worker) j.merge!('queue' => queue, 'where_at' => 'queued') end end + results end @@ -202,6 +180,37 @@ def scheduler_template(name) File.expand_path("../server/views/#{name}.erb", __FILE__) ) end + + private + + def working_jobs_for_worker(worker) + [].tap do |results| + working = [*Resque.working] + work = working.select do |w| + w.job && w.job['payload'] && + w.job['payload']['class'].downcase.include?(worker) + end + work.each do |w| + results += [ + w.job['payload'].merge( + 'queue' => w.job['queue'], 'where_at' => 'working' + ) + ] + end + end + end + + def delayed_jobs_for_worker(worker) + [].tap do |dels| + schedule_size = Resque.delayed_queue_schedule_size + Resque.delayed_queue_peek(0, schedule_size).each do |d| + Resque.delayed_timestamp_peek( + d, 0, Resque.delayed_timestamp_size(d)).each do |j| + dels << j.merge!('timestamp' => d) + end + end + end + end end end end From 49bc40b9e05eb8495bbefb90cbdf843e432420dd Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Tue, 18 Feb 2014 00:42:13 -0500 Subject: [PATCH 08/12] Breaking up some complex methods in cli, collapsing MUTE/QUIET -> QUIET --- README.md | 10 +-- lib/resque/scheduler.rb | 2 +- lib/resque/scheduler/cli.rb | 87 +++++++++++++++----------- lib/resque/scheduler/configuration.rb | 6 +- lib/resque/scheduler/logger_builder.rb | 10 +-- test/cli_test.rb | 16 ++--- test/delayed_queue_test.rb | 2 +- test/scheduler_args_test.rb | 2 +- test/scheduler_setup_test.rb | 12 ++-- test/scheduler_task_test.rb | 2 +- test/scheduler_test.rb | 2 +- test/test_helper.rb | 4 +- 12 files changed, 86 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index caeec3a3..8fdab7a0 100644 --- a/README.md +++ b/README.md @@ -145,8 +145,8 @@ scheduled job must run (coerced with `Kernel#Float()`) (default `5`) * `LOGFORMAT` - Log output format to use (either `'text'` or `'json'`, default `'text'`) * `PIDFILE` - If non-empty, write process PID to file (default empty) -* `QUIET` or `MUTE` - Silence most output if non-empty (equivalent to -a level of `Logger::FATAL`, default `false`) +* `QUIET` - Silence most output if non-empty (equivalent to a level of +`Logger::FATAL`, default `false`) * `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level of `Logger::DEBUG`, default `false`) @@ -534,8 +534,8 @@ worker is started. There are several options to toggle the way scheduler logs its actions. They are toggled by environment variables: - - `MUTE` will stop logging anything. Completely silent. - - `VERBOSE` opposite of 'mute'; will log even debug information + - `QUIET` will stop logging anything. Completely silent. + - `VERBOSE` opposite of 'QUIET'; will log even debug information - `LOGFILE` specifies the file to write logs to. (default standard output) - `LOGFORMAT` specifies either "text" or "json" output format (default "text") @@ -545,7 +545,7 @@ values: ```ruby Resque::Scheduler.configure do |c| - c.mute = false + c.quiet = false c.verbose = false c.logfile = nil # meaning all messages go to $stdout c.logformat = 'text' diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 75abae9b..d05b8dc3 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -372,7 +372,7 @@ def procline(string) def logger @logger ||= Resque::Scheduler::LoggerBuilder.new( - mute: mute, + quiet: quiet, verbose: verbose, log_dev: logfile, format: logformat diff --git a/lib/resque/scheduler/cli.rb b/lib/resque/scheduler/cli.rb index 9915d3b4..3ad4bcef 100644 --- a/lib/resque/scheduler/cli.rb +++ b/lib/resque/scheduler/cli.rb @@ -53,9 +53,8 @@ class Cli callback: ->(options) { ->(p) { options[:pidfile] = p } } }, { - args: ['-q', '--quiet', - 'Run with minimal output [QUIET] (or [MUTE])'], - callback: ->(options) { ->(q) { options[:mute] = q } } + args: ['-q', '--quiet', 'Run with minimal output [QUIET]'], + callback: ->(options) { ->(q) { options[:quiet] = q } } }, { args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], @@ -84,12 +83,7 @@ def pre_run end def parse_options - OptionParser.new do |opts| - opts.banner = BANNER - OPTIONS.each do |opt| - opts.on(*opt[:args], &(opt[:callback].call(options))) - end - end.parse!(argv.dup) + option_parser.parse!(argv.dup) end def pre_setup @@ -104,23 +98,70 @@ def setup_env require 'resque' require 'resque/scheduler' + setup_backgrounding + setup_pid_file + setup_scheduler_configuration + end + + def run_forever + Resque::Scheduler.run + end + + private + + attr_reader :argv, :env + + def option_parser + OptionParser.new do |opts| + opts.banner = BANNER + OPTIONS.each do |opt| + opts.on(*opt[:args], &(opt[:callback].call(options))) + end + end + end + + OPTIONS_ENV_MAPPING = { + app_name: 'APP_NAME', + background: 'BACKGROUND', + dynamic: 'DYNAMIC_SCHEDULE', + env: 'RAILS_ENV', + initializer_path: 'INITIALIZER_PATH', + logfile: 'LOGFILE', + logformat: 'LOGFORMAT', + quiet: 'QUIET', + pidfile: 'PIDFILE', + poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL', + verbose: 'VERBOSE' + } + + def options + @options ||= {}.tap do |o| + OPTIONS_ENV_MAPPING.map { |key, envvar| o[key] = env[envvar] } + end + end + + def setup_backgrounding # Need to set this here for conditional Process.daemon redirect of # stderr/stdout to /dev/null - Resque::Scheduler.mute = !!options[:mute] + Resque::Scheduler.quiet = !!options[:quiet] if options[:background] unless Process.respond_to?('daemon') abort 'background option is set, which requires ruby >= 1.9' end - Process.daemon(true, !Resque::Scheduler.mute) + Process.daemon(true, !Resque::Scheduler.quiet) Resque.redis.client.reconnect end + end + def setup_pid_file File.open(options[:pidfile], 'w') do |f| f.puts $PROCESS_ID end if options[:pidfile] + end + def setup_scheduler_configuration Resque::Scheduler.configure do |c| # These settings are somewhat redundant given the defaults present # in the attr reader methods. They are left here for clarity and @@ -135,30 +176,6 @@ def setup_env c.verbose = !!options[:verbose] end end - - def run_forever - Resque::Scheduler.run - end - - private - - attr_reader :argv, :env - - def options - @options ||= { - app_name: env['APP_NAME'], - background: env['BACKGROUND'], - dynamic: env['DYNAMIC_SCHEDULE'], - env: env['RAILS_ENV'], - initializer_path: env['INITIALIZER_PATH'], - logfile: env['LOGFILE'], - logformat: env['LOGFORMAT'], - mute: env['MUTE'] || env['QUIET'], - pidfile: env['PIDFILE'], - poll_sleep_amount: env['RESQUE_SCHEDULER_INTERVAL'], - verbose: env['VERBOSE'] - } - end end end end diff --git a/lib/resque/scheduler/configuration.rb b/lib/resque/scheduler/configuration.rb index 8543b55c..b2cccf07 100644 --- a/lib/resque/scheduler/configuration.rb +++ b/lib/resque/scheduler/configuration.rb @@ -26,10 +26,10 @@ def verbose end # If set, produces no output - attr_writer :mute + attr_writer :quiet - def mute - @mute ||= !!ENV['MUTE'] + def quiet + @quiet ||= !!ENV['QUIET'] end # If set, will write messages to the file diff --git a/lib/resque/scheduler/logger_builder.rb b/lib/resque/scheduler/logger_builder.rb index d11981a4..58a3ee84 100644 --- a/lib/resque/scheduler/logger_builder.rb +++ b/lib/resque/scheduler/logger_builder.rb @@ -10,7 +10,7 @@ class LoggerBuilder # Initializes new instance of the builder # # Pass :opts Hash with - # - :mute if logger needs to be silent for all levels. Default - false + # - :quiet if logger needs to be silent for all levels. Default - false # - :verbose if there is a need in debug messages. Default - false # - :log_dev to output logs into a desired file. Default - STDOUT # - :format log format, either 'text' or 'json'. Default - 'text' @@ -18,10 +18,10 @@ class LoggerBuilder # Example: # # LoggerBuilder.new( - # :mute => false, :verbose => true, :log_dev => 'log/scheduler.log' + # :quiet => false, :verbose => true, :log_dev => 'log/scheduler.log' # ) def initialize(opts = {}) - @muted = !!opts[:mute] + @quiet = !!opts[:quiet] @verbose = !!opts[:verbose] @log_dev = opts[:log_dev] || $stdout @format = opts[:format] || 'text' @@ -38,9 +38,9 @@ def build private def level - if @verbose && !@muted + if @verbose && !@quiet Logger::DEBUG - elsif !@muted + elsif !@quiet Logger::INFO else Logger::FATAL diff --git a/test/cli_test.rb b/test/cli_test.rb index d3d0c36e..010c4cfb 100644 --- a/test/cli_test.rb +++ b/test/cli_test.rb @@ -158,25 +158,25 @@ def new_cli(argv = [], env = {}) cli.pre_run end - test 'initializes mute/quiet from the env' do + test 'initializes quiet from the env' do cli = new_cli([], 'QUIET' => '1') - assert_equal('1', cli.send(:options)[:mute]) + assert_equal('1', cli.send(:options)[:quiet]) end - test 'defaults to unmuted' do - assert_equal(false, !!new_cli.send(:options)[:mute]) + test 'defaults to un-quieted' do + assert_equal(false, !!new_cli.send(:options)[:quiet]) end - test 'accepts mute/quiet via -q' do + test 'accepts quiet via -q' do cli = new_cli(%w(-q)) cli.parse_options - assert_equal(true, cli.send(:options)[:mute]) + assert_equal(true, cli.send(:options)[:quiet]) end - test 'accepts mute via --quiet' do + test 'accepts quiet via --quiet' do cli = new_cli(%w(--quiet)) cli.parse_options - assert_equal(true, cli.send(:options)[:mute]) + assert_equal(true, cli.send(:options)[:quiet]) end test 'initializes logfile from the env' do diff --git a/test/delayed_queue_test.rb b/test/delayed_queue_test.rb index e55146c1..ba6eb7f3 100644 --- a/test/delayed_queue_test.rb +++ b/test/delayed_queue_test.rb @@ -3,7 +3,7 @@ context 'DelayedQueue' do setup do - Resque::Scheduler.mute = true + Resque::Scheduler.quiet = true Resque.redis.flushall end diff --git a/test/scheduler_args_test.rb b/test/scheduler_args_test.rb index 600969ad..c10cf3f9 100644 --- a/test/scheduler_args_test.rb +++ b/test/scheduler_args_test.rb @@ -7,7 +7,7 @@ Resque::Scheduler.clear_schedule! Resque::Scheduler.configure do |c| c.dynamic = false - c.mute = true + c.quiet = true c.poll_sleep_amount = nil end end diff --git a/test/scheduler_setup_test.rb b/test/scheduler_setup_test.rb index bf7b6137..351a4e72 100644 --- a/test/scheduler_setup_test.rb +++ b/test/scheduler_setup_test.rb @@ -19,11 +19,11 @@ end test 'configure block' do - Resque::Scheduler.mute = false + Resque::Scheduler.quiet = false Resque::Scheduler.configure do |c| - c.mute = true + c.quiet = true end - assert_equal(Resque::Scheduler.mute, true) + assert_equal(Resque::Scheduler.quiet, true) end context 'when getting the env' do @@ -67,7 +67,7 @@ def wipe assert Resque::Scheduler.send(:logger).level > Logger::DEBUG end - test 'not muted' do + test 'not quieted' do assert Resque::Scheduler.send(:logger).level < Logger::FATAL end end @@ -90,8 +90,8 @@ def wipe assert Resque::Scheduler.send(:logger).level == Logger::DEBUG end - test 'mute logger' do - Resque::Scheduler.mute = true + test 'quiet logger' do + Resque::Scheduler.quiet = true assert Resque::Scheduler.send(:logger).level == Logger::FATAL end end diff --git a/test/scheduler_task_test.rb b/test/scheduler_task_test.rb index 96dfd939..5f0b6fca 100644 --- a/test/scheduler_task_test.rb +++ b/test/scheduler_task_test.rb @@ -8,7 +8,7 @@ c.poll_sleep_amount = 0.1 end Resque.redis.flushall - Resque::Scheduler.mute = true + Resque::Scheduler.quiet = true Resque::Scheduler.clear_schedule! Resque::Scheduler.send(:instance_variable_set, :@scheduled_jobs, {}) Resque::Scheduler.send(:instance_variable_set, :@shutdown, false) diff --git a/test/scheduler_test.rb b/test/scheduler_test.rb index 6a753c2d..0e956c5f 100644 --- a/test/scheduler_test.rb +++ b/test/scheduler_test.rb @@ -5,7 +5,7 @@ setup do Resque::Scheduler.configure do |c| c.dynamic = false - c.mute = true + c.quiet = true c.env = nil c.app_name = nil end diff --git a/test/test_helper.rb b/test/test_helper.rb index 77c034d4..e2130491 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -95,7 +95,7 @@ def self.perform(*args) DYNAMIC_SCHEDULE LOGFILE LOGFORMAT - MUTE + QUIET RAILS_ENV RESQUE_SCHEDULER_INTERVAL VERBOSE @@ -105,7 +105,7 @@ def self.perform(*args) def nullify_logger Resque::Scheduler.configure do |c| - c.mute = nil + c.quiet = nil c.verbose = nil c.logfile = nil c.send(:logger=, nil) From 1a5212b9032816493b2b9b6dc953a7b1cc8e879f Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Tue, 18 Feb 2014 00:58:19 -0500 Subject: [PATCH 09/12] Extracting runtime env bits into separate class --- lib/resque/scheduler/cli.rb | 79 ++++++++++--------------------------- lib/resque/scheduler/env.rb | 61 ++++++++++++++++++++++++++++ test/cli_test.rb | 41 ++++--------------- test/env_test.rb | 41 +++++++++++++++++++ 4 files changed, 130 insertions(+), 92 deletions(-) create mode 100644 lib/resque/scheduler/env.rb create mode 100644 test/env_test.rb diff --git a/lib/resque/scheduler/cli.rb b/lib/resque/scheduler/cli.rb index 3ad4bcef..6d37ab19 100644 --- a/lib/resque/scheduler/cli.rb +++ b/lib/resque/scheduler/cli.rb @@ -4,6 +4,20 @@ module Resque module Scheduler + CLI_OPTIONS_ENV_MAPPING = { + app_name: 'APP_NAME', + background: 'BACKGROUND', + dynamic: 'DYNAMIC_SCHEDULE', + env: 'RAILS_ENV', + initializer_path: 'INITIALIZER_PATH', + logfile: 'LOGFILE', + logformat: 'LOGFORMAT', + quiet: 'QUIET', + pidfile: 'PIDFILE', + poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL', + verbose: 'VERBOSE' + } + class Cli BANNER = <<-EOF.gsub(/ {6}/, '') Usage: resque-scheduler [options] @@ -95,12 +109,8 @@ def pre_setup end def setup_env - require 'resque' - require 'resque/scheduler' - - setup_backgrounding - setup_pid_file - setup_scheduler_configuration + require_relative 'env' + runtime_env.setup end def run_forever @@ -111,6 +121,10 @@ def run_forever attr_reader :argv, :env + def runtime_env + Resque::Scheduler::Env.new(options) + end + def option_parser OptionParser.new do |opts| opts.banner = BANNER @@ -120,60 +134,9 @@ def option_parser end end - OPTIONS_ENV_MAPPING = { - app_name: 'APP_NAME', - background: 'BACKGROUND', - dynamic: 'DYNAMIC_SCHEDULE', - env: 'RAILS_ENV', - initializer_path: 'INITIALIZER_PATH', - logfile: 'LOGFILE', - logformat: 'LOGFORMAT', - quiet: 'QUIET', - pidfile: 'PIDFILE', - poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL', - verbose: 'VERBOSE' - } - def options @options ||= {}.tap do |o| - OPTIONS_ENV_MAPPING.map { |key, envvar| o[key] = env[envvar] } - end - end - - def setup_backgrounding - # Need to set this here for conditional Process.daemon redirect of - # stderr/stdout to /dev/null - Resque::Scheduler.quiet = !!options[:quiet] - - if options[:background] - unless Process.respond_to?('daemon') - abort 'background option is set, which requires ruby >= 1.9' - end - - Process.daemon(true, !Resque::Scheduler.quiet) - Resque.redis.client.reconnect - end - end - - def setup_pid_file - File.open(options[:pidfile], 'w') do |f| - f.puts $PROCESS_ID - end if options[:pidfile] - end - - def setup_scheduler_configuration - Resque::Scheduler.configure do |c| - # These settings are somewhat redundant given the defaults present - # in the attr reader methods. They are left here for clarity and - # to serve as an example of how to use `.configure`. - - c.app_name = options[:app_name] - c.dynamic = !!options[:dynamic] - c.env = options[:env] - c.logfile = options[:logfile] - c.logformat = options[:logformat] - c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') - c.verbose = !!options[:verbose] + CLI_OPTIONS_ENV_MAPPING.map { |key, envvar| o[key] = env[envvar] } end end end diff --git a/lib/resque/scheduler/env.rb b/lib/resque/scheduler/env.rb new file mode 100644 index 00000000..61945bcc --- /dev/null +++ b/lib/resque/scheduler/env.rb @@ -0,0 +1,61 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + class Env + def initialize(options) + @options = options + end + + def setup + require 'resque' + require 'resque/scheduler' + + setup_backgrounding + setup_pid_file + setup_scheduler_configuration + end + + private + + attr_reader :options + + def setup_backgrounding + # Need to set this here for conditional Process.daemon redirect of + # stderr/stdout to /dev/null + Resque::Scheduler.quiet = !!options[:quiet] + + if options[:background] + unless Process.respond_to?('daemon') + abort 'background option is set, which requires ruby >= 1.9' + end + + Process.daemon(true, !Resque::Scheduler.quiet) + Resque.redis.client.reconnect + end + end + + def setup_pid_file + File.open(options[:pidfile], 'w') do |f| + f.puts $PROCESS_ID + end if options[:pidfile] + end + + def setup_scheduler_configuration + Resque::Scheduler.configure do |c| + # These settings are somewhat redundant given the defaults present + # in the attr reader methods. They are left here for clarity and + # to serve as an example of how to use `.configure`. + + c.app_name = options[:app_name] + c.dynamic = !!options[:dynamic] + c.env = options[:env] + c.logfile = options[:logfile] + c.logformat = options[:logformat] + c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') + c.verbose = !!options[:verbose] + end + end + end + end +end diff --git a/test/cli_test.rb b/test/cli_test.rb index 010c4cfb..418d9a53 100644 --- a/test/cli_test.rb +++ b/test/cli_test.rb @@ -2,8 +2,14 @@ require_relative 'test_helper' context 'Cli' do + def mock_runtime_env + mock.tap { |m| m.stubs(:setup) } + end + def new_cli(argv = [], env = {}) - Resque::Scheduler::Cli.new(argv, env) + Resque::Scheduler::Cli.new(argv, env).tap do |cli| + cli.stubs(:runtime_env).returns(mock_runtime_env) + end end test 'does not require any positional arguments' do @@ -52,31 +58,6 @@ def new_cli(argv = [], env = {}) assert_equal(true, cli.send(:options)[:background]) end - test 'daemonizes when background is true' do - Process.expects(:daemon) - cli = new_cli(%w(--background)) - cli.pre_run - end - - test 'reconnects redis when background is true' do - Process.stubs(:daemon) - mock_redis_client = mock('redis_client') - mock_redis = mock('redis') - mock_redis.expects(:client).returns(mock_redis_client) - mock_redis_client.expects(:reconnect) - Resque.expects(:redis).returns(mock_redis) - cli = new_cli(%w(--background)) - cli.pre_run - end - - test 'aborts when background is given and Process does not support daemon' do - Process.stubs(:daemon) - Process.expects(:respond_to?).with('daemon').returns(false) - cli = new_cli(%w(--background)) - cli.expects(:abort) - cli.pre_run - end - test 'initializes pidfile from the env' do cli = new_cli([], 'PIDFILE' => 'bar') assert_equal('bar', cli.send(:options)[:pidfile]) @@ -98,14 +79,6 @@ def new_cli(argv = [], env = {}) assert_equal('foo', cli.send(:options)[:pidfile]) end - test 'writes pid to pidfile when given' do - mock_pidfile = mock('pidfile') - mock_pidfile.expects(:puts) - File.expects(:open).with('derp.pid', 'w').yields(mock_pidfile) - cli = new_cli(%w(--pidfile derp.pid)) - cli.pre_run - end - test 'defaults to nil dynamic' do assert_equal(nil, new_cli.send(:options)[:dynamic]) end diff --git a/test/env_test.rb b/test/env_test.rb new file mode 100644 index 00000000..01b543b9 --- /dev/null +++ b/test/env_test.rb @@ -0,0 +1,41 @@ +# vim:fileencoding=utf-8 +require_relative 'test_helper' + +context 'Env' do + def new_env(options = {}) + Resque::Scheduler::Env.new(options) + end + + test 'daemonizes when background is true' do + Process.expects(:daemon) + env = new_env(background: true) + env.setup + end + + test 'reconnects redis when background is true' do + Process.stubs(:daemon) + mock_redis_client = mock('redis_client') + mock_redis = mock('redis') + mock_redis.expects(:client).returns(mock_redis_client) + mock_redis_client.expects(:reconnect) + Resque.expects(:redis).returns(mock_redis) + env = new_env(background: true) + env.setup + end + + test 'aborts when background is given and Process does not support daemon' do + Process.stubs(:daemon) + Process.expects(:respond_to?).with('daemon').returns(false) + env = new_env(background: true) + env.expects(:abort) + env.setup + end + + test 'writes pid to pidfile when given' do + mock_pidfile = mock('pidfile') + mock_pidfile.expects(:puts) + File.expects(:open).with('derp.pid', 'w').yields(mock_pidfile) + env = new_env(pidfile: 'derp.pid') + env.setup + end +end From 0aba925ae94340896f157362998a22fbb28d656b Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Tue, 18 Feb 2014 10:15:52 -0500 Subject: [PATCH 10/12] Splitting the resque extension's methods up a bit --- lib/resque/scheduler.rb | 1 + lib/resque/scheduler/delaying_extensions.rb | 270 ++++++++++++ lib/resque/scheduler/extension.rb | 403 +----------------- lib/resque/scheduler/scheduling_extensions.rb | 150 +++++++ 4 files changed, 426 insertions(+), 398 deletions(-) create mode 100644 lib/resque/scheduler/delaying_extensions.rb create mode 100644 lib/resque/scheduler/scheduling_extensions.rb diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index d05b8dc3..6cdb7b9e 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -10,6 +10,7 @@ module Resque module Scheduler autoload :Cli, 'resque/scheduler/cli' autoload :Extension, 'resque/scheduler/extension' + autoload :Util, 'resque/scheduler/util' private diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb new file mode 100644 index 00000000..b7e588dd --- /dev/null +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -0,0 +1,270 @@ +# vim:fileencoding=utf-8 +require 'resque' +require_relative 'plugin' +require_relative '../scheduler' + +module Resque + module Scheduler + module DelayingExtensions + # This method is nearly identical to +enqueue+ only it also + # takes a timestamp which will be used to schedule the job + # for queueing. Until timestamp is in the past, the job will + # sit in the schedule list. + def enqueue_at(timestamp, klass, *args) + validate(klass) + enqueue_at_with_queue( + queue_from_class(klass), timestamp, klass, *args + ) + end + + # Identical to +enqueue_at+, except you can also specify + # a queue in which the job will be placed after the + # timestamp has passed. It respects Resque.inline option, by + # creating the job right away instead of adding to the queue. + def enqueue_at_with_queue(queue, timestamp, klass, *args) + return false unless plugin.run_before_schedule_hooks(klass, *args) + + if Resque.inline? || timestamp.to_i < Time.now.to_i + # Just create the job and let resque perform it right away with + # inline. If the class is a custom job class, call self#scheduled + # on it. This allows you to do things like + # Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1). + # Otherwise, pass off to Resque. + if klass.respond_to?(:scheduled) + klass.scheduled(queue, klass.to_s, *args) + else + Resque::Job.create(queue, klass, *args) + end + else + delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) + end + + plugin.run_after_schedule_hooks(klass, *args) + end + + # Identical to enqueue_at but takes number_of_seconds_from_now + # instead of a timestamp. + def enqueue_in(number_of_seconds_from_now, klass, *args) + enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + + # Identical to +enqueue_in+, except you can also specify + # a queue in which the job will be placed after the + # number of seconds has passed. + def enqueue_in_with_queue(queue, number_of_seconds_from_now, + klass, *args) + enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, + klass, *args) + end + + # Used internally to stuff the item into the schedule sorted list. + # +timestamp+ can be either in seconds or a datetime object Insertion + # if O(log(n)). Returns true if it's the first job to be scheduled at + # that time, else false + def delayed_push(timestamp, item) + # First add this item to the list for this timestamp + redis.rpush("delayed:#{timestamp.to_i}", encode(item)) + + # Store the timestamps at with this item occurs + redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + + # Now, add this timestamp to the zsets. The score and the value are + # the same since we'll be querying by timestamp, and we don't have + # anything else to store. + redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + end + + # Returns an array of timestamps based on start and count + def delayed_queue_peek(start, count) + result = redis.zrange(:delayed_queue_schedule, start, + start + count - 1) + Array(result).map(&:to_i) + end + + # Returns the size of the delayed queue schedule + def delayed_queue_schedule_size + redis.zcard :delayed_queue_schedule + end + + # Returns the number of jobs for a given timestamp in the delayed queue + # schedule + def delayed_timestamp_size(timestamp) + redis.llen("delayed:#{timestamp.to_i}").to_i + end + + # Returns an array of delayed items for the given timestamp + def delayed_timestamp_peek(timestamp, start, count) + if 1 == count + r = list_range "delayed:#{timestamp.to_i}", start, count + r.nil? ? [] : [r] + else + list_range "delayed:#{timestamp.to_i}", start, count + end + end + + # Returns the next delayed queue timestamp + # (don't call directly) + def next_delayed_timestamp(at_time = nil) + items = redis.zrangebyscore( + :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, + limit: [0, 1] + ) + timestamp = items.nil? ? nil : Array(items).first + timestamp.to_i unless timestamp.nil? + end + + # Returns the next item to be processed for a given timestamp, nil if + # done. (don't call directly) + # +timestamp+ can either be in seconds or a datetime + def next_item_for_timestamp(timestamp) + key = "delayed:#{timestamp.to_i}" + + encoded_item = redis.lpop(key) + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + + # If the list is empty, remove it. + clean_up_timestamp(key, timestamp) + item + end + + # Clears all jobs created with enqueue_at or enqueue_in + def reset_delayed_queue + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| + key = "delayed:#{item}" + items = redis.lrange(key, 0, -1) + redis.pipelined do + items.each { |ts_item| redis.del("timestamps:#{ts_item}") } + end + redis.del key + end + + redis.del :delayed_queue_schedule + end + + # Given an encoded item, remove it from the delayed_queue + def remove_delayed(klass, *args) + search = encode(job_to_hash(klass, args)) + timestamps = redis.smembers("timestamps:#{search}") + + replies = redis.pipelined do + timestamps.each do |key| + redis.lrem(key, 0, search) + redis.srem("timestamps:#{search}", key) + end + end + + return 0 if replies.nil? || replies.empty? + replies.each_slice(2).map(&:first).inject(:+) + end + + # Given an encoded item, enqueue it now + def enqueue_delayed(klass, *args) + hash = job_to_hash(klass, args) + remove_delayed(klass, *args).times do + Resque::Scheduler.enqueue_from_config(hash) + end + end + + # Given a block, remove jobs that return true from a block + # + # This allows for removal of delayed jobs that have arguments matching + # certain criteria + def remove_delayed_selection + fail ArgumentError, 'Please supply a block' unless block_given? + + destroyed = 0 + # There is no way to search Redis list entries for a partial match, + # so we query for all delayed job tasks and do our matching after + # decoding the payload data + jobs = Resque.redis.keys('delayed:*') + jobs.each do |job| + index = Resque.redis.llen(job) - 1 + while index >= 0 + payload = Resque.redis.lindex(job, index) + decoded_payload = decode(payload) + if yield(decoded_payload['args']) + removed = redis.lrem job, 0, payload + destroyed += removed + index -= removed + else + index -= 1 + end + end + end + destroyed + end + + # Given a timestamp and job (klass + args) it removes all instances and + # returns the count of jobs removed. + # + # O(N) where N is the number of jobs scheduled to fire at the given + # timestamp + def remove_delayed_job_from_timestamp(timestamp, klass, *args) + key = "delayed:#{timestamp.to_i}" + encoded_job = encode(job_to_hash(klass, args)) + + redis.srem("timestamps:#{encoded_job}", key) + count = redis.lrem(key, 0, encoded_job) + clean_up_timestamp(key, timestamp) + + count + end + + def count_all_scheduled_jobs + total_jobs = 0 + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts| + total_jobs += redis.llen("delayed:#{ts}").to_i + end + total_jobs + end + + # Discover if a job has been delayed. + # Examples + # Resque.delayed?(MyJob) + # Resque.delayed?(MyJob, id: 1) + # Returns true if the job has been delayed + def delayed?(klass, *args) + !scheduled_at(klass, *args).empty? + end + + # Returns delayed jobs schedule timestamp for +klass+, +args+. + def scheduled_at(klass, *args) + search = encode(job_to_hash(klass, args)) + redis.smembers("timestamps:#{search}").map do |key| + key.tr('delayed:', '').to_i + end + end + + private + + def job_to_hash(klass, args) + { class: klass.to_s, args: args, queue: queue_from_class(klass) } + end + + def job_to_hash_with_queue(queue, klass, args) + { class: klass.to_s, args: args, queue: queue } + end + + def clean_up_timestamp(key, timestamp) + # If the list is empty, remove it. + + # Use a watch here to ensure nobody adds jobs to this delayed + # queue while we're removing it. + redis.watch key + if 0 == redis.llen(key).to_i + redis.multi do + redis.del key + redis.zrem :delayed_queue_schedule, timestamp.to_i + end + else + redis.unwatch + end + end + + def plugin + Resque::Scheduler::Plugin + end + end + end +end diff --git a/lib/resque/scheduler/extension.rb b/lib/resque/scheduler/extension.rb index 7f922b04..f2728fa9 100644 --- a/lib/resque/scheduler/extension.rb +++ b/lib/resque/scheduler/extension.rb @@ -1,406 +1,13 @@ # vim:fileencoding=utf-8 -require 'rubygems' -require 'resque' -require_relative 'version' -require_relative 'util' -require_relative '../scheduler' -require_relative 'plugin' + +require_relative 'scheduling_extensions' +require_relative 'delaying_extensions' module Resque module Scheduler module Extension - # Accepts a new schedule configuration of the form: - # - # { - # "MakeTea" => { - # "every" => "1m" }, - # "some_name" => { - # "cron" => "5/* * * *", - # "class" => "DoSomeWork", - # "args" => "work on this string", - # "description" => "this thing works it"s butter off" }, - # ... - # } - # - # Hash keys can be anything and are used to describe and reference - # the scheduled job. If the "class" argument is missing, the key - # is used implicitly as "class" argument - in the "MakeTea" example, - # "MakeTea" is used both as job name and resque worker class. - # - # Any jobs that were in the old schedule, but are not - # present in the new schedule, will be removed. - # - # :cron can be any cron scheduling string - # - # :every can be used in lieu of :cron. see rufus-scheduler's 'every' - # usage for valid syntax. If :cron is present it will take precedence - # over :every. - # - # :class must be a resque worker class. If it is missing, the job name - # (hash key) will be used as :class. - # - # :args can be any yaml which will be converted to a ruby literal and - # passed in a params. (optional) - # - # :rails_envs is the list of envs where the job gets loaded. Envs are - # comma separated (optional) - # - # :description is just that, a description of the job (optional). If - # params is an array, each element in the array is passed as a separate - # param, otherwise params is passed in as the only parameter to perform. - def schedule=(schedule_hash) - # clean the schedules as it exists in redis - clean_schedules - - schedule_hash = prepare_schedule(schedule_hash) - - # store all schedules in redis, so we can retrieve them back - # everywhere. - schedule_hash.each do |name, job_spec| - set_schedule(name, job_spec) - end - - # ensure only return the successfully saved data! - reload_schedule! - end - - # Returns the schedule hash - def schedule - @schedule ||= all_schedules - @schedule || {} - end - - # reloads the schedule from redis - def reload_schedule! - @schedule = all_schedules - end - - # gets the schedules as it exists in redis - def all_schedules - return nil unless redis.exists(:schedules) - - redis.hgetall(:schedules).tap do |h| - h.each do |name, config| - h[name] = decode(config) - end - end - end - - # clean the schedules as it exists in redis, useful for first setup? - def clean_schedules - if redis.exists(:schedules) - redis.hkeys(:schedules).each do |key| - remove_schedule(key) unless schedule_persisted?(key) - end - end - @schedule = nil - true - end - - # Create or update a schedule with the provided name and configuration. - # - # Note: values for class and custom_job_class need to be strings, - # not constants. - # - # Resque.set_schedule('some_job', {:class => 'SomeJob', - # :every => '15mins', - # :queue => 'high', - # :args => '/tmp/poop'}) - def set_schedule(name, config) - existing_config = fetch_schedule(name) - persist = config.delete(:persist) || config.delete('persist') - unless existing_config && existing_config == config - redis.pipelined do - redis.hset(:schedules, name, encode(config)) - redis.sadd(:schedules_changed, name) - redis.sadd(:persisted_schedules, name) if persist - end - end - config - end - - # retrive the schedule configuration for the given name - def fetch_schedule(name) - decode(redis.hget(:schedules, name)) - end - - def schedule_persisted?(name) - redis.sismember(:persisted_schedules, name) - end - - # remove a given schedule by name - def remove_schedule(name) - redis.pipelined do - redis.hdel(:schedules, name) - redis.srem(:persisted_schedules, name) - redis.sadd(:schedules_changed, name) - end - end - - # This method is nearly identical to +enqueue+ only it also - # takes a timestamp which will be used to schedule the job - # for queueing. Until timestamp is in the past, the job will - # sit in the schedule list. - def enqueue_at(timestamp, klass, *args) - validate(klass) - enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) - end - - # Identical to +enqueue_at+, except you can also specify - # a queue in which the job will be placed after the - # timestamp has passed. It respects Resque.inline option, by - # creating the job right away instead of adding to the queue. - def enqueue_at_with_queue(queue, timestamp, klass, *args) - return false unless Plugin.run_before_schedule_hooks(klass, *args) - - if Resque.inline? || timestamp.to_i < Time.now.to_i - # Just create the job and let resque perform it right away with - # inline. If the class is a custom job class, call self#scheduled on - # it. This allows you to do things like Resque.enqueue_at(timestamp, - # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. - if klass.respond_to?(:scheduled) - klass.scheduled(queue, klass.to_s, *args) - else - Resque::Job.create(queue, klass, *args) - end - else - delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) - end - - Plugin.run_after_schedule_hooks(klass, *args) - end - - # Identical to enqueue_at but takes number_of_seconds_from_now - # instead of a timestamp. - def enqueue_in(number_of_seconds_from_now, klass, *args) - enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) - end - - # Identical to +enqueue_in+, except you can also specify - # a queue in which the job will be placed after the - # number of seconds has passed. - def enqueue_in_with_queue(queue, number_of_seconds_from_now, - klass, *args) - enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, - klass, *args) - end - - # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object Insertion if - # O(log(n)). Returns true if it's the first job to be scheduled at that - # time, else false - def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", encode(item)) - - # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i - end - - # Returns an array of timestamps based on start and count - def delayed_queue_peek(start, count) - result = redis.zrange(:delayed_queue_schedule, start, - start + count - 1) - Array(result).map(&:to_i) - end - - # Returns the size of the delayed queue schedule - def delayed_queue_schedule_size - redis.zcard :delayed_queue_schedule - end - - # Returns the number of jobs for a given timestamp in the delayed queue - # schedule - def delayed_timestamp_size(timestamp) - redis.llen("delayed:#{timestamp.to_i}").to_i - end - - # Returns an array of delayed items for the given timestamp - def delayed_timestamp_peek(timestamp, start, count) - if 1 == count - r = list_range "delayed:#{timestamp.to_i}", start, count - r.nil? ? [] : [r] - else - list_range "delayed:#{timestamp.to_i}", start, count - end - end - - # Returns the next delayed queue timestamp - # (don't call directly) - def next_delayed_timestamp(at_time = nil) - items = redis.zrangebyscore( - :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, - limit: [0, 1] - ) - timestamp = items.nil? ? nil : Array(items).first - timestamp.to_i unless timestamp.nil? - end - - # Returns the next item to be processed for a given timestamp, nil if - # done. (don't call directly) - # +timestamp+ can either be in seconds or a datetime - def next_item_for_timestamp(timestamp) - key = "delayed:#{timestamp.to_i}" - - encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) - - # If the list is empty, remove it. - clean_up_timestamp(key, timestamp) - item - end - - # Clears all jobs created with enqueue_at or enqueue_in - def reset_delayed_queue - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| - key = "delayed:#{item}" - items = redis.lrange(key, 0, -1) - redis.pipelined do - items.each { |ts_item| redis.del("timestamps:#{ts_item}") } - end - redis.del key - end - - redis.del :delayed_queue_schedule - end - - # Given an encoded item, remove it from the delayed_queue - def remove_delayed(klass, *args) - search = encode(job_to_hash(klass, args)) - timestamps = redis.smembers("timestamps:#{search}") - - replies = redis.pipelined do - timestamps.each do |key| - redis.lrem(key, 0, search) - redis.srem("timestamps:#{search}", key) - end - end - - return 0 if replies.nil? || replies.empty? - replies.each_slice(2).map(&:first).inject(:+) - end - - # Given an encoded item, enqueue it now - def enqueue_delayed(klass, *args) - hash = job_to_hash(klass, args) - remove_delayed(klass, *args).times do - Resque::Scheduler.enqueue_from_config(hash) - end - end - - # Given a block, remove jobs that return true from a block - # - # This allows for removal of delayed jobs that have arguments matching - # certain criteria - def remove_delayed_selection - fail ArgumentError, 'Please supply a block' unless block_given? - - destroyed = 0 - # There is no way to search Redis list entries for a partial match, so - # we query for all delayed job tasks and do our matching after decoding - # the payload data - jobs = Resque.redis.keys('delayed:*') - jobs.each do |job| - index = Resque.redis.llen(job) - 1 - while index >= 0 - payload = Resque.redis.lindex(job, index) - decoded_payload = decode(payload) - if yield(decoded_payload['args']) - removed = redis.lrem job, 0, payload - destroyed += removed - index -= removed - else - index -= 1 - end - end - end - destroyed - end - - # Given a timestamp and job (klass + args) it removes all instances and - # returns the count of jobs removed. - # - # O(N) where N is the number of jobs scheduled to fire at the given - # timestamp - def remove_delayed_job_from_timestamp(timestamp, klass, *args) - key = "delayed:#{timestamp.to_i}" - encoded_job = encode(job_to_hash(klass, args)) - - redis.srem("timestamps:#{encoded_job}", key) - count = redis.lrem(key, 0, encoded_job) - clean_up_timestamp(key, timestamp) - - count - end - - def count_all_scheduled_jobs - total_jobs = 0 - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| - total_jobs += redis.llen("delayed:#{timestamp}").to_i - end - total_jobs - end - - # Discover if a job has been delayed. - # Examples - # Resque.delayed?(MyJob) - # Resque.delayed?(MyJob, id: 1) - # Returns true if the job has been delayed - def delayed?(klass, *args) - !scheduled_at(klass, *args).empty? - end - - # Returns delayed jobs schedule timestamp for +klass+, +args+. - def scheduled_at(klass, *args) - search = encode(job_to_hash(klass, args)) - redis.smembers("timestamps:#{search}").map do |key| - key.tr('delayed:', '').to_i - end - end - - private - - def job_to_hash(klass, args) - { class: klass.to_s, args: args, queue: queue_from_class(klass) } - end - - def job_to_hash_with_queue(queue, klass, args) - { class: klass.to_s, args: args, queue: queue } - end - - def clean_up_timestamp(key, timestamp) - # If the list is empty, remove it. - - # Use a watch here to ensure nobody adds jobs to this delayed - # queue while we're removing it. - redis.watch key - if 0 == redis.llen(key).to_i - redis.multi do - redis.del key - redis.zrem :delayed_queue_schedule, timestamp.to_i - end - else - redis.unwatch - end - end - - def prepare_schedule(schedule_hash) - prepared_hash = {} - schedule_hash.each do |name, job_spec| - job_spec = job_spec.dup - unless job_spec.key?('class') || job_spec.key?(:class) - job_spec['class'] = name - end - prepared_hash[name] = job_spec - end - prepared_hash - end + include SchedulingExtensions + include DelayingExtensions end end end diff --git a/lib/resque/scheduler/scheduling_extensions.rb b/lib/resque/scheduler/scheduling_extensions.rb new file mode 100644 index 00000000..8be42c4a --- /dev/null +++ b/lib/resque/scheduler/scheduling_extensions.rb @@ -0,0 +1,150 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module SchedulingExtensions + # Accepts a new schedule configuration of the form: + # + # { + # "MakeTea" => { + # "every" => "1m" }, + # "some_name" => { + # "cron" => "5/* * * *", + # "class" => "DoSomeWork", + # "args" => "work on this string", + # "description" => "this thing works it"s butter off" }, + # ... + # } + # + # Hash keys can be anything and are used to describe and reference + # the scheduled job. If the "class" argument is missing, the key + # is used implicitly as "class" argument - in the "MakeTea" example, + # "MakeTea" is used both as job name and resque worker class. + # + # Any jobs that were in the old schedule, but are not + # present in the new schedule, will be removed. + # + # :cron can be any cron scheduling string + # + # :every can be used in lieu of :cron. see rufus-scheduler's 'every' + # usage for valid syntax. If :cron is present it will take precedence + # over :every. + # + # :class must be a resque worker class. If it is missing, the job name + # (hash key) will be used as :class. + # + # :args can be any yaml which will be converted to a ruby literal and + # passed in a params. (optional) + # + # :rails_envs is the list of envs where the job gets loaded. Envs are + # comma separated (optional) + # + # :description is just that, a description of the job (optional). If + # params is an array, each element in the array is passed as a separate + # param, otherwise params is passed in as the only parameter to + # perform. + def schedule=(schedule_hash) + # clean the schedules as it exists in redis + clean_schedules + + schedule_hash = prepare_schedule(schedule_hash) + + # store all schedules in redis, so we can retrieve them back + # everywhere. + schedule_hash.each do |name, job_spec| + set_schedule(name, job_spec) + end + + # ensure only return the successfully saved data! + reload_schedule! + end + + # Returns the schedule hash + def schedule + @schedule ||= all_schedules + @schedule || {} + end + + # reloads the schedule from redis + def reload_schedule! + @schedule = all_schedules + end + + # gets the schedules as it exists in redis + def all_schedules + return nil unless redis.exists(:schedules) + + redis.hgetall(:schedules).tap do |h| + h.each do |name, config| + h[name] = decode(config) + end + end + end + + # clean the schedules as it exists in redis, useful for first setup? + def clean_schedules + if redis.exists(:schedules) + redis.hkeys(:schedules).each do |key| + remove_schedule(key) unless schedule_persisted?(key) + end + end + @schedule = nil + true + end + + # Create or update a schedule with the provided name and configuration. + # + # Note: values for class and custom_job_class need to be strings, + # not constants. + # + # Resque.set_schedule('some_job', {:class => 'SomeJob', + # :every => '15mins', + # :queue => 'high', + # :args => '/tmp/poop'}) + def set_schedule(name, config) + existing_config = fetch_schedule(name) + persist = config.delete(:persist) || config.delete('persist') + unless existing_config && existing_config == config + redis.pipelined do + redis.hset(:schedules, name, encode(config)) + redis.sadd(:schedules_changed, name) + redis.sadd(:persisted_schedules, name) if persist + end + end + config + end + + # retrive the schedule configuration for the given name + def fetch_schedule(name) + decode(redis.hget(:schedules, name)) + end + + def schedule_persisted?(name) + redis.sismember(:persisted_schedules, name) + end + + # remove a given schedule by name + def remove_schedule(name) + redis.pipelined do + redis.hdel(:schedules, name) + redis.srem(:persisted_schedules, name) + redis.sadd(:schedules_changed, name) + end + end + + private + + def prepare_schedule(schedule_hash) + prepared_hash = {} + schedule_hash.each do |name, job_spec| + job_spec = job_spec.dup + unless job_spec.key?('class') || job_spec.key?(:class) + job_spec['class'] = name + end + prepared_hash[name] = job_spec + end + prepared_hash + end + end + end +end From 56555987205d553631f33a63fa10645752c2f1d2 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Sat, 22 Feb 2014 08:56:56 -0500 Subject: [PATCH 11/12] Trying to correct JRuby explosion --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index e873edc9..49855910 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,7 @@ rvm: env: global: - RESQUE_SCHEDULER_DISABLE_TEST_REDIS_SERVER=1 + - JRUBY_OPTS='-Xcext.enabled=true' services: - redis-server notifications: From 34d83a4cedde7cdddbae029af687262317c15c19 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Sat, 22 Feb 2014 09:02:38 -0500 Subject: [PATCH 12/12] Switching to kramdown to make JRuby happy --- resque-scheduler.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index e160d6e1..de96c546 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -22,11 +22,11 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'bundler', '~> 1.5' spec.add_development_dependency 'json' + spec.add_development_dependency 'kramdown' spec.add_development_dependency 'mocha' spec.add_development_dependency 'pry' spec.add_development_dependency 'rack-test' spec.add_development_dependency 'rake' - spec.add_development_dependency 'redcarpet' spec.add_development_dependency 'rubocop' spec.add_development_dependency 'simplecov' spec.add_development_dependency 'yard'