diff --git a/.gitignore b/.gitignore index 2d30b632..be819545 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,12 @@ -.bundle/ -.idea/ +/.bundle/ +/.idea/ +/.yardoc/ -doc/ -pkg +/doc/ +/pkg/ nbproject -Gemfile.lock -.rvmrc +/Gemfile.lock +/.rvmrc *.swp /coverage/ diff --git a/.rubocop.yml b/.rubocop.yml index 83fd47ef..60b610bd 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2014-02-09 12:18:33 -0500 using RuboCop version 0.18.1. +# on 2014-02-12 01:55:22 -0500 using RuboCop version 0.18.1. # The point is for the user to remove these configuration records # one by one as the offences are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -9,30 +9,31 @@ AllCops: Includes: - Gemfile - Rakefile - - '*.gemspec' - - 'bin/*' + - resque-scheduler.gemspec + - bin/resque-scheduler # Offence count: 1 CaseEquality: Enabled: false -# Offence count: 1 +# Offence count: 2 # Configuration parameters: CountComments. ClassLength: - Max: 342 + Max: 130 -# Offence count: 4 +# Offence count: 3 CyclomaticComplexity: - Max: 24 + Max: 21 + +# Offence count: 29 +Documentation: + Enabled: false # Offence count: 17 # Configuration parameters: CountComments. MethodLength: - Max: 150 + Max: 145 # Offence count: 1 RescueException: Enabled: false - -Documentation: - Enabled: false diff --git a/.travis.yml b/.travis.yml index e873edc9..49855910 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,7 @@ rvm: env: global: - RESQUE_SCHEDULER_DISABLE_TEST_REDIS_SERVER=1 + - JRUBY_OPTS='-Xcext.enabled=true' services: - redis-server notifications: diff --git a/README.md b/README.md index e82a21ab..7cfee33d 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ gem 'resque-scheduler' Adding the resque:scheduler rake task: ```ruby -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' ``` ### Rake integration @@ -66,12 +66,12 @@ everything `resque` needs to know. ```ruby # Resque tasks require 'resque/tasks' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' namespace :resque do task :setup do require 'resque' - require 'resque_scheduler' + require 'resque-scheduler' # you probably already have this somewhere Resque.redis = 'localhost:6379' @@ -145,8 +145,8 @@ scheduled job must run (coerced with `Kernel#Float()`) (default `5`) * `LOGFORMAT` - Log output format to use (either `'text'` or `'json'`, default `'text'`) * `PIDFILE` - If non-empty, write process PID to file (default empty) -* `QUIET` or `MUTE` - Silence most output if non-empty (equivalent to -a level of `Logger::FATAL`, default `false`) +* `QUIET` - Silence most output if non-empty (equivalent to a level of +`Logger::FATAL`, default `false`) * `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level of `Logger::DEBUG`, default `false`) @@ -448,7 +448,7 @@ redis instance and schedule. The scheduler processes will use redis to elect a master process and detect failover when the master dies. Precautions are taken to prevent jobs from potentially being queued twice during failover even when the clocks of the scheduler machines are slightly out of sync (or load affects -scheduled job firing time). If you want the gory details, look at Resque::SchedulerLocking. +scheduled job firing time). If you want the gory details, look at Resque::Scheduler::Locking. If the scheduler process(es) goes down for whatever reason, the delayed items that should have fired during the outage will fire once the scheduler process @@ -495,8 +495,8 @@ Now, you want to add the following: ```ruby # This will make the tabs show up. -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' ``` That should make the scheduler tabs show up in `resque-web`. @@ -534,8 +534,8 @@ worker is started. There are several options to toggle the way scheduler logs its actions. They are toggled by environment variables: - - `MUTE` will stop logging anything. Completely silent. - - `VERBOSE` opposite of 'mute'; will log even debug information + - `QUIET` will stop logging anything. Completely silent. + - `VERBOSE` opposite of 'QUIET'; will log even debug information - `LOGFILE` specifies the file to write logs to. (default standard output) - `LOGFORMAT` specifies either "text" or "json" output format (default "text") @@ -545,7 +545,7 @@ values: ```ruby Resque::Scheduler.configure do |c| - c.mute = false + c.quiet = false c.verbose = false c.logfile = nil # meaning all messages go to $stdout c.logformat = 'text' diff --git a/ROADMAP.md b/ROADMAP.md index 2e102190..cf31f6a5 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -39,8 +39,12 @@ v2.6.0 TODO for v3.0.0 =============== -- Clean up all RuboCop offences, breaking the API if necessary -- Get to at least 95% test coverage +- [x] Clean up all RuboCop offences, breaking the API if necessary +- [x] Get to at least 95% test coverage +- [x] Collapse `lib/resque_scheduler` into `lib/resque/scheduler` +- [ ] Reduce public API on `Resque::Scheduler::Extension` to fewer than 10 methods +- [ ] Reduce public API on `Resque::Scheduler` to fewer than 10 methods +- [ ] Anything else? TODO for v3.1.0 =============== diff --git a/Rakefile b/Rakefile index 28d36e81..26084bd0 100644 --- a/Rakefile +++ b/Rakefile @@ -2,6 +2,7 @@ require 'bundler/gem_tasks' require 'rake/testtask' require 'rubocop/rake_task' +require 'yard' task default: [:rubocop, :test] unless RUBY_PLATFORM =~ /java/ task default: [:test] if RUBY_PLATFORM =~ /java/ @@ -16,3 +17,5 @@ Rake::TestTask.new do |t| o << '--verbose ' if ENV['VERBOSE'] end end + +YARD::Rake::YardocTask.new diff --git a/bin/resque-scheduler b/bin/resque-scheduler index 124b9063..f26121d1 100755 --- a/bin/resque-scheduler +++ b/bin/resque-scheduler @@ -1,5 +1,5 @@ #!/usr/bin/env ruby # vim:fileencoding=utf-8 -require 'resque_scheduler' -ResqueScheduler::Cli.run! +require 'resque-scheduler' +Resque::Scheduler::Cli.run! diff --git a/examples/Rakefile b/examples/Rakefile index bf314d02..bf31d618 100644 --- a/examples/Rakefile +++ b/examples/Rakefile @@ -1,2 +1,2 @@ # vim:fileencoding=utf-8 -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' diff --git a/examples/config/initializers/resque-web.rb b/examples/config/initializers/resque-web.rb index 9a2ed792..20e1a141 100644 --- a/examples/config/initializers/resque-web.rb +++ b/examples/config/initializers/resque-web.rb @@ -7,8 +7,8 @@ redis_env_var = ENV['REDIS_PROVIDER'] || 'REDIS_URL' Resque.redis = ENV[redis_env_var] || 'localhost:6379' -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' schedule_yml = ENV['RESQUE_SCHEDULE_YML'] if schedule_yml diff --git a/examples/dynamic-scheduling/lib/tasks/resque.rake b/examples/dynamic-scheduling/lib/tasks/resque.rake index 76b8dbc1..5e7e8a07 100644 --- a/examples/dynamic-scheduling/lib/tasks/resque.rake +++ b/examples/dynamic-scheduling/lib/tasks/resque.rake @@ -1,13 +1,13 @@ # vim:fileencoding=utf-8 require 'resque/tasks' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' require 'yaml' namespace :resque do task :setup do require 'resque' - require 'resque_scheduler' + require 'resque-scheduler' rails_root = ENV['RAILS_ROOT'] || File.expand_path('../../../', __FILE__) rails_env = ENV['RAILS_ENV'] || 'development' diff --git a/lib/resque-scheduler.rb b/lib/resque-scheduler.rb index b1e9d337..b770cd91 100644 --- a/lib/resque-scheduler.rb +++ b/lib/resque-scheduler.rb @@ -1,2 +1,4 @@ # vim:fileencoding=utf-8 -require File.expand_path('../resque_scheduler', __FILE__) +require_relative 'resque/scheduler' + +Resque.extend Resque::Scheduler::Extension diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index e41fd215..a33f420d 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -1,96 +1,26 @@ # vim:fileencoding=utf-8 + require 'rufus/scheduler' -require 'resque/scheduler_locking' -require 'resque_scheduler/logger_builder' +require_relative 'scheduler/configuration' +require_relative 'scheduler/locking' +require_relative 'scheduler/logger_builder' +require_relative 'scheduler/signal_handling' module Resque - class Scheduler - extend Resque::SchedulerLocking - - class << self - # Allows for block-style configuration - def configure - yield self - end - - attr_writer :signal_queue - - def signal_queue - @signal_queue ||= [] - end - - # Used in `#load_schedule_job` - attr_writer :env - - def env - return @env if @env - @env ||= Rails.env if defined?(Rails) - @env ||= ENV['RAILS_ENV'] - @env - end - - # If true, logs more stuff... - attr_writer :verbose - - def verbose - @verbose ||= !!ENV['VERBOSE'] - end - - # If set, produces no output - attr_writer :mute + module Scheduler + autoload :Cli, 'resque/scheduler/cli' + autoload :Extension, 'resque/scheduler/extension' + autoload :Util, 'resque/scheduler/util' - def mute - @mute ||= !!ENV['MUTE'] - end - - # If set, will write messages to the file - attr_writer :logfile - - def logfile - @logfile ||= ENV['LOGFILE'] - end - - # Sets whether to log in 'text' or 'json' - attr_writer :logformat - - def logformat - @logformat ||= ENV['LOGFORMAT'] - end + private - # If set, will try to update the schedule in the loop - attr_writer :dynamic + extend Resque::Scheduler::Locking + extend Resque::Scheduler::Configuration + extend Resque::Scheduler::SignalHandling - def dynamic - @dynamic ||= !!ENV['DYNAMIC_SCHEDULE'] - end - - # If set, will append the app name to procline - attr_writer :app_name - - def app_name - @app_name ||= ENV['APP_NAME'] - end - - # Amount of time in seconds to sleep between polls of the delayed - # queue. Defaults to 5 - attr_writer :poll_sleep_amount - - def poll_sleep_amount - @poll_sleep_amount ||= - Float(ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5')) - end - - attr_writer :logger - - def logger - @logger ||= ResqueScheduler::LoggerBuilder.new( - mute: mute, - verbose: verbose, - log_dev: logfile, - format: logformat - ).build - end + public + class << self # the Rufus::Scheduler jobs that are scheduled attr_reader :scheduled_jobs @@ -136,28 +66,6 @@ def run end end - # For all signals, set the shutdown flag and wait for current - # poll/enqueing to finish (should be almost instant). In the - # case of sleeping, exit immediately. - def register_signal_handlers - %w(INT TERM USR1 USR2 QUIT).each do |sig| - trap(sig) { signal_queue << sig } - end - end - - def handle_signals - loop do - sig = signal_queue.shift - break unless sig - log! "Got #{sig} signal" - case sig - when 'INT', 'TERM', 'QUIT' then shutdown - when 'USR1' then print_schedule - when 'USR2' then reload_schedule! - end - end - end - def print_schedule if rufus_scheduler log! "Scheduling Info\tLast Run" @@ -202,8 +110,8 @@ def optionizate_interval_value(value) args end - # Loads a job schedule into the Rufus::Scheduler and stores it in - # @scheduled_jobs + # Loads a job schedule into the Rufus::Scheduler and stores it + # in @scheduled_jobs def load_schedule_job(name, config) # If `rails_env` or `env` is set in the config, load jobs only if they # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or @@ -307,7 +215,7 @@ def enqueue_from_config(job_config) klass_name = job_config['class'] || job_config[:class] begin - klass = ResqueScheduler::Util.constantize(klass_name) + klass = Resque::Scheduler::Util.constantize(klass_name) rescue NameError klass = klass_name end @@ -325,7 +233,7 @@ def enqueue_from_config(job_config) # from the web perhaps), fall back to enqueing normally via # Resque::Job.create. begin - ResqueScheduler::Util.constantize(job_klass).scheduled( + Resque::Scheduler::Util.constantize(job_klass).scheduled( queue, klass_name, *params ) rescue NameError @@ -338,7 +246,7 @@ def enqueue_from_config(job_config) # for non-existent classes (for example: running scheduler in # one app that schedules for another. if Class === klass - ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks( + Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks( klass, *params ) @@ -462,6 +370,17 @@ def procline(string) private + attr_writer :logger + + def logger + @logger ||= Resque::Scheduler::LoggerBuilder.new( + quiet: quiet, + verbose: verbose, + log_dev: logfile, + format: logformat + ).build + end + def app_str app_name ? "[#{app_name}]" : '' end @@ -475,7 +394,7 @@ def build_procline(string) end def internal_name - "resque-scheduler-#{ResqueScheduler::VERSION}" + "resque-scheduler-#{Resque::Scheduler::VERSION}" end end end diff --git a/lib/resque/scheduler/cli.rb b/lib/resque/scheduler/cli.rb new file mode 100644 index 00000000..6d37ab19 --- /dev/null +++ b/lib/resque/scheduler/cli.rb @@ -0,0 +1,144 @@ +# vim:fileencoding=utf-8 + +require 'optparse' + +module Resque + module Scheduler + CLI_OPTIONS_ENV_MAPPING = { + app_name: 'APP_NAME', + background: 'BACKGROUND', + dynamic: 'DYNAMIC_SCHEDULE', + env: 'RAILS_ENV', + initializer_path: 'INITIALIZER_PATH', + logfile: 'LOGFILE', + logformat: 'LOGFORMAT', + quiet: 'QUIET', + pidfile: 'PIDFILE', + poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL', + verbose: 'VERBOSE' + } + + class Cli + BANNER = <<-EOF.gsub(/ {6}/, '') + Usage: resque-scheduler [options] + + Runs a resque scheduler process directly (rather than via rake). + + EOF + OPTIONS = [ + { + args: ['-n', '--app-name [APP_NAME]', + 'Application name for procline'], + callback: ->(options) { ->(n) { options[:app_name] = n } } + }, + { + args: ['-B', '--background', 'Run in the background [BACKGROUND]'], + callback: ->(options) { ->(b) { options[:background] = b } } + }, + { + args: ['-D', '--dynamic-schedule', + 'Enable dynamic scheduling [DYNAMIC_SCHEDULE]'], + callback: ->(options) { ->(d) { options[:dynamic] = d } } + }, + { + args: ['-E', '--environment [RAILS_ENV]', 'Environment name'], + callback: ->(options) { ->(e) { options[:env] = e } } + }, + { + args: ['-I', '--initializer-path [INITIALIZER_PATH]', + 'Path to optional initializer ruby file'], + callback: ->(options) { ->(i) { options[:initializer_path] = i } } + }, + { + args: ['-i', '--interval [RESQUE_SCHEDULER_INTERVAL]', + 'Interval for checking if a scheduled job must run'], + callback: ->(options) { ->(i) { options[:poll_sleep_amount] = i } } + }, + { + args: ['-l', '--logfile [LOGFILE]', 'Log file name'], + callback: ->(options) { ->(l) { options[:logfile] = l } } + }, + { + args: ['-F', '--logformat [LOGFORMAT]', 'Log output format'], + callback: ->(options) { ->(f) { options[:logformat] = f } } + }, + { + args: ['-P', '--pidfile [PIDFILE]', 'PID file name'], + callback: ->(options) { ->(p) { options[:pidfile] = p } } + }, + { + args: ['-q', '--quiet', 'Run with minimal output [QUIET]'], + callback: ->(options) { ->(q) { options[:quiet] = q } } + }, + { + args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], + callback: ->(options) { ->(v) { options[:verbose] = v } } + } + ].freeze + + def self.run!(argv = ARGV, env = ENV) + new(argv, env).run! + end + + def initialize(argv = ARGV, env = ENV) + @argv = argv + @env = env + end + + def run! + pre_run + run_forever + end + + def pre_run + parse_options + pre_setup + setup_env + end + + def parse_options + option_parser.parse!(argv.dup) + end + + def pre_setup + if options[:initializer_path] + load options[:initializer_path].to_s.strip + else + false + end + end + + def setup_env + require_relative 'env' + runtime_env.setup + end + + def run_forever + Resque::Scheduler.run + end + + private + + attr_reader :argv, :env + + def runtime_env + Resque::Scheduler::Env.new(options) + end + + def option_parser + OptionParser.new do |opts| + opts.banner = BANNER + OPTIONS.each do |opt| + opts.on(*opt[:args], &(opt[:callback].call(options))) + end + end + end + + def options + @options ||= {}.tap do |o| + CLI_OPTIONS_ENV_MAPPING.map { |key, envvar| o[key] = env[envvar] } + end + end + end + end +end diff --git a/lib/resque/scheduler/configuration.rb b/lib/resque/scheduler/configuration.rb new file mode 100644 index 00000000..b2cccf07 --- /dev/null +++ b/lib/resque/scheduler/configuration.rb @@ -0,0 +1,73 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module Configuration + # Allows for block-style configuration + def configure + yield self + end + + # Used in `#load_schedule_job` + attr_writer :env + + def env + return @env if @env + @env ||= Rails.env if defined?(Rails) + @env ||= ENV['RAILS_ENV'] + @env + end + + # If true, logs more stuff... + attr_writer :verbose + + def verbose + @verbose ||= !!ENV['VERBOSE'] + end + + # If set, produces no output + attr_writer :quiet + + def quiet + @quiet ||= !!ENV['QUIET'] + end + + # If set, will write messages to the file + attr_writer :logfile + + def logfile + @logfile ||= ENV['LOGFILE'] + end + + # Sets whether to log in 'text' or 'json' + attr_writer :logformat + + def logformat + @logformat ||= ENV['LOGFORMAT'] + end + + # If set, will try to update the schedule in the loop + attr_writer :dynamic + + def dynamic + @dynamic ||= !!ENV['DYNAMIC_SCHEDULE'] + end + + # If set, will append the app name to procline + attr_writer :app_name + + def app_name + @app_name ||= ENV['APP_NAME'] + end + + # Amount of time in seconds to sleep between polls of the delayed + # queue. Defaults to 5 + attr_writer :poll_sleep_amount + + def poll_sleep_amount + @poll_sleep_amount ||= + Float(ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5')) + end + end + end +end diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb new file mode 100644 index 00000000..8dc280a3 --- /dev/null +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -0,0 +1,278 @@ +# vim:fileencoding=utf-8 +require 'resque' +require_relative 'plugin' +require_relative '../scheduler' + +module Resque + module Scheduler + module DelayingExtensions + # This method is nearly identical to +enqueue+ only it also + # takes a timestamp which will be used to schedule the job + # for queueing. Until timestamp is in the past, the job will + # sit in the schedule list. + def enqueue_at(timestamp, klass, *args) + validate(klass) + enqueue_at_with_queue( + queue_from_class(klass), timestamp, klass, *args + ) + end + + # Identical to +enqueue_at+, except you can also specify + # a queue in which the job will be placed after the + # timestamp has passed. It respects Resque.inline option, by + # creating the job right away instead of adding to the queue. + def enqueue_at_with_queue(queue, timestamp, klass, *args) + return false unless plugin.run_before_schedule_hooks(klass, *args) + + if Resque.inline? || timestamp.to_i < Time.now.to_i + # Just create the job and let resque perform it right away with + # inline. If the class is a custom job class, call self#scheduled + # on it. This allows you to do things like + # Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1). + # Otherwise, pass off to Resque. + if klass.respond_to?(:scheduled) + klass.scheduled(queue, klass.to_s, *args) + else + Resque::Job.create(queue, klass, *args) + end + else + delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) + end + + plugin.run_after_schedule_hooks(klass, *args) + end + + # Identical to enqueue_at but takes number_of_seconds_from_now + # instead of a timestamp. + def enqueue_in(number_of_seconds_from_now, klass, *args) + enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + + # Identical to +enqueue_in+, except you can also specify + # a queue in which the job will be placed after the + # number of seconds has passed. + def enqueue_in_with_queue(queue, number_of_seconds_from_now, + klass, *args) + enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, + klass, *args) + end + + # Used internally to stuff the item into the schedule sorted list. + # +timestamp+ can be either in seconds or a datetime object Insertion + # if O(log(n)). Returns true if it's the first job to be scheduled at + # that time, else false + def delayed_push(timestamp, item) + # First add this item to the list for this timestamp + redis.rpush("delayed:#{timestamp.to_i}", encode(item)) + + # Store the timestamps at with this item occurs + redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + + # Now, add this timestamp to the zsets. The score and the value are + # the same since we'll be querying by timestamp, and we don't have + # anything else to store. + redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + end + + # Returns an array of timestamps based on start and count + def delayed_queue_peek(start, count) + result = redis.zrange(:delayed_queue_schedule, start, + start + count - 1) + Array(result).map(&:to_i) + end + + # Returns the size of the delayed queue schedule + def delayed_queue_schedule_size + redis.zcard :delayed_queue_schedule + end + + # Returns the number of jobs for a given timestamp in the delayed queue + # schedule + def delayed_timestamp_size(timestamp) + redis.llen("delayed:#{timestamp.to_i}").to_i + end + + # Returns an array of delayed items for the given timestamp + def delayed_timestamp_peek(timestamp, start, count) + if 1 == count + r = list_range "delayed:#{timestamp.to_i}", start, count + r.nil? ? [] : [r] + else + list_range "delayed:#{timestamp.to_i}", start, count + end + end + + # Returns the next delayed queue timestamp + # (don't call directly) + def next_delayed_timestamp(at_time = nil) + items = redis.zrangebyscore( + :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, + limit: [0, 1] + ) + timestamp = items.nil? ? nil : Array(items).first + timestamp.to_i unless timestamp.nil? + end + + # Returns the next item to be processed for a given timestamp, nil if + # done. (don't call directly) + # +timestamp+ can either be in seconds or a datetime + def next_item_for_timestamp(timestamp) + key = "delayed:#{timestamp.to_i}" + + encoded_item = redis.lpop(key) + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + + # If the list is empty, remove it. + clean_up_timestamp(key, timestamp) + item + end + + # Clears all jobs created with enqueue_at or enqueue_in + def reset_delayed_queue + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| + key = "delayed:#{item}" + items = redis.lrange(key, 0, -1) + redis.pipelined do + items.each { |ts_item| redis.del("timestamps:#{ts_item}") } + end + redis.del key + end + + redis.del :delayed_queue_schedule + end + + # Given an encoded item, remove it from the delayed_queue + def remove_delayed(klass, *args) + search = encode(job_to_hash(klass, args)) + timestamps = redis.smembers("timestamps:#{search}") + + replies = redis.pipelined do + timestamps.each do |key| + redis.lrem(key, 0, search) + redis.srem("timestamps:#{search}", key) + end + end + + return 0 if replies.nil? || replies.empty? + replies.each_slice(2).map(&:first).inject(:+) + end + + # Given an encoded item, enqueue it now + def enqueue_delayed(klass, *args) + hash = job_to_hash(klass, args) + remove_delayed(klass, *args).times do + Resque::Scheduler.enqueue_from_config(hash) + end + end + + # Given a block, remove jobs that return true from a block + # + # This allows for removal of delayed jobs that have arguments matching + # certain criteria + def remove_delayed_selection + fail ArgumentError, 'Please supply a block' unless block_given? + + destroyed = 0 + # There is no way to search Redis list entries for a partial match, + # so we query for all delayed job tasks and do our matching after + # decoding the payload data + jobs = Resque.redis.keys('delayed:*') + jobs.each do |job| + index = Resque.redis.llen(job) - 1 + while index >= 0 + payload = Resque.redis.lindex(job, index) + decoded_payload = decode(payload) + if yield(decoded_payload['args']) + removed = redis.lrem job, 0, payload + destroyed += removed + index -= removed + else + index -= 1 + end + end + end + destroyed + end + + # Given a timestamp and job (klass + args) it removes all instances and + # returns the count of jobs removed. + # + # O(N) where N is the number of jobs scheduled to fire at the given + # timestamp + def remove_delayed_job_from_timestamp(timestamp, klass, *args) + key = "delayed:#{timestamp.to_i}" + encoded_job = encode(job_to_hash(klass, args)) + + redis.srem("timestamps:#{encoded_job}", key) + count = redis.lrem(key, 0, encoded_job) + clean_up_timestamp(key, timestamp) + + count + end + + def count_all_scheduled_jobs + total_jobs = 0 + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts| + total_jobs += redis.llen("delayed:#{ts}").to_i + end + total_jobs + end + + # Discover if a job has been delayed. + # Examples + # Resque.delayed?(MyJob) + # Resque.delayed?(MyJob, id: 1) + # Returns true if the job has been delayed + def delayed?(klass, *args) + !scheduled_at(klass, *args).empty? + end + + # Returns delayed jobs schedule timestamp for +klass+, +args+. + def scheduled_at(klass, *args) + search = encode(job_to_hash(klass, args)) + redis.smembers("timestamps:#{search}").map do |key| + key.tr('delayed:', '').to_i + end + end + + def last_enqueued_at(job_name, date) + redis.hset('delayed:last_enqueued_at', job_name, date) + end + + def get_last_enqueued_at(job_name) + redis.hget('delayed:last_enqueued_at', job_name) + end + + private + + def job_to_hash(klass, args) + { class: klass.to_s, args: args, queue: queue_from_class(klass) } + end + + def job_to_hash_with_queue(queue, klass, args) + { class: klass.to_s, args: args, queue: queue } + end + + def clean_up_timestamp(key, timestamp) + # If the list is empty, remove it. + + # Use a watch here to ensure nobody adds jobs to this delayed + # queue while we're removing it. + redis.watch key + if 0 == redis.llen(key).to_i + redis.multi do + redis.del key + redis.zrem :delayed_queue_schedule, timestamp.to_i + end + else + redis.unwatch + end + end + + def plugin + Resque::Scheduler::Plugin + end + end + end +end diff --git a/lib/resque/scheduler/env.rb b/lib/resque/scheduler/env.rb new file mode 100644 index 00000000..61945bcc --- /dev/null +++ b/lib/resque/scheduler/env.rb @@ -0,0 +1,61 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + class Env + def initialize(options) + @options = options + end + + def setup + require 'resque' + require 'resque/scheduler' + + setup_backgrounding + setup_pid_file + setup_scheduler_configuration + end + + private + + attr_reader :options + + def setup_backgrounding + # Need to set this here for conditional Process.daemon redirect of + # stderr/stdout to /dev/null + Resque::Scheduler.quiet = !!options[:quiet] + + if options[:background] + unless Process.respond_to?('daemon') + abort 'background option is set, which requires ruby >= 1.9' + end + + Process.daemon(true, !Resque::Scheduler.quiet) + Resque.redis.client.reconnect + end + end + + def setup_pid_file + File.open(options[:pidfile], 'w') do |f| + f.puts $PROCESS_ID + end if options[:pidfile] + end + + def setup_scheduler_configuration + Resque::Scheduler.configure do |c| + # These settings are somewhat redundant given the defaults present + # in the attr reader methods. They are left here for clarity and + # to serve as an example of how to use `.configure`. + + c.app_name = options[:app_name] + c.dynamic = !!options[:dynamic] + c.env = options[:env] + c.logfile = options[:logfile] + c.logformat = options[:logformat] + c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') + c.verbose = !!options[:verbose] + end + end + end + end +end diff --git a/lib/resque/scheduler/extension.rb b/lib/resque/scheduler/extension.rb new file mode 100644 index 00000000..f2728fa9 --- /dev/null +++ b/lib/resque/scheduler/extension.rb @@ -0,0 +1,13 @@ +# vim:fileencoding=utf-8 + +require_relative 'scheduling_extensions' +require_relative 'delaying_extensions' + +module Resque + module Scheduler + module Extension + include SchedulingExtensions + include DelayingExtensions + end + end +end diff --git a/lib/resque/scheduler/lock/base.rb b/lib/resque/scheduler/lock/base.rb index 52aa43fc..fe33eb4d 100644 --- a/lib/resque/scheduler/lock/base.rb +++ b/lib/resque/scheduler/lock/base.rb @@ -1,7 +1,7 @@ # vim:fileencoding=utf-8 module Resque - class Scheduler + module Scheduler module Lock class Base attr_reader :key diff --git a/lib/resque/scheduler/lock/basic.rb b/lib/resque/scheduler/lock/basic.rb index 10cecd77..b2a030f1 100644 --- a/lib/resque/scheduler/lock/basic.rb +++ b/lib/resque/scheduler/lock/basic.rb @@ -1,8 +1,8 @@ # vim:fileencoding=utf-8 -require 'resque/scheduler/lock/base' +require_relative 'base' module Resque - class Scheduler + module Scheduler module Lock class Basic < Base def acquire! diff --git a/lib/resque/scheduler/lock/resilient.rb b/lib/resque/scheduler/lock/resilient.rb index 07edb002..2c860961 100644 --- a/lib/resque/scheduler/lock/resilient.rb +++ b/lib/resque/scheduler/lock/resilient.rb @@ -1,8 +1,8 @@ # vim:fileencoding=utf-8 -require 'resque/scheduler/lock/base' +require_relative 'base' module Resque - class Scheduler + module Scheduler module Lock class Resilient < Base def acquire! diff --git a/lib/resque/scheduler_locking.rb b/lib/resque/scheduler/locking.rb similarity index 77% rename from lib/resque/scheduler_locking.rb rename to lib/resque/scheduler/locking.rb index 2a09bcd7..5a100172 100644 --- a/lib/resque/scheduler_locking.rb +++ b/lib/resque/scheduler/locking.rb @@ -49,44 +49,46 @@ # you stop cron, no jobs fire while it's stopped and it doesn't fire jobs that # were missed when it starts up again. -require 'resque/scheduler/lock' +require_relative 'lock' module Resque - module SchedulerLocking - def master_lock - @master_lock ||= build_master_lock - end + module Scheduler + module Locking + def master_lock + @master_lock ||= build_master_lock + end - def supports_lua? - redis_master_version >= 2.5 - end + def supports_lua? + redis_master_version >= 2.5 + end - def master? - master_lock.acquire! || master_lock.locked? - end + def master? + master_lock.acquire! || master_lock.locked? + end - def release_master_lock! - master_lock.release! - end + def release_master_lock! + master_lock.release! + end - private + private - def build_master_lock - if supports_lua? - Resque::Scheduler::Lock::Resilient.new(master_lock_key) - else - Resque::Scheduler::Lock::Basic.new(master_lock_key) + def build_master_lock + if supports_lua? + Resque::Scheduler::Lock::Resilient.new(master_lock_key) + else + Resque::Scheduler::Lock::Basic.new(master_lock_key) + end end - end - def master_lock_key - lock_prefix = ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] || '' - lock_prefix += ':' if lock_prefix != '' - "#{Resque.redis.namespace}:#{lock_prefix}resque_scheduler_master_lock" - end + def master_lock_key + lock_prefix = ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] || '' + lock_prefix += ':' if lock_prefix != '' + "#{Resque.redis.namespace}:#{lock_prefix}resque_scheduler_master_lock" + end - def redis_master_version - Resque.redis.info['redis_version'].to_f + def redis_master_version + Resque.redis.info['redis_version'].to_f + end end end end diff --git a/lib/resque/scheduler/logger_builder.rb b/lib/resque/scheduler/logger_builder.rb new file mode 100644 index 00000000..58a3ee84 --- /dev/null +++ b/lib/resque/scheduler/logger_builder.rb @@ -0,0 +1,70 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + # Just builds a logger, with specified verbosity and destination. + # The simplest example: + # + # Resque::Scheduler::LoggerBuilder.new.build + class LoggerBuilder + # Initializes new instance of the builder + # + # Pass :opts Hash with + # - :quiet if logger needs to be silent for all levels. Default - false + # - :verbose if there is a need in debug messages. Default - false + # - :log_dev to output logs into a desired file. Default - STDOUT + # - :format log format, either 'text' or 'json'. Default - 'text' + # + # Example: + # + # LoggerBuilder.new( + # :quiet => false, :verbose => true, :log_dev => 'log/scheduler.log' + # ) + def initialize(opts = {}) + @quiet = !!opts[:quiet] + @verbose = !!opts[:verbose] + @log_dev = opts[:log_dev] || $stdout + @format = opts[:format] || 'text' + end + + # Returns an instance of Logger + def build + logger = Logger.new(@log_dev) + logger.level = level + logger.formatter = send(:"#{@format}_formatter") + logger + end + + private + + def level + if @verbose && !@quiet + Logger::DEBUG + elsif !@quiet + Logger::INFO + else + Logger::FATAL + end + end + + def text_formatter + proc do |severity, datetime, progname, msg| + "resque-scheduler: [#{severity}] #{datetime.iso8601}: #{msg}\n" + end + end + + def json_formatter + proc do |severity, datetime, progname, msg| + require 'json' + JSON.dump( + name: 'resque-scheduler', + progname: progname, + level: severity, + timestamp: datetime.iso8601, + msg: msg + ) + "\n" + end + end + end + end +end diff --git a/lib/resque/scheduler/plugin.rb b/lib/resque/scheduler/plugin.rb new file mode 100644 index 00000000..bab898b4 --- /dev/null +++ b/lib/resque/scheduler/plugin.rb @@ -0,0 +1,31 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module Plugin + def self.hooks(job, pattern) + job.methods.grep(/^#{pattern}/).sort + end + + def self.run_hooks(job, pattern, *args) + results = hooks(job, pattern).map do |hook| + job.send(hook, *args) + end + + results.all? { |result| result != false } + end + + def self.run_before_delayed_enqueue_hooks(klass, *args) + run_hooks(klass, 'before_delayed_enqueue', *args) + end + + def self.run_before_schedule_hooks(klass, *args) + run_hooks(klass, 'before_schedule', *args) + end + + def self.run_after_schedule_hooks(klass, *args) + run_hooks(klass, 'after_schedule', *args) + end + end + end +end diff --git a/lib/resque/scheduler/scheduling_extensions.rb b/lib/resque/scheduler/scheduling_extensions.rb new file mode 100644 index 00000000..8be42c4a --- /dev/null +++ b/lib/resque/scheduler/scheduling_extensions.rb @@ -0,0 +1,150 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module SchedulingExtensions + # Accepts a new schedule configuration of the form: + # + # { + # "MakeTea" => { + # "every" => "1m" }, + # "some_name" => { + # "cron" => "5/* * * *", + # "class" => "DoSomeWork", + # "args" => "work on this string", + # "description" => "this thing works it"s butter off" }, + # ... + # } + # + # Hash keys can be anything and are used to describe and reference + # the scheduled job. If the "class" argument is missing, the key + # is used implicitly as "class" argument - in the "MakeTea" example, + # "MakeTea" is used both as job name and resque worker class. + # + # Any jobs that were in the old schedule, but are not + # present in the new schedule, will be removed. + # + # :cron can be any cron scheduling string + # + # :every can be used in lieu of :cron. see rufus-scheduler's 'every' + # usage for valid syntax. If :cron is present it will take precedence + # over :every. + # + # :class must be a resque worker class. If it is missing, the job name + # (hash key) will be used as :class. + # + # :args can be any yaml which will be converted to a ruby literal and + # passed in a params. (optional) + # + # :rails_envs is the list of envs where the job gets loaded. Envs are + # comma separated (optional) + # + # :description is just that, a description of the job (optional). If + # params is an array, each element in the array is passed as a separate + # param, otherwise params is passed in as the only parameter to + # perform. + def schedule=(schedule_hash) + # clean the schedules as it exists in redis + clean_schedules + + schedule_hash = prepare_schedule(schedule_hash) + + # store all schedules in redis, so we can retrieve them back + # everywhere. + schedule_hash.each do |name, job_spec| + set_schedule(name, job_spec) + end + + # ensure only return the successfully saved data! + reload_schedule! + end + + # Returns the schedule hash + def schedule + @schedule ||= all_schedules + @schedule || {} + end + + # reloads the schedule from redis + def reload_schedule! + @schedule = all_schedules + end + + # gets the schedules as it exists in redis + def all_schedules + return nil unless redis.exists(:schedules) + + redis.hgetall(:schedules).tap do |h| + h.each do |name, config| + h[name] = decode(config) + end + end + end + + # clean the schedules as it exists in redis, useful for first setup? + def clean_schedules + if redis.exists(:schedules) + redis.hkeys(:schedules).each do |key| + remove_schedule(key) unless schedule_persisted?(key) + end + end + @schedule = nil + true + end + + # Create or update a schedule with the provided name and configuration. + # + # Note: values for class and custom_job_class need to be strings, + # not constants. + # + # Resque.set_schedule('some_job', {:class => 'SomeJob', + # :every => '15mins', + # :queue => 'high', + # :args => '/tmp/poop'}) + def set_schedule(name, config) + existing_config = fetch_schedule(name) + persist = config.delete(:persist) || config.delete('persist') + unless existing_config && existing_config == config + redis.pipelined do + redis.hset(:schedules, name, encode(config)) + redis.sadd(:schedules_changed, name) + redis.sadd(:persisted_schedules, name) if persist + end + end + config + end + + # retrive the schedule configuration for the given name + def fetch_schedule(name) + decode(redis.hget(:schedules, name)) + end + + def schedule_persisted?(name) + redis.sismember(:persisted_schedules, name) + end + + # remove a given schedule by name + def remove_schedule(name) + redis.pipelined do + redis.hdel(:schedules, name) + redis.srem(:persisted_schedules, name) + redis.sadd(:schedules_changed, name) + end + end + + private + + def prepare_schedule(schedule_hash) + prepared_hash = {} + schedule_hash.each do |name, job_spec| + job_spec = job_spec.dup + unless job_spec.key?('class') || job_spec.key?(:class) + job_spec['class'] = name + end + prepared_hash[name] = job_spec + end + prepared_hash + end + end + end +end diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb new file mode 100644 index 00000000..9bce802e --- /dev/null +++ b/lib/resque/scheduler/server.rb @@ -0,0 +1,229 @@ +# vim:fileencoding=utf-8 +require 'resque-scheduler' +require 'resque/server' +require 'json' + +# Extend Resque::Server to add tabs +module Resque + module Scheduler + module Server + def self.included(base) + base.class_eval do + helpers { include HelperMethods } + include ServerMethods + + get('/schedule') { schedule } + post('/schedule/requeue') { schedule_requeue } + post('/schedule/requeue_with_params') do + schedule_requeue_with_params + end + get('/delayed') { delayed } + get('/delayed/jobs/:klass') { delayed_jobs_klass } + post('/delayed/search') { delayed_search } + get('/delayed/:timestamp') { delayed_timestamp } + post('/delayed/queue_now') { delayed_queue_now } + post('/delayed/cancel_now') { delayed_cancel_now } + post('/delayed/clear') { delayed_clear } + end + end + + module ServerMethods + def schedule + Resque.reload_schedule! if Resque::Scheduler.dynamic + erb scheduler_template('scheduler') + end + + def schedule_requeue + @job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[@job_name] + @parameters = config['parameters'] || config[:parameters] + if @parameters + erb scheduler_template('requeue-params') + else + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + end + + def schedule_requeue_with_params + job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[job_name] + # Build args hash from post data (removing the job name) + submitted_args = params.reject do |key, value| + key == 'job_name' || key == :job_name + end + + # Merge constructed args hash with existing args hash for + # the job, if it exists + config_args = config['args'] || config[:args] || {} + config_args = config_args.merge(submitted_args) + + # Insert the args hash into config and queue the resque job + config = config.merge('args' => config_args) + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + + def delayed + erb scheduler_template('delayed') + end + + def delayed_jobs_klass + begin + klass = Resque::Scheduler::Util.constantize(params[:klass]) + @args = JSON.load(URI.decode(params[:args])) + @timestamps = Resque.scheduled_at(klass, *@args) + rescue + @timestamps = [] + end + + erb scheduler_template('delayed_schedules') + end + + def delayed_search + @jobs = find_job(params[:search]) + erb scheduler_template('search') + end + + def delayed_timestamp + erb scheduler_template('delayed_timestamp') + end + + def delayed_queue_now + timestamp = params['timestamp'].to_i + if timestamp > 0 + Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) + end + redirect u('/overview') + end + + def delayed_cancel_now + klass = Resque::Scheduler::Util.constantize(params['klass']) + timestamp = params['timestamp'] + args = Resque.decode params['args'] + Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) + redirect u('/delayed') + end + + def delayed_clear + Resque.reset_delayed_queue + redirect u('delayed') + end + end + + module HelperMethods + def format_time(t) + t.strftime('%Y-%m-%d %H:%M:%S %z') + end + + def queue_from_class_name(class_name) + Resque.queue_from_class( + Resque::Scheduler::Util.constantize(class_name) + ) + end + + def find_job(worker) + worker = worker.downcase + results = working_jobs_for_worker(worker) + + dels = delayed_jobs_for_worker(worker) + results += dels.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('where_at' => 'delayed') + end + + Resque.queues.each do |queue| + queued = Resque.peek(queue, 0, Resque.size(queue)) + queued = [queued] unless queued.is_a?(Array) + results += queued.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('queue' => queue, 'where_at' => 'queued') + end + end + + results + end + + def schedule_interval(config) + if config['every'] + schedule_interval_every(config['every']) + elsif config['cron'] + 'cron: ' + config['cron'].to_s + else + 'Not currently scheduled' + end + end + + def schedule_interval_every(every) + every = [*every] + s = 'every: ' << every.first + + return s unless every.length > 1 + + s << ' (' + meta = every.last.map do |key, value| + "#{key.to_s.gsub(/_/, ' ')} #{value}" + end + s << meta.join(', ') << ')' + end + + def schedule_class(config) + if config['class'].nil? && !config['custom_job_class'].nil? + config['custom_job_class'] + else + config['class'] + end + end + + def scheduler_template(name) + File.read( + File.expand_path("../server/views/#{name}.erb", __FILE__) + ) + end + + def scheduled_in_this_env?(name) + return true if Resque.schedule[name]['rails_env'].nil? + Resque.schedule[name]['rails_env'] == Resque::Scheduler.env + end + + private + + def working_jobs_for_worker(worker) + [].tap do |results| + working = [*Resque.working] + work = working.select do |w| + w.job && w.job['payload'] && + w.job['payload']['class'].downcase.include?(worker) + end + work.each do |w| + results += [ + w.job['payload'].merge( + 'queue' => w.job['queue'], 'where_at' => 'working' + ) + ] + end + end + end + + def delayed_jobs_for_worker(worker) + [].tap do |dels| + schedule_size = Resque.delayed_queue_schedule_size + Resque.delayed_queue_peek(0, schedule_size).each do |d| + Resque.delayed_timestamp_peek( + d, 0, Resque.delayed_timestamp_size(d)).each do |j| + dels << j.merge!('timestamp' => d) + end + end + end + end + end + end + end +end + +Resque::Server.tabs << 'Schedule' +Resque::Server.tabs << 'Delayed' + +Resque::Server.class_eval do + include Resque::Scheduler::Server +end diff --git a/lib/resque_scheduler/server/views/delayed.erb b/lib/resque/scheduler/server/views/delayed.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed.erb rename to lib/resque/scheduler/server/views/delayed.erb diff --git a/lib/resque_scheduler/server/views/delayed_schedules.erb b/lib/resque/scheduler/server/views/delayed_schedules.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed_schedules.erb rename to lib/resque/scheduler/server/views/delayed_schedules.erb diff --git a/lib/resque_scheduler/server/views/delayed_timestamp.erb b/lib/resque/scheduler/server/views/delayed_timestamp.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed_timestamp.erb rename to lib/resque/scheduler/server/views/delayed_timestamp.erb diff --git a/lib/resque_scheduler/server/views/requeue-params.erb b/lib/resque/scheduler/server/views/requeue-params.erb similarity index 100% rename from lib/resque_scheduler/server/views/requeue-params.erb rename to lib/resque/scheduler/server/views/requeue-params.erb diff --git a/lib/resque_scheduler/server/views/scheduler.erb b/lib/resque/scheduler/server/views/scheduler.erb similarity index 100% rename from lib/resque_scheduler/server/views/scheduler.erb rename to lib/resque/scheduler/server/views/scheduler.erb diff --git a/lib/resque_scheduler/server/views/search.erb b/lib/resque/scheduler/server/views/search.erb similarity index 100% rename from lib/resque_scheduler/server/views/search.erb rename to lib/resque/scheduler/server/views/search.erb diff --git a/lib/resque_scheduler/server/views/search_form.erb b/lib/resque/scheduler/server/views/search_form.erb similarity index 100% rename from lib/resque_scheduler/server/views/search_form.erb rename to lib/resque/scheduler/server/views/search_form.erb diff --git a/lib/resque/scheduler/signal_handling.rb b/lib/resque/scheduler/signal_handling.rb new file mode 100644 index 00000000..e779f022 --- /dev/null +++ b/lib/resque/scheduler/signal_handling.rb @@ -0,0 +1,35 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module SignalHandling + attr_writer :signal_queue + + def signal_queue + @signal_queue ||= [] + end + + # For all signals, set the shutdown flag and wait for current + # poll/enqueing to finish (should be almost instant). In the + # case of sleeping, exit immediately. + def register_signal_handlers + %w(INT TERM USR1 USR2 QUIT).each do |sig| + trap(sig) { signal_queue << sig } + end + end + + def handle_signals + loop do + sig = signal_queue.shift + break unless sig + log! "Got #{sig} signal" + case sig + when 'INT', 'TERM', 'QUIT' then shutdown + when 'USR1' then print_schedule + when 'USR2' then reload_schedule! + end + end + end + end + end +end diff --git a/lib/resque_scheduler/tasks.rb b/lib/resque/scheduler/tasks.rb similarity index 85% rename from lib/resque_scheduler/tasks.rb rename to lib/resque/scheduler/tasks.rb index 6e6cfd1c..52eed1aa 100644 --- a/lib/resque_scheduler/tasks.rb +++ b/lib/resque/scheduler/tasks.rb @@ -2,13 +2,13 @@ require 'English' require 'resque/tasks' -require 'resque_scheduler' +require 'resque-scheduler' namespace :resque do task :setup def scheduler_cli - @scheduler_cli ||= ResqueScheduler::Cli.new( + @scheduler_cli ||= Resque::Scheduler::Cli.new( %W(#{ENV['RESQUE_SCHEDULER_OPTIONS']}) ) end diff --git a/lib/resque/scheduler/util.rb b/lib/resque/scheduler/util.rb new file mode 100644 index 00000000..ad983a9b --- /dev/null +++ b/lib/resque/scheduler/util.rb @@ -0,0 +1,41 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + class Util + # In order to upgrade to resque(1.25) which has deprecated following + # methods, we just added these usefull helpers back to use in Resque + # Scheduler. refer to: + # https://github.com/resque/resque-scheduler/pull/273 + + def self.constantize(camel_cased_word) + camel_cased_word = camel_cased_word.to_s + + if camel_cased_word.include?('-') + camel_cased_word = classify(camel_cased_word) + end + + names = camel_cased_word.split('::') + names.shift if names.empty? || names.first.empty? + + constant = Object + names.each do |name| + args = Module.method(:const_get).arity != 1 ? [false] : [] + + if constant.const_defined?(name, *args) + constant = constant.const_get(name) + else + constant = constant.const_missing(name) + end + end + constant + end + + def self.classify(dashed_word) + dashed_word.split('-').each do|part| + part[0] = part[0].chr.upcase + end.join + end + end + end +end diff --git a/lib/resque/scheduler/version.rb b/lib/resque/scheduler/version.rb new file mode 100644 index 00000000..6d55ff62 --- /dev/null +++ b/lib/resque/scheduler/version.rb @@ -0,0 +1,7 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + VERSION = '2.5.2' + end +end diff --git a/lib/resque_scheduler.rb b/lib/resque_scheduler.rb deleted file mode 100644 index 58b2b4c9..00000000 --- a/lib/resque_scheduler.rb +++ /dev/null @@ -1,411 +0,0 @@ -# vim:fileencoding=utf-8 -require 'rubygems' -require 'resque' -require 'resque_scheduler/version' -require 'resque_scheduler/util' -require 'resque/scheduler' -require 'resque_scheduler/plugin' - -module ResqueScheduler - autoload :Cli, 'resque_scheduler/cli' - - # - # Accepts a new schedule configuration of the form: - # - # { - # "MakeTea" => { - # "every" => "1m" }, - # "some_name" => { - # "cron" => "5/* * * *", - # "class" => "DoSomeWork", - # "args" => "work on this string", - # "description" => "this thing works it"s butter off" }, - # ... - # } - # - # Hash keys can be anything and are used to describe and reference - # the scheduled job. If the "class" argument is missing, the key - # is used implicitly as "class" argument - in the "MakeTea" example, - # "MakeTea" is used both as job name and resque worker class. - # - # Any jobs that were in the old schedule, but are not - # present in the new schedule, will be removed. - # - # :cron can be any cron scheduling string - # - # :every can be used in lieu of :cron. see rufus-scheduler's 'every' usage - # for valid syntax. If :cron is present it will take precedence over :every. - # - # :class must be a resque worker class. If it is missing, the job name (hash - # key) will be used as :class. - # - # :args can be any yaml which will be converted to a ruby literal and - # passed in a params. (optional) - # - # :rails_envs is the list of envs where the job gets loaded. Envs are - # comma separated (optional) - # - # :description is just that, a description of the job (optional). If - # params is an array, each element in the array is passed as a separate - # param, otherwise params is passed in as the only parameter to perform. - def schedule=(schedule_hash) - # clean the schedules as it exists in redis - clean_schedules - - schedule_hash = prepare_schedule(schedule_hash) - - # store all schedules in redis, so we can retrieve them back everywhere. - schedule_hash.each do |name, job_spec| - set_schedule(name, job_spec) - end - - # ensure only return the successfully saved data! - reload_schedule! - end - - # Returns the schedule hash - def schedule - @schedule ||= all_schedules - @schedule || {} - end - - # reloads the schedule from redis - def reload_schedule! - @schedule = all_schedules - end - - # gets the schedules as it exists in redis - def all_schedules - return nil unless redis.exists(:schedules) - - redis.hgetall(:schedules).tap do |h| - h.each do |name, config| - h[name] = decode(config) - end - end - end - - # clean the schedules as it exists in redis, useful for first setup? - def clean_schedules - if redis.exists(:schedules) - redis.hkeys(:schedules).each do |key| - remove_schedule(key) unless schedule_persisted?(key) - end - end - @schedule = nil - true - end - - # Create or update a schedule with the provided name and configuration. - # - # Note: values for class and custom_job_class need to be strings, - # not constants. - # - # Resque.set_schedule('some_job', {:class => 'SomeJob', - # :every => '15mins', - # :queue => 'high', - # :args => '/tmp/poop'}) - def set_schedule(name, config) - existing_config = fetch_schedule(name) - persist = config.delete(:persist) || config.delete('persist') - unless existing_config && existing_config == config - redis.pipelined do - redis.hset(:schedules, name, encode(config)) - redis.sadd(:schedules_changed, name) - redis.sadd(:persisted_schedules, name) if persist - end - end - config - end - - # retrive the schedule configuration for the given name - def fetch_schedule(name) - decode(redis.hget(:schedules, name)) - end - - def schedule_persisted?(name) - redis.sismember(:persisted_schedules, name) - end - - # remove a given schedule by name - def remove_schedule(name) - redis.pipelined do - redis.hdel(:schedules, name) - redis.srem(:persisted_schedules, name) - redis.sadd(:schedules_changed, name) - end - end - - # This method is nearly identical to +enqueue+ only it also - # takes a timestamp which will be used to schedule the job - # for queueing. Until timestamp is in the past, the job will - # sit in the schedule list. - def enqueue_at(timestamp, klass, *args) - validate(klass) - enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) - end - - # Identical to +enqueue_at+, except you can also specify - # a queue in which the job will be placed after the - # timestamp has passed. It respects Resque.inline option, by - # creating the job right away instead of adding to the queue. - def enqueue_at_with_queue(queue, timestamp, klass, *args) - return false unless Plugin.run_before_schedule_hooks(klass, *args) - - if Resque.inline? || timestamp.to_i < Time.now.to_i - # Just create the job and let resque perform it right away with inline. - # If the class is a custom job class, call self#scheduled on it. This - # allows you to do things like Resque.enqueue_at(timestamp, - # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. - if klass.respond_to?(:scheduled) - klass.scheduled(queue, klass.to_s, *args) - else - Resque::Job.create(queue, klass, *args) - end - else - delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) - end - - Plugin.run_after_schedule_hooks(klass, *args) - end - - # Identical to enqueue_at but takes number_of_seconds_from_now - # instead of a timestamp. - def enqueue_in(number_of_seconds_from_now, klass, *args) - enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) - end - - # Identical to +enqueue_in+, except you can also specify - # a queue in which the job will be placed after the - # number of seconds has passed. - def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) - enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, - klass, *args) - end - - # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object - # Insertion if O(log(n)). - # Returns true if it's the first job to be scheduled at that time, else false - def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", encode(item)) - - # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i - end - - # Returns an array of timestamps based on start and count - def delayed_queue_peek(start, count) - result = redis.zrange(:delayed_queue_schedule, start, start + count - 1) - Array(result).map(&:to_i) - end - - # Returns the size of the delayed queue schedule - def delayed_queue_schedule_size - redis.zcard :delayed_queue_schedule - end - - # Returns the number of jobs for a given timestamp in the delayed queue - # schedule - def delayed_timestamp_size(timestamp) - redis.llen("delayed:#{timestamp.to_i}").to_i - end - - # Returns an array of delayed items for the given timestamp - def delayed_timestamp_peek(timestamp, start, count) - if 1 == count - r = list_range "delayed:#{timestamp.to_i}", start, count - r.nil? ? [] : [r] - else - list_range "delayed:#{timestamp.to_i}", start, count - end - end - - # Returns the next delayed queue timestamp - # (don't call directly) - def next_delayed_timestamp(at_time = nil) - items = redis.zrangebyscore( - :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, - limit: [0, 1] - ) - timestamp = items.nil? ? nil : Array(items).first - timestamp.to_i unless timestamp.nil? - end - - # Returns the next item to be processed for a given timestamp, nil if - # done. (don't call directly) - # +timestamp+ can either be in seconds or a datetime - def next_item_for_timestamp(timestamp) - key = "delayed:#{timestamp.to_i}" - - encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) - - # If the list is empty, remove it. - clean_up_timestamp(key, timestamp) - item - end - - # Clears all jobs created with enqueue_at or enqueue_in - def reset_delayed_queue - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| - key = "delayed:#{item}" - items = redis.lrange(key, 0, -1) - redis.pipelined do - items.each { |ts_item| redis.del("timestamps:#{ts_item}") } - end - redis.del key - end - - redis.del :delayed_queue_schedule - end - - # Given an encoded item, remove it from the delayed_queue - def remove_delayed(klass, *args) - search = encode(job_to_hash(klass, args)) - timestamps = redis.smembers("timestamps:#{search}") - - replies = redis.pipelined do - timestamps.each do |key| - redis.lrem(key, 0, search) - redis.srem("timestamps:#{search}", key) - end - end - - return 0 if replies.nil? || replies.empty? - replies.each_slice(2).map(&:first).inject(:+) - end - - # Given an encoded item, enqueue it now - def enqueue_delayed(klass, *args) - hash = job_to_hash(klass, args) - remove_delayed(klass, *args).times do - Resque::Scheduler.enqueue_from_config(hash) - end - end - - # Given a block, remove jobs that return true from a block - # - # This allows for removal of delayed jobs that have arguments matching - # certain criteria - def remove_delayed_selection - fail ArgumentError, 'Please supply a block' unless block_given? - - destroyed = 0 - # There is no way to search Redis list entries for a partial match, so we - # query for all delayed job tasks and do our matching after decoding the - # payload data - jobs = Resque.redis.keys('delayed:*') - jobs.each do |job| - index = Resque.redis.llen(job) - 1 - while index >= 0 - payload = Resque.redis.lindex(job, index) - decoded_payload = decode(payload) - if yield(decoded_payload['args']) - removed = redis.lrem job, 0, payload - destroyed += removed - index -= removed - else - index -= 1 - end - end - end - destroyed - end - - # Given a timestamp and job (klass + args) it removes all instances and - # returns the count of jobs removed. - # - # O(N) where N is the number of jobs scheduled to fire at the given - # timestamp - def remove_delayed_job_from_timestamp(timestamp, klass, *args) - key = "delayed:#{timestamp.to_i}" - encoded_job = encode(job_to_hash(klass, args)) - - redis.srem("timestamps:#{encoded_job}", key) - count = redis.lrem(key, 0, encoded_job) - clean_up_timestamp(key, timestamp) - - count - end - - def count_all_scheduled_jobs - total_jobs = 0 - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| - total_jobs += redis.llen("delayed:#{timestamp}").to_i - end - total_jobs - end - - # Discover if a job hs been delayed. - # Examples - # Resque.delayed? MyJob - # Resque.delayed? MyJob, id: 1 - # Returns true if the job has been delayed - def delayed?(klass, *args) - !scheduled_at(klass, *args).empty? - end - - # Returns delayed jobs schedule timestamp for +klass+, +args+. - def scheduled_at(klass, *args) - search = encode(job_to_hash(klass, args)) - redis.smembers("timestamps:#{search}").map do |key| - key.tr('delayed:', '').to_i - end - end - - def last_enqueued_at(job_name, date) - redis.hset('delayed:last_enqueued_at', job_name, date) - end - - def get_last_enqueued_at(job_name) - redis.hget('delayed:last_enqueued_at', job_name) - end - - private - - def job_to_hash(klass, args) - { class: klass.to_s, args: args, queue: queue_from_class(klass) } - end - - def job_to_hash_with_queue(queue, klass, args) - { class: klass.to_s, args: args, queue: queue } - end - - def clean_up_timestamp(key, timestamp) - # If the list is empty, remove it. - - # Use a watch here to ensure nobody adds jobs to this delayed - # queue while we're removing it. - redis.watch key - if 0 == redis.llen(key).to_i - redis.multi do - redis.del key - redis.zrem :delayed_queue_schedule, timestamp.to_i - end - else - redis.unwatch - end - end - - def prepare_schedule(schedule_hash) - prepared_hash = {} - schedule_hash.each do |name, job_spec| - job_spec = job_spec.dup - unless job_spec.key?('class') || job_spec.key?(:class) - job_spec['class'] = name - end - prepared_hash[name] = job_spec - end - prepared_hash - end -end - -Resque.extend ResqueScheduler diff --git a/lib/resque_scheduler/cli.rb b/lib/resque_scheduler/cli.rb deleted file mode 100644 index 0bcc43aa..00000000 --- a/lib/resque_scheduler/cli.rb +++ /dev/null @@ -1,160 +0,0 @@ -# vim:fileencoding=utf-8 - -require 'optparse' - -module ResqueScheduler - class Cli - BANNER = <<-EOF.gsub(/ {6}/, '') - Usage: resque-scheduler [options] - - Runs a resque scheduler process directly (rather than via rake). - - EOF - OPTIONS = [ - { - args: ['-n', '--app-name [APP_NAME]', 'Application name for procline'], - callback: ->(options) { ->(n) { options[:app_name] = n } } - }, - { - args: ['-B', '--background', 'Run in the background [BACKGROUND]'], - callback: ->(options) { ->(b) { options[:background] = b } } - }, - { - args: ['-D', '--dynamic-schedule', - 'Enable dynamic scheduling [DYNAMIC_SCHEDULE]'], - callback: ->(options) { ->(d) { options[:dynamic] = d } } - }, - { - args: ['-E', '--environment [RAILS_ENV]', 'Environment name'], - callback: ->(options) { ->(e) { options[:env] = e } } - }, - { - args: ['-I', '--initializer-path [INITIALIZER_PATH]', - 'Path to optional initializer ruby file'], - callback: ->(options) { ->(i) { options[:initializer_path] = i } } - }, - { - args: ['-i', '--interval [RESQUE_SCHEDULER_INTERVAL]', - 'Interval for checking if a scheduled job must run'], - callback: ->(options) { ->(i) { options[:poll_sleep_amount] = i } } - }, - { - args: ['-l', '--logfile [LOGFILE]', 'Log file name'], - callback: ->(options) { ->(l) { options[:logfile] = l } } - }, - { - args: ['-F', '--logformat [LOGFORMAT]', 'Log output format'], - callback: ->(options) { ->(f) { options[:logformat] = f } } - }, - { - args: ['-P', '--pidfile [PIDFILE]', 'PID file name'], - callback: ->(options) { ->(p) { options[:pidfile] = p } } - }, - { - args: ['-q', '--quiet', 'Run with minimal output [QUIET] (or [MUTE])'], - callback: ->(options) { ->(q) { options[:mute] = q } } - }, - { - args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], - callback: ->(options) { ->(v) { options[:verbose] = v } } - } - ].freeze - - def self.run!(argv = ARGV, env = ENV) - new(argv, env).run! - end - - def initialize(argv = ARGV, env = ENV) - @argv = argv - @env = env - end - - def run! - pre_run - run_forever - end - - def pre_run - parse_options - pre_setup - setup_env - end - - def parse_options - OptionParser.new do |opts| - opts.banner = BANNER - OPTIONS.each do |opt| - opts.on(*opt[:args], &(opt[:callback].call(options))) - end - end.parse!(argv.dup) - end - - def pre_setup - if options[:initializer_path] - load options[:initializer_path].to_s.strip - else - false - end - end - - def setup_env - require 'resque' - require 'resque/scheduler' - - # Need to set this here for conditional Process.daemon redirect of - # stderr/stdout to /dev/null - Resque::Scheduler.mute = !!options[:mute] - - if options[:background] - unless Process.respond_to?('daemon') - abort 'background option is set, which requires ruby >= 1.9' - end - - Process.daemon(true, !Resque::Scheduler.mute) - Resque.redis.client.reconnect - end - - File.open(options[:pidfile], 'w') do |f| - f.puts $PROCESS_ID - end if options[:pidfile] - - Resque::Scheduler.configure do |c| - # These settings are somewhat redundant given the defaults present - # in the attr reader methods. They are left here for clarity and - # to serve as an example of how to use `.configure`. - - c.app_name = options[:app_name] - c.dynamic = !!options[:dynamic] - c.env = options[:env] - c.logfile = options[:logfile] - c.logformat = options[:logformat] - c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') - c.verbose = !!options[:verbose] - end - end - - def run_forever - Resque::Scheduler.run - end - - private - - attr_reader :argv, :env - - def options - @options ||= { - app_name: env['APP_NAME'], - background: env['BACKGROUND'], - dynamic: env['DYNAMIC_SCHEDULE'], - env: env['RAILS_ENV'], - initializer_path: env['INITIALIZER_PATH'], - logfile: env['LOGFILE'], - logformat: env['LOGFORMAT'], - mute: env['MUTE'] || env['QUIET'], - pidfile: env['PIDFILE'], - poll_sleep_amount: env['RESQUE_SCHEDULER_INTERVAL'], - verbose: env['VERBOSE'] - } - end - end -end diff --git a/lib/resque_scheduler/logger_builder.rb b/lib/resque_scheduler/logger_builder.rb deleted file mode 100644 index 85e2d41e..00000000 --- a/lib/resque_scheduler/logger_builder.rb +++ /dev/null @@ -1,70 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - # Just builds a logger, with specified verbosity and destination. - # The simplest example: - # - # ResqueScheduler::LoggerBuilder.new.build - class LoggerBuilder - # Initializes new instance of the builder - # - # Pass :opts Hash with - # - :mute if logger needs to be silent for all levels. Default - false - # - :verbose if there is a need in debug messages. Default - false - # - :log_dev to output logs into a desired file. Default - STDOUT - # - :format log format, either 'text' or 'json'. Default - 'text' - # - # Example: - # - # LoggerBuilder.new( - # :mute => false, :verbose => true, :log_dev => 'log/scheduler.log' - # ) - def initialize(opts = {}) - @muted = !!opts[:mute] - @verbose = !!opts[:verbose] - @log_dev = opts[:log_dev] || $stdout - @format = opts[:format] || 'text' - end - - # Returns an instance of Logger - def build - logger = Logger.new(@log_dev) - logger.level = level - logger.formatter = send(:"#{@format}_formatter") - logger - end - - private - - def level - if @verbose && !@muted - Logger::DEBUG - elsif !@muted - Logger::INFO - else - Logger::FATAL - end - end - - def text_formatter - proc do |severity, datetime, progname, msg| - "resque-scheduler: [#{severity}] #{datetime.iso8601}: #{msg}\n" - end - end - - def json_formatter - proc do |severity, datetime, progname, msg| - require 'json' - JSON.dump( - - name: 'resque-scheduler', - progname: progname, - level: severity, - timestamp: datetime.iso8601, - msg: msg - - ) + "\n" - end - end - end -end diff --git a/lib/resque_scheduler/plugin.rb b/lib/resque_scheduler/plugin.rb deleted file mode 100644 index db8f7fc2..00000000 --- a/lib/resque_scheduler/plugin.rb +++ /dev/null @@ -1,29 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - module Plugin - def self.hooks(job, pattern) - job.methods.grep(/^#{pattern}/).sort - end - - def self.run_hooks(job, pattern, *args) - results = hooks(job, pattern).map do |hook| - job.send(hook, *args) - end - - results.all? { |result| result != false } - end - - def self.run_before_delayed_enqueue_hooks(klass, *args) - run_hooks(klass, 'before_delayed_enqueue', *args) - end - - def self.run_before_schedule_hooks(klass, *args) - run_hooks(klass, 'before_schedule', *args) - end - - def self.run_after_schedule_hooks(klass, *args) - run_hooks(klass, 'after_schedule', *args) - end - end -end diff --git a/lib/resque_scheduler/server.rb b/lib/resque_scheduler/server.rb deleted file mode 100644 index 87400bfb..00000000 --- a/lib/resque_scheduler/server.rb +++ /dev/null @@ -1,200 +0,0 @@ -# vim:fileencoding=utf-8 -require 'resque_scheduler' -require 'resque/server' -require 'json' - -# Extend Resque::Server to add tabs -module ResqueScheduler - module Server - def self.included(base) - base.class_eval do - helpers do - def format_time(t) - t.strftime('%Y-%m-%d %H:%M:%S %z') - end - - def queue_from_class_name(class_name) - Resque.queue_from_class( - ResqueScheduler::Util.constantize(class_name) - ) - end - - def find_job(worker) - worker = worker.downcase - results = Array.new - - # Check working jobs - working = [*Resque.working] - work = working.select do |w| - w.job && w.job['payload'] && - w.job['payload']['class'].downcase.include?(worker) - end - work.each do |w| - results += [ - w.job['payload'].merge( - 'queue' => w.job['queue'], 'where_at' => 'working' - ) - ] - end - - # Check delayed Jobs - dels = Array.new - schedule_size = Resque.delayed_queue_schedule_size - Resque.delayed_queue_peek(0, schedule_size).each do |d| - Resque.delayed_timestamp_peek( - d, 0, Resque.delayed_timestamp_size(d)).each do |j| - dels << j.merge!('timestamp' => d) - end - end - results += dels.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('where_at' => 'delayed') - end - - # Check Queues - Resque.queues.each do |queue| - queued = Resque.peek(queue, 0, Resque.size(queue)) - queued = [queued] unless queued.is_a?(Array) - results += queued.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('queue' => queue, 'where_at' => 'queued') - end - end - results - end - - def schedule_interval(config) - if config['every'] - schedule_interval_every(config['every']) - elsif config['cron'] - 'cron: ' + config['cron'].to_s - else - 'Not currently scheduled' - end - end - - def schedule_interval_every(every) - every = [*every] - s = 'every: ' << every.first - - return s unless every.length > 1 - - s << ' (' - meta = every.last.map do |key, value| - "#{key.to_s.gsub(/_/, ' ')} #{value}" - end - s << meta.join(', ') << ')' - end - - def schedule_class(config) - if config['class'].nil? && !config['custom_job_class'].nil? - config['custom_job_class'] - else - config['class'] - end - end - - def scheduler_template(name) - File.read( - File.expand_path("../server/views/#{name}.erb", __FILE__) - ) - end - - def scheduled_in_this_env?(name) - return true if Resque.schedule[name]['rails_env'].nil? - Resque.schedule[name]['rails_env'] == Resque::Scheduler.env - end - end - - get '/schedule' do - Resque.reload_schedule! if Resque::Scheduler.dynamic - erb scheduler_template('scheduler') - end - - post '/schedule/requeue' do - @job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[@job_name] - @parameters = config['parameters'] || config[:parameters] - if @parameters - erb scheduler_template('requeue-params') - else - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - end - - post '/schedule/requeue_with_params' do - job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[job_name] - # Build args hash from post data (removing the job name) - submitted_args = params.reject do |key, value| - key == 'job_name' || key == :job_name - end - - # Merge constructed args hash with existing args hash for - # the job, if it exists - config_args = config['args'] || config[:args] || {} - config_args = config_args.merge(submitted_args) - - # Insert the args hash into config and queue the resque job - config = config.merge('args' => config_args) - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - - get '/delayed' do - erb scheduler_template('delayed') - end - - get '/delayed/jobs/:klass' do - begin - klass = ResqueScheduler::Util.constantize(params[:klass]) - @args = JSON.load(URI.decode(params[:args])) - @timestamps = Resque.scheduled_at(klass, *@args) - rescue - @timestamps = [] - end - - erb scheduler_template('delayed_schedules') - end - - post '/delayed/search' do - @jobs = find_job(params[:search]) - erb scheduler_template('search') - end - - get '/delayed/:timestamp' do - erb scheduler_template('delayed_timestamp') - end - - post '/delayed/queue_now' do - timestamp = params['timestamp'].to_i - if timestamp > 0 - Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) - end - redirect u('/overview') - end - - post '/delayed/cancel_now' do - klass = ResqueScheduler::Util.constantize params['klass'] - timestamp = params['timestamp'] - args = Resque.decode params['args'] - Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) - redirect u('/delayed') - end - - post '/delayed/clear' do - Resque.reset_delayed_queue - redirect u('delayed') - end - end - end - end -end - -Resque::Server.tabs << 'Schedule' -Resque::Server.tabs << 'Delayed' - -Resque::Server.class_eval do - include ResqueScheduler::Server -end diff --git a/lib/resque_scheduler/util.rb b/lib/resque_scheduler/util.rb deleted file mode 100644 index fe81a177..00000000 --- a/lib/resque_scheduler/util.rb +++ /dev/null @@ -1,36 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - class Util - # In order to upgrade to resque(1.25) which has deprecated following - # methods, we just added these usefull helpers back to use in Resque - # Scheduler. refer to: https://github.com/resque/resque-scheduler/pull/273 - - def self.constantize(camel_cased_word) - camel_cased_word = camel_cased_word.to_s - - if camel_cased_word.include?('-') - camel_cased_word = classify(camel_cased_word) - end - - names = camel_cased_word.split('::') - names.shift if names.empty? || names.first.empty? - - constant = Object - names.each do |name| - args = Module.method(:const_get).arity != 1 ? [false] : [] - - if constant.const_defined?(name, *args) - constant = constant.const_get(name) - else - constant = constant.const_missing(name) - end - end - constant - end - - def self.classify(dashed_word) - dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join - end - end -end diff --git a/lib/resque_scheduler/version.rb b/lib/resque_scheduler/version.rb deleted file mode 100644 index 241c0f71..00000000 --- a/lib/resque_scheduler/version.rb +++ /dev/null @@ -1,5 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - VERSION = '2.5.4' -end diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index cb610d5a..de96c546 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -1,11 +1,11 @@ # vim:fileencoding=utf-8 lib = File.expand_path('../lib', __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) -require 'resque_scheduler/version' +require 'resque/scheduler/version' Gem::Specification.new do |spec| spec.name = 'resque-scheduler' - spec.version = ResqueScheduler::VERSION + spec.version = Resque::Scheduler::VERSION spec.authors = ['Ben VandenBos'] spec.email = ['bvandenbos@gmail.com'] spec.homepage = 'http://github.com/resque/resque-scheduler' @@ -22,12 +22,14 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'bundler', '~> 1.5' spec.add_development_dependency 'json' + spec.add_development_dependency 'kramdown' spec.add_development_dependency 'mocha' spec.add_development_dependency 'pry' spec.add_development_dependency 'rack-test' spec.add_development_dependency 'rake' spec.add_development_dependency 'rubocop' spec.add_development_dependency 'simplecov' + spec.add_development_dependency 'yard' spec.add_runtime_dependency 'redis', '~> 3.0' spec.add_runtime_dependency 'resque', '~> 1.25' diff --git a/tasks/resque_scheduler.rake b/tasks/resque_scheduler.rake index a6f2b4b8..fb0a9300 100644 --- a/tasks/resque_scheduler.rake +++ b/tasks/resque_scheduler.rake @@ -1,2 +1,2 @@ $LOAD_PATH.unshift File.dirname(__FILE__) + '/../lib' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' diff --git a/test/cli_test.rb b/test/cli_test.rb index 5a0c8e06..418d9a53 100644 --- a/test/cli_test.rb +++ b/test/cli_test.rb @@ -2,8 +2,14 @@ require_relative 'test_helper' context 'Cli' do + def mock_runtime_env + mock.tap { |m| m.stubs(:setup) } + end + def new_cli(argv = [], env = {}) - ResqueScheduler::Cli.new(argv, env) + Resque::Scheduler::Cli.new(argv, env).tap do |cli| + cli.stubs(:runtime_env).returns(mock_runtime_env) + end end test 'does not require any positional arguments' do @@ -52,31 +58,6 @@ def new_cli(argv = [], env = {}) assert_equal(true, cli.send(:options)[:background]) end - test 'daemonizes when background is true' do - Process.expects(:daemon) - cli = new_cli(%w(--background)) - cli.pre_run - end - - test 'reconnects redis when background is true' do - Process.stubs(:daemon) - mock_redis_client = mock('redis_client') - mock_redis = mock('redis') - mock_redis.expects(:client).returns(mock_redis_client) - mock_redis_client.expects(:reconnect) - Resque.expects(:redis).returns(mock_redis) - cli = new_cli(%w(--background)) - cli.pre_run - end - - test 'aborts when background is given and Process does not support daemon' do - Process.stubs(:daemon) - Process.expects(:respond_to?).with('daemon').returns(false) - cli = new_cli(%w(--background)) - cli.expects(:abort) - cli.pre_run - end - test 'initializes pidfile from the env' do cli = new_cli([], 'PIDFILE' => 'bar') assert_equal('bar', cli.send(:options)[:pidfile]) @@ -98,14 +79,6 @@ def new_cli(argv = [], env = {}) assert_equal('foo', cli.send(:options)[:pidfile]) end - test 'writes pid to pidfile when given' do - mock_pidfile = mock('pidfile') - mock_pidfile.expects(:puts) - File.expects(:open).with('derp.pid', 'w').yields(mock_pidfile) - cli = new_cli(%w(--pidfile derp.pid)) - cli.pre_run - end - test 'defaults to nil dynamic' do assert_equal(nil, new_cli.send(:options)[:dynamic]) end @@ -158,25 +131,25 @@ def new_cli(argv = [], env = {}) cli.pre_run end - test 'initializes mute/quiet from the env' do + test 'initializes quiet from the env' do cli = new_cli([], 'QUIET' => '1') - assert_equal('1', cli.send(:options)[:mute]) + assert_equal('1', cli.send(:options)[:quiet]) end - test 'defaults to unmuted' do - assert_equal(false, !!new_cli.send(:options)[:mute]) + test 'defaults to un-quieted' do + assert_equal(false, !!new_cli.send(:options)[:quiet]) end - test 'accepts mute/quiet via -q' do + test 'accepts quiet via -q' do cli = new_cli(%w(-q)) cli.parse_options - assert_equal(true, cli.send(:options)[:mute]) + assert_equal(true, cli.send(:options)[:quiet]) end - test 'accepts mute via --quiet' do + test 'accepts quiet via --quiet' do cli = new_cli(%w(--quiet)) cli.parse_options - assert_equal(true, cli.send(:options)[:mute]) + assert_equal(true, cli.send(:options)[:quiet]) end test 'initializes logfile from the env' do @@ -248,6 +221,6 @@ def new_cli(argv = [], env = {}) test 'runs Resque::Scheduler' do Resque::Scheduler.expects(:run) - ResqueScheduler::Cli.run!([], {}) + Resque::Scheduler::Cli.run!([], {}) end end diff --git a/test/delayed_queue_test.rb b/test/delayed_queue_test.rb index d99f468b..ba6eb7f3 100644 --- a/test/delayed_queue_test.rb +++ b/test/delayed_queue_test.rb @@ -3,7 +3,7 @@ context 'DelayedQueue' do setup do - Resque::Scheduler.mute = true + Resque::Scheduler.quiet = true Resque.redis.flushall end @@ -534,11 +534,9 @@ Resque.enqueue_at Time.now + 1, SomeIvarJob Resque.enqueue_at Time.now + 1, SomeIvarJob, id: 1 - assert(Resque.delayed?(SomeIvarJob, id: 1)) - assert(!Resque.delayed?(SomeIvarJob, id: 2)) - assert(Resque.delayed?(SomeIvarJob)) - assert(!Resque.delayed?(SomeJob)) - + assert Resque.delayed?(SomeIvarJob, id: 1) + assert !Resque.delayed?(SomeIvarJob, id: 2) + assert Resque.delayed?(SomeIvarJob) + assert !Resque.delayed?(SomeJob) end - end diff --git a/test/env_test.rb b/test/env_test.rb new file mode 100644 index 00000000..01b543b9 --- /dev/null +++ b/test/env_test.rb @@ -0,0 +1,41 @@ +# vim:fileencoding=utf-8 +require_relative 'test_helper' + +context 'Env' do + def new_env(options = {}) + Resque::Scheduler::Env.new(options) + end + + test 'daemonizes when background is true' do + Process.expects(:daemon) + env = new_env(background: true) + env.setup + end + + test 'reconnects redis when background is true' do + Process.stubs(:daemon) + mock_redis_client = mock('redis_client') + mock_redis = mock('redis') + mock_redis.expects(:client).returns(mock_redis_client) + mock_redis_client.expects(:reconnect) + Resque.expects(:redis).returns(mock_redis) + env = new_env(background: true) + env.setup + end + + test 'aborts when background is given and Process does not support daemon' do + Process.stubs(:daemon) + Process.expects(:respond_to?).with('daemon').returns(false) + env = new_env(background: true) + env.expects(:abort) + env.setup + end + + test 'writes pid to pidfile when given' do + mock_pidfile = mock('pidfile') + mock_pidfile.expects(:puts) + File.expects(:open).with('derp.pid', 'w').yields(mock_pidfile) + env = new_env(pidfile: 'derp.pid') + env.setup + end +end diff --git a/test/scheduler_args_test.rb b/test/scheduler_args_test.rb index f32702b9..c10cf3f9 100644 --- a/test/scheduler_args_test.rb +++ b/test/scheduler_args_test.rb @@ -5,8 +5,11 @@ context 'scheduling jobs with arguments' do setup do Resque::Scheduler.clear_schedule! - Resque::Scheduler.dynamic = false - Resque::Scheduler.mute = true + Resque::Scheduler.configure do |c| + c.dynamic = false + c.quiet = true + c.poll_sleep_amount = nil + end end test 'enqueue_from_config puts stuff in resque without class loaded' do diff --git a/test/scheduler_locking_test.rb b/test/scheduler_locking_test.rb index 29e06103..891d7a6f 100644 --- a/test/scheduler_locking_test.rb +++ b/test/scheduler_locking_test.rb @@ -9,7 +9,7 @@ def lock_is_not_held(lock) context '#master_lock_key' do setup do - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -25,7 +25,7 @@ def lock_is_not_held(lock) context 'with a prefix set via ENV' do setup do ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'my.prefix' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -43,7 +43,7 @@ def lock_is_not_held(lock) context 'with a namespace set for resque' do setup do Resque.redis.namespace = 'my.namespace' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -61,7 +61,7 @@ def lock_is_not_held(lock) setup do Resque.redis.namespace = 'my.namespace' ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'my.prefix' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -79,9 +79,9 @@ def lock_is_not_held(lock) end end -context 'Resque::SchedulerLocking' do +context 'Resque::Scheduler::Locking' do setup do - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do diff --git a/test/scheduler_setup_test.rb b/test/scheduler_setup_test.rb index e757ed6f..351a4e72 100644 --- a/test/scheduler_setup_test.rb +++ b/test/scheduler_setup_test.rb @@ -14,16 +14,16 @@ test 'set custom logger' do custom_logger = Logger.new('/dev/null') - Resque::Scheduler.logger = custom_logger - assert_equal(custom_logger, Resque::Scheduler.logger) + Resque::Scheduler.send(:logger=, custom_logger) + assert_equal(custom_logger, Resque::Scheduler.send(:logger)) end test 'configure block' do - Resque::Scheduler.mute = false + Resque::Scheduler.quiet = false Resque::Scheduler.configure do |c| - c.mute = true + c.quiet = true end - assert_equal(Resque::Scheduler.mute, true) + assert_equal(Resque::Scheduler.quiet, true) end context 'when getting the env' do @@ -58,16 +58,17 @@ def wipe test 'uses STDOUT' do assert_equal( - Resque::Scheduler.logger.instance_variable_get(:@logdev).dev, $stdout + Resque::Scheduler.send(:logger) + .instance_variable_get(:@logdev).dev, $stdout ) end test 'not verbose' do - assert Resque::Scheduler.logger.level > Logger::DEBUG + assert Resque::Scheduler.send(:logger).level > Logger::DEBUG end - test 'not muted' do - assert Resque::Scheduler.logger.level < Logger::FATAL + test 'not quieted' do + assert Resque::Scheduler.send(:logger).level < Logger::FATAL end end @@ -78,19 +79,20 @@ def wipe test 'uses logfile' do Resque::Scheduler.logfile = '/dev/null' assert_equal( - Resque::Scheduler.logger.instance_variable_get(:@logdev).filename, + Resque::Scheduler.send(:logger) + .instance_variable_get(:@logdev).filename, '/dev/null' ) end test 'set verbosity' do Resque::Scheduler.verbose = true - assert Resque::Scheduler.logger.level == Logger::DEBUG + assert Resque::Scheduler.send(:logger).level == Logger::DEBUG end - test 'mute logger' do - Resque::Scheduler.mute = true - assert Resque::Scheduler.logger.level == Logger::FATAL + test 'quiet logger' do + Resque::Scheduler.quiet = true + assert Resque::Scheduler.send(:logger).level == Logger::FATAL end end diff --git a/test/scheduler_task_test.rb b/test/scheduler_task_test.rb index 09c95853..e6b151d1 100644 --- a/test/scheduler_task_test.rb +++ b/test/scheduler_task_test.rb @@ -3,9 +3,12 @@ context 'Resque::Scheduler' do setup do - Resque::Scheduler.dynamic = false + Resque::Scheduler.configure do |c| + c.dynamic = false + c.poll_sleep_amount = 0.1 + end Resque.redis.flushall - Resque::Scheduler.mute = true + Resque::Scheduler.quiet = true Resque::Scheduler.clear_schedule! Resque::Scheduler.send(:instance_variable_set, :@scheduled_jobs, {}) Resque::Scheduler.send(:instance_variable_set, :@shutdown, false) @@ -23,7 +26,7 @@ @pid = Process.pid Thread.new do - sleep 0.5 + sleep(0.05) Process.kill(:TERM, @pid) end diff --git a/test/scheduler_test.rb b/test/scheduler_test.rb index e465b44b..0e956c5f 100644 --- a/test/scheduler_test.rb +++ b/test/scheduler_test.rb @@ -5,7 +5,7 @@ setup do Resque::Scheduler.configure do |c| c.dynamic = false - c.mute = true + c.quiet = true c.env = nil c.app_name = nil end @@ -378,7 +378,7 @@ test 'adheres to lint' do assert_nothing_raised do Resque::Plugin.lint(Resque::Scheduler) - Resque::Plugin.lint(ResqueScheduler) + Resque::Plugin.lint(Resque::Scheduler::Extension) end end diff --git a/test/test_helper.rb b/test/test_helper.rb index da9c07fb..c232ee39 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -7,8 +7,8 @@ require 'resque' $LOAD_PATH.unshift File.dirname(File.expand_path(__FILE__)) + '/../lib' -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' unless ENV['RESQUE_SCHEDULER_DISABLE_TEST_REDIS_SERVER'] # Start our own Redis when the tests start. RedisInstance will take care of @@ -101,7 +101,7 @@ def self.perform(*args) DYNAMIC_SCHEDULE LOGFILE LOGFORMAT - MUTE + QUIET RAILS_ENV RESQUE_SCHEDULER_INTERVAL VERBOSE @@ -111,10 +111,10 @@ def self.perform(*args) def nullify_logger Resque::Scheduler.configure do |c| - c.mute = nil + c.quiet = nil c.verbose = nil c.logfile = nil - c.logger = nil + c.send(:logger=, nil) end ENV['LOGFILE'] = nil diff --git a/test/util_test.rb b/test/util_test.rb index f3782eee..8fdad2c9 100644 --- a/test/util_test.rb +++ b/test/util_test.rb @@ -2,10 +2,10 @@ context 'Util' do def util - ResqueScheduler::Util + Resque::Scheduler::Util end test 'constantizing' do - assert util.constantize('resque-scheduler') == ResqueScheduler + assert util.constantize('Resque::Scheduler') == Resque::Scheduler end end