diff --git a/.gitignore b/.gitignore index 7d8cb014..e29957fa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,12 @@ -.bundle/ -.idea/ +/.bundle/ +/.idea/ +/.yardoc/ -doc/ -pkg +/doc/ +/pkg/ nbproject -Gemfile.lock -.rvmrc +/Gemfile.lock +/.rvmrc *.swp /coverage/ diff --git a/README.md b/README.md index 97da4e3d..caeec3a3 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ gem 'resque-scheduler' Adding the resque:scheduler rake task: ```ruby -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' ``` ### Rake integration @@ -66,12 +66,12 @@ everything `resque` needs to know. ```ruby # Resque tasks require 'resque/tasks' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' namespace :resque do task :setup do require 'resque' - require 'resque_scheduler' + require 'resque-scheduler' # you probably already have this somewhere Resque.redis = 'localhost:6379' @@ -448,7 +448,7 @@ redis instance and schedule. The scheduler processes will use redis to elect a master process and detect failover when the master dies. Precautions are taken to prevent jobs from potentially being queued twice during failover even when the clocks of the scheduler machines are slightly out of sync (or load affects -scheduled job firing time). If you want the gory details, look at Resque::SchedulerLocking. +scheduled job firing time). If you want the gory details, look at Resque::Scheduler::Locking. If the scheduler process(es) goes down for whatever reason, the delayed items that should have fired during the outage will fire once the scheduler process @@ -495,8 +495,8 @@ Now, you want to add the following: ```ruby # This will make the tabs show up. -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' ``` That should make the scheduler tabs show up in `resque-web`. diff --git a/Rakefile b/Rakefile index 75283ee1..93a3b210 100644 --- a/Rakefile +++ b/Rakefile @@ -2,6 +2,7 @@ require 'bundler/gem_tasks' require 'rake/testtask' require 'rubocop/rake_task' +require 'yard' task default: [:rubocop, :test] @@ -15,3 +16,5 @@ Rake::TestTask.new do |t| o << '--verbose ' if ENV['VERBOSE'] end end + +YARD::Rake::YardocTask.new diff --git a/bin/resque-scheduler b/bin/resque-scheduler index 124b9063..f26121d1 100755 --- a/bin/resque-scheduler +++ b/bin/resque-scheduler @@ -1,5 +1,5 @@ #!/usr/bin/env ruby # vim:fileencoding=utf-8 -require 'resque_scheduler' -ResqueScheduler::Cli.run! +require 'resque-scheduler' +Resque::Scheduler::Cli.run! diff --git a/examples/Rakefile b/examples/Rakefile index bf314d02..bf31d618 100644 --- a/examples/Rakefile +++ b/examples/Rakefile @@ -1,2 +1,2 @@ # vim:fileencoding=utf-8 -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' diff --git a/examples/config/initializers/resque-web.rb b/examples/config/initializers/resque-web.rb index 9a2ed792..20e1a141 100644 --- a/examples/config/initializers/resque-web.rb +++ b/examples/config/initializers/resque-web.rb @@ -7,8 +7,8 @@ redis_env_var = ENV['REDIS_PROVIDER'] || 'REDIS_URL' Resque.redis = ENV[redis_env_var] || 'localhost:6379' -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' schedule_yml = ENV['RESQUE_SCHEDULE_YML'] if schedule_yml diff --git a/examples/dynamic-scheduling/lib/tasks/resque.rake b/examples/dynamic-scheduling/lib/tasks/resque.rake index 76b8dbc1..5e7e8a07 100644 --- a/examples/dynamic-scheduling/lib/tasks/resque.rake +++ b/examples/dynamic-scheduling/lib/tasks/resque.rake @@ -1,13 +1,13 @@ # vim:fileencoding=utf-8 require 'resque/tasks' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' require 'yaml' namespace :resque do task :setup do require 'resque' - require 'resque_scheduler' + require 'resque-scheduler' rails_root = ENV['RAILS_ROOT'] || File.expand_path('../../../', __FILE__) rails_env = ENV['RAILS_ENV'] || 'development' diff --git a/lib/resque-scheduler.rb b/lib/resque-scheduler.rb index b1e9d337..b770cd91 100644 --- a/lib/resque-scheduler.rb +++ b/lib/resque-scheduler.rb @@ -1,2 +1,4 @@ # vim:fileencoding=utf-8 -require File.expand_path('../resque_scheduler', __FILE__) +require_relative 'resque/scheduler' + +Resque.extend Resque::Scheduler::Extension diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index f08af92b..d207f4f5 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -1,11 +1,14 @@ # vim:fileencoding=utf-8 require 'rufus/scheduler' -require 'resque/scheduler_locking' -require 'resque_scheduler/logger_builder' +require_relative 'scheduler/locking' +require_relative 'scheduler/logger_builder' module Resque - class Scheduler - extend Resque::SchedulerLocking + module Scheduler + autoload :Cli, 'resque/scheduler/cli' + autoload :Extension, 'resque/scheduler/extension' + + extend Resque::Scheduler::Locking class << self # Allows for block-style configuration @@ -83,7 +86,7 @@ def poll_sleep_amount attr_writer :logger def logger - @logger ||= ResqueScheduler::LoggerBuilder.new( + @logger ||= Resque::Scheduler::LoggerBuilder.new( mute: mute, verbose: verbose, log_dev: logfile, @@ -202,8 +205,8 @@ def optionizate_interval_value(value) args end - # Loads a job schedule into the Rufus::Scheduler and stores it in - # @scheduled_jobs + # Loads a job schedule into the Rufus::Scheduler and stores it + # in @scheduled_jobs def load_schedule_job(name, config) # If `rails_env` or `env` is set in the config, load jobs only if they # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or @@ -306,7 +309,7 @@ def enqueue_from_config(job_config) klass_name = job_config['class'] || job_config[:class] begin - klass = ResqueScheduler::Util.constantize(klass_name) + klass = Resque::Scheduler::Util.constantize(klass_name) rescue NameError klass = klass_name end @@ -324,7 +327,7 @@ def enqueue_from_config(job_config) # from the web perhaps), fall back to enqueing normally via # Resque::Job.create. begin - ResqueScheduler::Util.constantize(job_klass).scheduled( + Resque::Scheduler::Util.constantize(job_klass).scheduled( queue, klass_name, *params ) rescue NameError @@ -337,7 +340,7 @@ def enqueue_from_config(job_config) # for non-existent classes (for example: running scheduler in # one app that schedules for another. if Class === klass - ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks( + Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks( klass, *params ) @@ -474,7 +477,7 @@ def build_procline(string) end def internal_name - "resque-scheduler-#{ResqueScheduler::VERSION}" + "resque-scheduler-#{Resque::Scheduler::VERSION}" end end end diff --git a/lib/resque/scheduler/cli.rb b/lib/resque/scheduler/cli.rb new file mode 100644 index 00000000..9915d3b4 --- /dev/null +++ b/lib/resque/scheduler/cli.rb @@ -0,0 +1,164 @@ +# vim:fileencoding=utf-8 + +require 'optparse' + +module Resque + module Scheduler + class Cli + BANNER = <<-EOF.gsub(/ {6}/, '') + Usage: resque-scheduler [options] + + Runs a resque scheduler process directly (rather than via rake). + + EOF + OPTIONS = [ + { + args: ['-n', '--app-name [APP_NAME]', + 'Application name for procline'], + callback: ->(options) { ->(n) { options[:app_name] = n } } + }, + { + args: ['-B', '--background', 'Run in the background [BACKGROUND]'], + callback: ->(options) { ->(b) { options[:background] = b } } + }, + { + args: ['-D', '--dynamic-schedule', + 'Enable dynamic scheduling [DYNAMIC_SCHEDULE]'], + callback: ->(options) { ->(d) { options[:dynamic] = d } } + }, + { + args: ['-E', '--environment [RAILS_ENV]', 'Environment name'], + callback: ->(options) { ->(e) { options[:env] = e } } + }, + { + args: ['-I', '--initializer-path [INITIALIZER_PATH]', + 'Path to optional initializer ruby file'], + callback: ->(options) { ->(i) { options[:initializer_path] = i } } + }, + { + args: ['-i', '--interval [RESQUE_SCHEDULER_INTERVAL]', + 'Interval for checking if a scheduled job must run'], + callback: ->(options) { ->(i) { options[:poll_sleep_amount] = i } } + }, + { + args: ['-l', '--logfile [LOGFILE]', 'Log file name'], + callback: ->(options) { ->(l) { options[:logfile] = l } } + }, + { + args: ['-F', '--logformat [LOGFORMAT]', 'Log output format'], + callback: ->(options) { ->(f) { options[:logformat] = f } } + }, + { + args: ['-P', '--pidfile [PIDFILE]', 'PID file name'], + callback: ->(options) { ->(p) { options[:pidfile] = p } } + }, + { + args: ['-q', '--quiet', + 'Run with minimal output [QUIET] (or [MUTE])'], + callback: ->(options) { ->(q) { options[:mute] = q } } + }, + { + args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], + callback: ->(options) { ->(v) { options[:verbose] = v } } + } + ].freeze + + def self.run!(argv = ARGV, env = ENV) + new(argv, env).run! + end + + def initialize(argv = ARGV, env = ENV) + @argv = argv + @env = env + end + + def run! + pre_run + run_forever + end + + def pre_run + parse_options + pre_setup + setup_env + end + + def parse_options + OptionParser.new do |opts| + opts.banner = BANNER + OPTIONS.each do |opt| + opts.on(*opt[:args], &(opt[:callback].call(options))) + end + end.parse!(argv.dup) + end + + def pre_setup + if options[:initializer_path] + load options[:initializer_path].to_s.strip + else + false + end + end + + def setup_env + require 'resque' + require 'resque/scheduler' + + # Need to set this here for conditional Process.daemon redirect of + # stderr/stdout to /dev/null + Resque::Scheduler.mute = !!options[:mute] + + if options[:background] + unless Process.respond_to?('daemon') + abort 'background option is set, which requires ruby >= 1.9' + end + + Process.daemon(true, !Resque::Scheduler.mute) + Resque.redis.client.reconnect + end + + File.open(options[:pidfile], 'w') do |f| + f.puts $PROCESS_ID + end if options[:pidfile] + + Resque::Scheduler.configure do |c| + # These settings are somewhat redundant given the defaults present + # in the attr reader methods. They are left here for clarity and + # to serve as an example of how to use `.configure`. + + c.app_name = options[:app_name] + c.dynamic = !!options[:dynamic] + c.env = options[:env] + c.logfile = options[:logfile] + c.logformat = options[:logformat] + c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') + c.verbose = !!options[:verbose] + end + end + + def run_forever + Resque::Scheduler.run + end + + private + + attr_reader :argv, :env + + def options + @options ||= { + app_name: env['APP_NAME'], + background: env['BACKGROUND'], + dynamic: env['DYNAMIC_SCHEDULE'], + env: env['RAILS_ENV'], + initializer_path: env['INITIALIZER_PATH'], + logfile: env['LOGFILE'], + logformat: env['LOGFORMAT'], + mute: env['MUTE'] || env['QUIET'], + pidfile: env['PIDFILE'], + poll_sleep_amount: env['RESQUE_SCHEDULER_INTERVAL'], + verbose: env['VERBOSE'] + } + end + end + end +end diff --git a/lib/resque/scheduler/extension.rb b/lib/resque/scheduler/extension.rb new file mode 100644 index 00000000..6564252c --- /dev/null +++ b/lib/resque/scheduler/extension.rb @@ -0,0 +1,397 @@ +# vim:fileencoding=utf-8 +require 'rubygems' +require 'resque' +require_relative 'version' +require_relative 'util' +require_relative '../scheduler' +require_relative 'plugin' + +module Resque + module Scheduler + module Extension + # Accepts a new schedule configuration of the form: + # + # { + # "MakeTea" => { + # "every" => "1m" }, + # "some_name" => { + # "cron" => "5/* * * *", + # "class" => "DoSomeWork", + # "args" => "work on this string", + # "description" => "this thing works it"s butter off" }, + # ... + # } + # + # Hash keys can be anything and are used to describe and reference + # the scheduled job. If the "class" argument is missing, the key + # is used implicitly as "class" argument - in the "MakeTea" example, + # "MakeTea" is used both as job name and resque worker class. + # + # Any jobs that were in the old schedule, but are not + # present in the new schedule, will be removed. + # + # :cron can be any cron scheduling string + # + # :every can be used in lieu of :cron. see rufus-scheduler's 'every' + # usage for valid syntax. If :cron is present it will take precedence + # over :every. + # + # :class must be a resque worker class. If it is missing, the job name + # (hash key) will be used as :class. + # + # :args can be any yaml which will be converted to a ruby literal and + # passed in a params. (optional) + # + # :rails_envs is the list of envs where the job gets loaded. Envs are + # comma separated (optional) + # + # :description is just that, a description of the job (optional). If + # params is an array, each element in the array is passed as a separate + # param, otherwise params is passed in as the only parameter to perform. + def schedule=(schedule_hash) + # clean the schedules as it exists in redis + clean_schedules + + schedule_hash = prepare_schedule(schedule_hash) + + # store all schedules in redis, so we can retrieve them back + # everywhere. + schedule_hash.each do |name, job_spec| + set_schedule(name, job_spec) + end + + # ensure only return the successfully saved data! + reload_schedule! + end + + # Returns the schedule hash + def schedule + @schedule ||= all_schedules + @schedule || {} + end + + # reloads the schedule from redis + def reload_schedule! + @schedule = all_schedules + end + + # gets the schedules as it exists in redis + def all_schedules + return nil unless redis.exists(:schedules) + + redis.hgetall(:schedules).tap do |h| + h.each do |name, config| + h[name] = decode(config) + end + end + end + + # clean the schedules as it exists in redis, useful for first setup? + def clean_schedules + if redis.exists(:schedules) + redis.hkeys(:schedules).each do |key| + remove_schedule(key) unless schedule_persisted?(key) + end + end + @schedule = nil + true + end + + # Create or update a schedule with the provided name and configuration. + # + # Note: values for class and custom_job_class need to be strings, + # not constants. + # + # Resque.set_schedule('some_job', {:class => 'SomeJob', + # :every => '15mins', + # :queue => 'high', + # :args => '/tmp/poop'}) + def set_schedule(name, config) + existing_config = fetch_schedule(name) + persist = config.delete(:persist) || config.delete('persist') + unless existing_config && existing_config == config + redis.pipelined do + redis.hset(:schedules, name, encode(config)) + redis.sadd(:schedules_changed, name) + redis.sadd(:persisted_schedules, name) if persist + end + end + config + end + + # retrive the schedule configuration for the given name + def fetch_schedule(name) + decode(redis.hget(:schedules, name)) + end + + def schedule_persisted?(name) + redis.sismember(:persisted_schedules, name) + end + + # remove a given schedule by name + def remove_schedule(name) + redis.pipelined do + redis.hdel(:schedules, name) + redis.srem(:persisted_schedules, name) + redis.sadd(:schedules_changed, name) + end + end + + # This method is nearly identical to +enqueue+ only it also + # takes a timestamp which will be used to schedule the job + # for queueing. Until timestamp is in the past, the job will + # sit in the schedule list. + def enqueue_at(timestamp, klass, *args) + validate(klass) + enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) + end + + # Identical to +enqueue_at+, except you can also specify + # a queue in which the job will be placed after the + # timestamp has passed. It respects Resque.inline option, by + # creating the job right away instead of adding to the queue. + def enqueue_at_with_queue(queue, timestamp, klass, *args) + return false unless Plugin.run_before_schedule_hooks(klass, *args) + + if Resque.inline? || timestamp.to_i < Time.now.to_i + # Just create the job and let resque perform it right away with + # inline. If the class is a custom job class, call self#scheduled on + # it. This allows you to do things like Resque.enqueue_at(timestamp, + # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. + if klass.respond_to?(:scheduled) + klass.scheduled(queue, klass.to_s, *args) + else + Resque::Job.create(queue, klass, *args) + end + else + delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) + end + + Plugin.run_after_schedule_hooks(klass, *args) + end + + # Identical to enqueue_at but takes number_of_seconds_from_now + # instead of a timestamp. + def enqueue_in(number_of_seconds_from_now, klass, *args) + enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + + # Identical to +enqueue_in+, except you can also specify + # a queue in which the job will be placed after the + # number of seconds has passed. + def enqueue_in_with_queue(queue, number_of_seconds_from_now, + klass, *args) + enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, + klass, *args) + end + + # Used internally to stuff the item into the schedule sorted list. + # +timestamp+ can be either in seconds or a datetime object Insertion if + # O(log(n)). Returns true if it's the first job to be scheduled at that + # time, else false + def delayed_push(timestamp, item) + # First add this item to the list for this timestamp + redis.rpush("delayed:#{timestamp.to_i}", encode(item)) + + # Store the timestamps at with this item occurs + redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + + # Now, add this timestamp to the zsets. The score and the value are + # the same since we'll be querying by timestamp, and we don't have + # anything else to store. + redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + end + + # Returns an array of timestamps based on start and count + def delayed_queue_peek(start, count) + result = redis.zrange(:delayed_queue_schedule, start, + start + count - 1) + Array(result).map(&:to_i) + end + + # Returns the size of the delayed queue schedule + def delayed_queue_schedule_size + redis.zcard :delayed_queue_schedule + end + + # Returns the number of jobs for a given timestamp in the delayed queue + # schedule + def delayed_timestamp_size(timestamp) + redis.llen("delayed:#{timestamp.to_i}").to_i + end + + # Returns an array of delayed items for the given timestamp + def delayed_timestamp_peek(timestamp, start, count) + if 1 == count + r = list_range "delayed:#{timestamp.to_i}", start, count + r.nil? ? [] : [r] + else + list_range "delayed:#{timestamp.to_i}", start, count + end + end + + # Returns the next delayed queue timestamp + # (don't call directly) + def next_delayed_timestamp(at_time = nil) + items = redis.zrangebyscore( + :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, + limit: [0, 1] + ) + timestamp = items.nil? ? nil : Array(items).first + timestamp.to_i unless timestamp.nil? + end + + # Returns the next item to be processed for a given timestamp, nil if + # done. (don't call directly) + # +timestamp+ can either be in seconds or a datetime + def next_item_for_timestamp(timestamp) + key = "delayed:#{timestamp.to_i}" + + encoded_item = redis.lpop(key) + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + + # If the list is empty, remove it. + clean_up_timestamp(key, timestamp) + item + end + + # Clears all jobs created with enqueue_at or enqueue_in + def reset_delayed_queue + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| + key = "delayed:#{item}" + items = redis.lrange(key, 0, -1) + redis.pipelined do + items.each { |ts_item| redis.del("timestamps:#{ts_item}") } + end + redis.del key + end + + redis.del :delayed_queue_schedule + end + + # Given an encoded item, remove it from the delayed_queue + def remove_delayed(klass, *args) + search = encode(job_to_hash(klass, args)) + timestamps = redis.smembers("timestamps:#{search}") + + replies = redis.pipelined do + timestamps.each do |key| + redis.lrem(key, 0, search) + redis.srem("timestamps:#{search}", key) + end + end + + return 0 if replies.nil? || replies.empty? + replies.each_slice(2).map(&:first).inject(:+) + end + + # Given an encoded item, enqueue it now + def enqueue_delayed(klass, *args) + hash = job_to_hash(klass, args) + remove_delayed(klass, *args).times do + Resque::Scheduler.enqueue_from_config(hash) + end + end + + # Given a block, remove jobs that return true from a block + # + # This allows for removal of delayed jobs that have arguments matching + # certain criteria + def remove_delayed_selection + fail ArgumentError, 'Please supply a block' unless block_given? + + destroyed = 0 + # There is no way to search Redis list entries for a partial match, so + # we query for all delayed job tasks and do our matching after decoding + # the payload data + jobs = Resque.redis.keys('delayed:*') + jobs.each do |job| + index = Resque.redis.llen(job) - 1 + while index >= 0 + payload = Resque.redis.lindex(job, index) + decoded_payload = decode(payload) + if yield(decoded_payload['args']) + removed = redis.lrem job, 0, payload + destroyed += removed + index -= removed + else + index -= 1 + end + end + end + destroyed + end + + # Given a timestamp and job (klass + args) it removes all instances and + # returns the count of jobs removed. + # + # O(N) where N is the number of jobs scheduled to fire at the given + # timestamp + def remove_delayed_job_from_timestamp(timestamp, klass, *args) + key = "delayed:#{timestamp.to_i}" + encoded_job = encode(job_to_hash(klass, args)) + + redis.srem("timestamps:#{encoded_job}", key) + count = redis.lrem(key, 0, encoded_job) + clean_up_timestamp(key, timestamp) + + count + end + + def count_all_scheduled_jobs + total_jobs = 0 + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| + total_jobs += redis.llen("delayed:#{timestamp}").to_i + end + total_jobs + end + + # Returns delayed jobs schedule timestamp for +klass+, +args+. + def scheduled_at(klass, *args) + search = encode(job_to_hash(klass, args)) + redis.smembers("timestamps:#{search}").map do |key| + key.tr('delayed:', '').to_i + end + end + + private + + def job_to_hash(klass, args) + { class: klass.to_s, args: args, queue: queue_from_class(klass) } + end + + def job_to_hash_with_queue(queue, klass, args) + { class: klass.to_s, args: args, queue: queue } + end + + def clean_up_timestamp(key, timestamp) + # If the list is empty, remove it. + + # Use a watch here to ensure nobody adds jobs to this delayed + # queue while we're removing it. + redis.watch key + if 0 == redis.llen(key).to_i + redis.multi do + redis.del key + redis.zrem :delayed_queue_schedule, timestamp.to_i + end + else + redis.unwatch + end + end + + def prepare_schedule(schedule_hash) + prepared_hash = {} + schedule_hash.each do |name, job_spec| + job_spec = job_spec.dup + unless job_spec.key?('class') || job_spec.key?(:class) + job_spec['class'] = name + end + prepared_hash[name] = job_spec + end + prepared_hash + end + end + end +end diff --git a/lib/resque/scheduler/lock/base.rb b/lib/resque/scheduler/lock/base.rb index 52aa43fc..fe33eb4d 100644 --- a/lib/resque/scheduler/lock/base.rb +++ b/lib/resque/scheduler/lock/base.rb @@ -1,7 +1,7 @@ # vim:fileencoding=utf-8 module Resque - class Scheduler + module Scheduler module Lock class Base attr_reader :key diff --git a/lib/resque/scheduler/lock/basic.rb b/lib/resque/scheduler/lock/basic.rb index 10cecd77..b2a030f1 100644 --- a/lib/resque/scheduler/lock/basic.rb +++ b/lib/resque/scheduler/lock/basic.rb @@ -1,8 +1,8 @@ # vim:fileencoding=utf-8 -require 'resque/scheduler/lock/base' +require_relative 'base' module Resque - class Scheduler + module Scheduler module Lock class Basic < Base def acquire! diff --git a/lib/resque/scheduler/lock/resilient.rb b/lib/resque/scheduler/lock/resilient.rb index 6c7eba00..906a8a91 100644 --- a/lib/resque/scheduler/lock/resilient.rb +++ b/lib/resque/scheduler/lock/resilient.rb @@ -1,8 +1,8 @@ # vim:fileencoding=utf-8 -require 'resque/scheduler/lock/base' +require_relative 'base' module Resque - class Scheduler + module Scheduler module Lock class Resilient < Base def acquire! diff --git a/lib/resque/scheduler_locking.rb b/lib/resque/scheduler/locking.rb similarity index 77% rename from lib/resque/scheduler_locking.rb rename to lib/resque/scheduler/locking.rb index 2a09bcd7..5a100172 100644 --- a/lib/resque/scheduler_locking.rb +++ b/lib/resque/scheduler/locking.rb @@ -49,44 +49,46 @@ # you stop cron, no jobs fire while it's stopped and it doesn't fire jobs that # were missed when it starts up again. -require 'resque/scheduler/lock' +require_relative 'lock' module Resque - module SchedulerLocking - def master_lock - @master_lock ||= build_master_lock - end + module Scheduler + module Locking + def master_lock + @master_lock ||= build_master_lock + end - def supports_lua? - redis_master_version >= 2.5 - end + def supports_lua? + redis_master_version >= 2.5 + end - def master? - master_lock.acquire! || master_lock.locked? - end + def master? + master_lock.acquire! || master_lock.locked? + end - def release_master_lock! - master_lock.release! - end + def release_master_lock! + master_lock.release! + end - private + private - def build_master_lock - if supports_lua? - Resque::Scheduler::Lock::Resilient.new(master_lock_key) - else - Resque::Scheduler::Lock::Basic.new(master_lock_key) + def build_master_lock + if supports_lua? + Resque::Scheduler::Lock::Resilient.new(master_lock_key) + else + Resque::Scheduler::Lock::Basic.new(master_lock_key) + end end - end - def master_lock_key - lock_prefix = ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] || '' - lock_prefix += ':' if lock_prefix != '' - "#{Resque.redis.namespace}:#{lock_prefix}resque_scheduler_master_lock" - end + def master_lock_key + lock_prefix = ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] || '' + lock_prefix += ':' if lock_prefix != '' + "#{Resque.redis.namespace}:#{lock_prefix}resque_scheduler_master_lock" + end - def redis_master_version - Resque.redis.info['redis_version'].to_f + def redis_master_version + Resque.redis.info['redis_version'].to_f + end end end end diff --git a/lib/resque/scheduler/logger_builder.rb b/lib/resque/scheduler/logger_builder.rb new file mode 100644 index 00000000..d11981a4 --- /dev/null +++ b/lib/resque/scheduler/logger_builder.rb @@ -0,0 +1,70 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + # Just builds a logger, with specified verbosity and destination. + # The simplest example: + # + # Resque::Scheduler::LoggerBuilder.new.build + class LoggerBuilder + # Initializes new instance of the builder + # + # Pass :opts Hash with + # - :mute if logger needs to be silent for all levels. Default - false + # - :verbose if there is a need in debug messages. Default - false + # - :log_dev to output logs into a desired file. Default - STDOUT + # - :format log format, either 'text' or 'json'. Default - 'text' + # + # Example: + # + # LoggerBuilder.new( + # :mute => false, :verbose => true, :log_dev => 'log/scheduler.log' + # ) + def initialize(opts = {}) + @muted = !!opts[:mute] + @verbose = !!opts[:verbose] + @log_dev = opts[:log_dev] || $stdout + @format = opts[:format] || 'text' + end + + # Returns an instance of Logger + def build + logger = Logger.new(@log_dev) + logger.level = level + logger.formatter = send(:"#{@format}_formatter") + logger + end + + private + + def level + if @verbose && !@muted + Logger::DEBUG + elsif !@muted + Logger::INFO + else + Logger::FATAL + end + end + + def text_formatter + proc do |severity, datetime, progname, msg| + "resque-scheduler: [#{severity}] #{datetime.iso8601}: #{msg}\n" + end + end + + def json_formatter + proc do |severity, datetime, progname, msg| + require 'json' + JSON.dump( + name: 'resque-scheduler', + progname: progname, + level: severity, + timestamp: datetime.iso8601, + msg: msg + ) + "\n" + end + end + end + end +end diff --git a/lib/resque/scheduler/plugin.rb b/lib/resque/scheduler/plugin.rb new file mode 100644 index 00000000..bab898b4 --- /dev/null +++ b/lib/resque/scheduler/plugin.rb @@ -0,0 +1,31 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module Plugin + def self.hooks(job, pattern) + job.methods.grep(/^#{pattern}/).sort + end + + def self.run_hooks(job, pattern, *args) + results = hooks(job, pattern).map do |hook| + job.send(hook, *args) + end + + results.all? { |result| result != false } + end + + def self.run_before_delayed_enqueue_hooks(klass, *args) + run_hooks(klass, 'before_delayed_enqueue', *args) + end + + def self.run_before_schedule_hooks(klass, *args) + run_hooks(klass, 'before_schedule', *args) + end + + def self.run_after_schedule_hooks(klass, *args) + run_hooks(klass, 'after_schedule', *args) + end + end + end +end diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb new file mode 100644 index 00000000..4d9f9954 --- /dev/null +++ b/lib/resque/scheduler/server.rb @@ -0,0 +1,197 @@ +# vim:fileencoding=utf-8 +require 'resque-scheduler' +require 'resque/server' +require 'json' + +# Extend Resque::Server to add tabs +module Resque + module Scheduler + module Server + def self.included(base) + base.class_eval do + helpers do + def format_time(t) + t.strftime('%Y-%m-%d %H:%M:%S %z') + end + + def queue_from_class_name(class_name) + Resque.queue_from_class( + Resque::Scheduler::Util.constantize(class_name) + ) + end + + def find_job(worker) + worker = worker.downcase + results = Array.new + + # Check working jobs + working = [*Resque.working] + work = working.select do |w| + w.job && w.job['payload'] && + w.job['payload']['class'].downcase.include?(worker) + end + work.each do |w| + results += [ + w.job['payload'].merge( + 'queue' => w.job['queue'], 'where_at' => 'working' + ) + ] + end + + # Check delayed Jobs + dels = Array.new + schedule_size = Resque.delayed_queue_schedule_size + Resque.delayed_queue_peek(0, schedule_size).each do |d| + Resque.delayed_timestamp_peek( + d, 0, Resque.delayed_timestamp_size(d)).each do |j| + dels << j.merge!('timestamp' => d) + end + end + results += dels.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('where_at' => 'delayed') + end + + # Check Queues + Resque.queues.each do |queue| + queued = Resque.peek(queue, 0, Resque.size(queue)) + queued = [queued] unless queued.is_a?(Array) + results += queued.select do |j| + j['class'].downcase.include?(worker) && + j.merge!('queue' => queue, 'where_at' => 'queued') + end + end + results + end + + def schedule_interval(config) + if config['every'] + schedule_interval_every(config['every']) + elsif config['cron'] + 'cron: ' + config['cron'].to_s + else + 'Not currently scheduled' + end + end + + def schedule_interval_every(every) + every = [*every] + s = 'every: ' << every.first + + return s unless every.length > 1 + + s << ' (' + meta = every.last.map do |key, value| + "#{key.to_s.gsub(/_/, ' ')} #{value}" + end + s << meta.join(', ') << ')' + end + + def schedule_class(config) + if config['class'].nil? && !config['custom_job_class'].nil? + config['custom_job_class'] + else + config['class'] + end + end + + def scheduler_template(name) + File.read( + File.expand_path("../server/views/#{name}.erb", __FILE__) + ) + end + end + + get '/schedule' do + Resque.reload_schedule! if Resque::Scheduler.dynamic + erb scheduler_template('scheduler') + end + + post '/schedule/requeue' do + @job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[@job_name] + @parameters = config['parameters'] || config[:parameters] + if @parameters + erb scheduler_template('requeue-params') + else + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + end + + post '/schedule/requeue_with_params' do + job_name = params['job_name'] || params[:job_name] + config = Resque.schedule[job_name] + # Build args hash from post data (removing the job name) + submitted_args = params.reject do |key, value| + key == 'job_name' || key == :job_name + end + + # Merge constructed args hash with existing args hash for + # the job, if it exists + config_args = config['args'] || config[:args] || {} + config_args = config_args.merge(submitted_args) + + # Insert the args hash into config and queue the resque job + config = config.merge('args' => config_args) + Resque::Scheduler.enqueue_from_config(config) + redirect u('/overview') + end + + get '/delayed' do + erb scheduler_template('delayed') + end + + get '/delayed/jobs/:klass' do + begin + klass = Resque::Scheduler::Util.constantize(params[:klass]) + @args = JSON.load(URI.decode(params[:args])) + @timestamps = Resque.scheduled_at(klass, *@args) + rescue + @timestamps = [] + end + + erb scheduler_template('delayed_schedules') + end + + post '/delayed/search' do + @jobs = find_job(params[:search]) + erb scheduler_template('search') + end + + get '/delayed/:timestamp' do + erb scheduler_template('delayed_timestamp') + end + + post '/delayed/queue_now' do + timestamp = params['timestamp'].to_i + if timestamp > 0 + Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) + end + redirect u('/overview') + end + + post '/delayed/cancel_now' do + klass = Resque::Scheduler::Util.constantize(params['klass']) + timestamp = params['timestamp'] + args = Resque.decode params['args'] + Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) + redirect u('/delayed') + end + + post '/delayed/clear' do + Resque.reset_delayed_queue + redirect u('delayed') + end + end + end + end + end +end + +Resque::Server.tabs << 'Schedule' +Resque::Server.tabs << 'Delayed' + +Resque::Server.class_eval do + include Resque::Scheduler::Server +end diff --git a/lib/resque_scheduler/server/views/delayed.erb b/lib/resque/scheduler/server/views/delayed.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed.erb rename to lib/resque/scheduler/server/views/delayed.erb diff --git a/lib/resque_scheduler/server/views/delayed_schedules.erb b/lib/resque/scheduler/server/views/delayed_schedules.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed_schedules.erb rename to lib/resque/scheduler/server/views/delayed_schedules.erb diff --git a/lib/resque_scheduler/server/views/delayed_timestamp.erb b/lib/resque/scheduler/server/views/delayed_timestamp.erb similarity index 100% rename from lib/resque_scheduler/server/views/delayed_timestamp.erb rename to lib/resque/scheduler/server/views/delayed_timestamp.erb diff --git a/lib/resque_scheduler/server/views/requeue-params.erb b/lib/resque/scheduler/server/views/requeue-params.erb similarity index 100% rename from lib/resque_scheduler/server/views/requeue-params.erb rename to lib/resque/scheduler/server/views/requeue-params.erb diff --git a/lib/resque_scheduler/server/views/scheduler.erb b/lib/resque/scheduler/server/views/scheduler.erb similarity index 100% rename from lib/resque_scheduler/server/views/scheduler.erb rename to lib/resque/scheduler/server/views/scheduler.erb diff --git a/lib/resque_scheduler/server/views/search.erb b/lib/resque/scheduler/server/views/search.erb similarity index 100% rename from lib/resque_scheduler/server/views/search.erb rename to lib/resque/scheduler/server/views/search.erb diff --git a/lib/resque_scheduler/server/views/search_form.erb b/lib/resque/scheduler/server/views/search_form.erb similarity index 100% rename from lib/resque_scheduler/server/views/search_form.erb rename to lib/resque/scheduler/server/views/search_form.erb diff --git a/lib/resque_scheduler/tasks.rb b/lib/resque/scheduler/tasks.rb similarity index 85% rename from lib/resque_scheduler/tasks.rb rename to lib/resque/scheduler/tasks.rb index 6e6cfd1c..52eed1aa 100644 --- a/lib/resque_scheduler/tasks.rb +++ b/lib/resque/scheduler/tasks.rb @@ -2,13 +2,13 @@ require 'English' require 'resque/tasks' -require 'resque_scheduler' +require 'resque-scheduler' namespace :resque do task :setup def scheduler_cli - @scheduler_cli ||= ResqueScheduler::Cli.new( + @scheduler_cli ||= Resque::Scheduler::Cli.new( %W(#{ENV['RESQUE_SCHEDULER_OPTIONS']}) ) end diff --git a/lib/resque/scheduler/util.rb b/lib/resque/scheduler/util.rb new file mode 100644 index 00000000..ad983a9b --- /dev/null +++ b/lib/resque/scheduler/util.rb @@ -0,0 +1,41 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + class Util + # In order to upgrade to resque(1.25) which has deprecated following + # methods, we just added these usefull helpers back to use in Resque + # Scheduler. refer to: + # https://github.com/resque/resque-scheduler/pull/273 + + def self.constantize(camel_cased_word) + camel_cased_word = camel_cased_word.to_s + + if camel_cased_word.include?('-') + camel_cased_word = classify(camel_cased_word) + end + + names = camel_cased_word.split('::') + names.shift if names.empty? || names.first.empty? + + constant = Object + names.each do |name| + args = Module.method(:const_get).arity != 1 ? [false] : [] + + if constant.const_defined?(name, *args) + constant = constant.const_get(name) + else + constant = constant.const_missing(name) + end + end + constant + end + + def self.classify(dashed_word) + dashed_word.split('-').each do|part| + part[0] = part[0].chr.upcase + end.join + end + end + end +end diff --git a/lib/resque/scheduler/version.rb b/lib/resque/scheduler/version.rb new file mode 100644 index 00000000..6d55ff62 --- /dev/null +++ b/lib/resque/scheduler/version.rb @@ -0,0 +1,7 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + VERSION = '2.5.2' + end +end diff --git a/lib/resque_scheduler.rb b/lib/resque_scheduler.rb deleted file mode 100644 index 316105db..00000000 --- a/lib/resque_scheduler.rb +++ /dev/null @@ -1,403 +0,0 @@ -# vim:fileencoding=utf-8 -require 'rubygems' -require 'resque' -require 'resque_scheduler/version' -require 'resque_scheduler/util' -require 'resque/scheduler' -require 'resque_scheduler/plugin' - -module ResqueScheduler - autoload :Cli, 'resque_scheduler/cli' - - # - # Accepts a new schedule configuration of the form: - # - # { - # "MakeTea" => { - # "every" => "1m" }, - # "some_name" => { - # "cron" => "5/* * * *", - # "class" => "DoSomeWork", - # "args" => "work on this string", - # "description" => "this thing works it"s butter off" }, - # ... - # } - # - # Hash keys can be anything and are used to describe and reference - # the scheduled job. If the "class" argument is missing, the key - # is used implicitly as "class" argument - in the "MakeTea" example, - # "MakeTea" is used both as job name and resque worker class. - # - # Any jobs that were in the old schedule, but are not - # present in the new schedule, will be removed. - # - # :cron can be any cron scheduling string - # - # :every can be used in lieu of :cron. see rufus-scheduler's 'every' usage - # for valid syntax. If :cron is present it will take precedence over :every. - # - # :class must be a resque worker class. If it is missing, the job name (hash - # key) will be used as :class. - # - # :args can be any yaml which will be converted to a ruby literal and - # passed in a params. (optional) - # - # :rails_envs is the list of envs where the job gets loaded. Envs are - # comma separated (optional) - # - # :description is just that, a description of the job (optional). If - # params is an array, each element in the array is passed as a separate - # param, otherwise params is passed in as the only parameter to perform. - def schedule=(schedule_hash) - # clean the schedules as it exists in redis - clean_schedules - - schedule_hash = prepare_schedule(schedule_hash) - - # store all schedules in redis, so we can retrieve them back everywhere. - schedule_hash.each do |name, job_spec| - set_schedule(name, job_spec) - end - - # ensure only return the successfully saved data! - reload_schedule! - end - - # Returns the schedule hash - def schedule - @schedule ||= all_schedules - @schedule || {} - end - - # reloads the schedule from redis - def reload_schedule! - @schedule = all_schedules - end - - # gets the schedules as it exists in redis - def all_schedules - return nil unless redis.exists(:schedules) - - redis.hgetall(:schedules).tap do |h| - h.each do |name, config| - h[name] = decode(config) - end - end - end - - # clean the schedules as it exists in redis, useful for first setup? - def clean_schedules - if redis.exists(:schedules) - redis.hkeys(:schedules).each do |key| - remove_schedule(key) unless schedule_persisted?(key) - end - end - @schedule = nil - true - end - - # Create or update a schedule with the provided name and configuration. - # - # Note: values for class and custom_job_class need to be strings, - # not constants. - # - # Resque.set_schedule('some_job', {:class => 'SomeJob', - # :every => '15mins', - # :queue => 'high', - # :args => '/tmp/poop'}) - def set_schedule(name, config) - existing_config = fetch_schedule(name) - persist = config.delete(:persist) || config.delete('persist') - unless existing_config && existing_config == config - redis.pipelined do - redis.hset(:schedules, name, encode(config)) - redis.sadd(:schedules_changed, name) - redis.sadd(:persisted_schedules, name) if persist - end - end - config - end - - # retrive the schedule configuration for the given name - def fetch_schedule(name) - decode(redis.hget(:schedules, name)) - end - - def schedule_persisted?(name) - redis.sismember(:persisted_schedules, name) - end - - # remove a given schedule by name - def remove_schedule(name) - redis.pipelined do - redis.hdel(:schedules, name) - redis.srem(:persisted_schedules, name) - redis.sadd(:schedules_changed, name) - end - end - - # This method is nearly identical to +enqueue+ only it also - # takes a timestamp which will be used to schedule the job - # for queueing. Until timestamp is in the past, the job will - # sit in the schedule list. - def enqueue_at(timestamp, klass, *args) - validate(klass) - enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) - end - - # Identical to +enqueue_at+, except you can also specify - # a queue in which the job will be placed after the - # timestamp has passed. It respects Resque.inline option, by - # creating the job right away instead of adding to the queue. - def enqueue_at_with_queue(queue, timestamp, klass, *args) - return false unless Plugin.run_before_schedule_hooks(klass, *args) - - if Resque.inline? || timestamp.to_i < Time.now.to_i - # Just create the job and let resque perform it right away with inline. - # If the class is a custom job class, call self#scheduled on it. This - # allows you to do things like Resque.enqueue_at(timestamp, - # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. - if klass.respond_to?(:scheduled) - klass.scheduled(queue, klass.to_s, *args) - else - Resque::Job.create(queue, klass, *args) - end - else - delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) - end - - Plugin.run_after_schedule_hooks(klass, *args) - end - - # Identical to enqueue_at but takes number_of_seconds_from_now - # instead of a timestamp. - def enqueue_in(number_of_seconds_from_now, klass, *args) - enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) - end - - # Identical to +enqueue_in+, except you can also specify - # a queue in which the job will be placed after the - # number of seconds has passed. - def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) - enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, - klass, *args) - end - - # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object - # Insertion if O(log(n)). - # Returns true if it's the first job to be scheduled at that time, else false - def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", encode(item)) - - # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i - end - - # Returns an array of timestamps based on start and count - def delayed_queue_peek(start, count) - result = redis.zrange(:delayed_queue_schedule, start, start + count - 1) - Array(result).map(&:to_i) - end - - # Returns the size of the delayed queue schedule - def delayed_queue_schedule_size - redis.zcard :delayed_queue_schedule - end - - # Returns the number of jobs for a given timestamp in the delayed queue - # schedule - def delayed_timestamp_size(timestamp) - redis.llen("delayed:#{timestamp.to_i}").to_i - end - - # Returns an array of delayed items for the given timestamp - def delayed_timestamp_peek(timestamp, start, count) - if 1 == count - r = list_range "delayed:#{timestamp.to_i}", start, count - r.nil? ? [] : [r] - else - list_range "delayed:#{timestamp.to_i}", start, count - end - end - - # Returns the next delayed queue timestamp - # (don't call directly) - def next_delayed_timestamp(at_time = nil) - items = redis.zrangebyscore( - :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, - limit: [0, 1] - ) - timestamp = items.nil? ? nil : Array(items).first - timestamp.to_i unless timestamp.nil? - end - - # Returns the next item to be processed for a given timestamp, nil if - # done. (don't call directly) - # +timestamp+ can either be in seconds or a datetime - def next_item_for_timestamp(timestamp) - key = "delayed:#{timestamp.to_i}" - - encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) - - # If the list is empty, remove it. - clean_up_timestamp(key, timestamp) - item - end - - # Clears all jobs created with enqueue_at or enqueue_in - def reset_delayed_queue - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| - key = "delayed:#{item}" - items = redis.lrange(key, 0, -1) - redis.pipelined do - items.each { |ts_item| redis.del("timestamps:#{ts_item}") } - end - redis.del key - end - - redis.del :delayed_queue_schedule - end - - # Given an encoded item, remove it from the delayed_queue - def remove_delayed(klass, *args) - search = encode(job_to_hash(klass, args)) - timestamps = redis.smembers("timestamps:#{search}") - - replies = redis.pipelined do - timestamps.each do |key| - redis.lrem(key, 0, search) - redis.srem("timestamps:#{search}", key) - end - end - - return 0 if replies.nil? || replies.empty? - replies.each_slice(2).map(&:first).inject(:+) - end - - # Given an encoded item, enqueue it now - def enqueue_delayed(klass, *args) - hash = job_to_hash(klass, args) - remove_delayed(klass, *args).times do - Resque::Scheduler.enqueue_from_config(hash) - end - end - - # Given a block, remove jobs that return true from a block - # - # This allows for removal of delayed jobs that have arguments matching - # certain criteria - def remove_delayed_selection - fail ArgumentError, 'Please supply a block' unless block_given? - - destroyed = 0 - # There is no way to search Redis list entries for a partial match, so we - # query for all delayed job tasks and do our matching after decoding the - # payload data - jobs = Resque.redis.keys('delayed:*') - jobs.each do |job| - index = Resque.redis.llen(job) - 1 - while index >= 0 - payload = Resque.redis.lindex(job, index) - decoded_payload = decode(payload) - if yield(decoded_payload['args']) - removed = redis.lrem job, 0, payload - destroyed += removed - index -= removed - else - index -= 1 - end - end - end - destroyed - end - - # Given a timestamp and job (klass + args) it removes all instances and - # returns the count of jobs removed. - # - # O(N) where N is the number of jobs scheduled to fire at the given - # timestamp - def remove_delayed_job_from_timestamp(timestamp, klass, *args) - key = "delayed:#{timestamp.to_i}" - encoded_job = encode(job_to_hash(klass, args)) - - redis.srem("timestamps:#{encoded_job}", key) - count = redis.lrem(key, 0, encoded_job) - clean_up_timestamp(key, timestamp) - - count - end - - def count_all_scheduled_jobs - total_jobs = 0 - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| - total_jobs += redis.llen("delayed:#{timestamp}").to_i - end - total_jobs - end - - # Discover if a job hs been delayed. - # Examples - # Resque.delayed? MyJob - # Resque.delayed? MyJob, id: 1 - # Returns true if the job has been delayed - def delayed?(klass, *args) - !scheduled_at(klass, *args).empty? - end - - # Returns delayed jobs schedule timestamp for +klass+, +args+. - def scheduled_at(klass, *args) - search = encode(job_to_hash(klass, args)) - redis.smembers("timestamps:#{search}").map do |key| - key.tr('delayed:', '').to_i - end - end - - private - - def job_to_hash(klass, args) - { class: klass.to_s, args: args, queue: queue_from_class(klass) } - end - - def job_to_hash_with_queue(queue, klass, args) - { class: klass.to_s, args: args, queue: queue } - end - - def clean_up_timestamp(key, timestamp) - # If the list is empty, remove it. - - # Use a watch here to ensure nobody adds jobs to this delayed - # queue while we're removing it. - redis.watch key - if 0 == redis.llen(key).to_i - redis.multi do - redis.del key - redis.zrem :delayed_queue_schedule, timestamp.to_i - end - else - redis.unwatch - end - end - - def prepare_schedule(schedule_hash) - prepared_hash = {} - schedule_hash.each do |name, job_spec| - job_spec = job_spec.dup - unless job_spec.key?('class') || job_spec.key?(:class) - job_spec['class'] = name - end - prepared_hash[name] = job_spec - end - prepared_hash - end -end - -Resque.extend ResqueScheduler diff --git a/lib/resque_scheduler/cli.rb b/lib/resque_scheduler/cli.rb deleted file mode 100644 index 0bcc43aa..00000000 --- a/lib/resque_scheduler/cli.rb +++ /dev/null @@ -1,160 +0,0 @@ -# vim:fileencoding=utf-8 - -require 'optparse' - -module ResqueScheduler - class Cli - BANNER = <<-EOF.gsub(/ {6}/, '') - Usage: resque-scheduler [options] - - Runs a resque scheduler process directly (rather than via rake). - - EOF - OPTIONS = [ - { - args: ['-n', '--app-name [APP_NAME]', 'Application name for procline'], - callback: ->(options) { ->(n) { options[:app_name] = n } } - }, - { - args: ['-B', '--background', 'Run in the background [BACKGROUND]'], - callback: ->(options) { ->(b) { options[:background] = b } } - }, - { - args: ['-D', '--dynamic-schedule', - 'Enable dynamic scheduling [DYNAMIC_SCHEDULE]'], - callback: ->(options) { ->(d) { options[:dynamic] = d } } - }, - { - args: ['-E', '--environment [RAILS_ENV]', 'Environment name'], - callback: ->(options) { ->(e) { options[:env] = e } } - }, - { - args: ['-I', '--initializer-path [INITIALIZER_PATH]', - 'Path to optional initializer ruby file'], - callback: ->(options) { ->(i) { options[:initializer_path] = i } } - }, - { - args: ['-i', '--interval [RESQUE_SCHEDULER_INTERVAL]', - 'Interval for checking if a scheduled job must run'], - callback: ->(options) { ->(i) { options[:poll_sleep_amount] = i } } - }, - { - args: ['-l', '--logfile [LOGFILE]', 'Log file name'], - callback: ->(options) { ->(l) { options[:logfile] = l } } - }, - { - args: ['-F', '--logformat [LOGFORMAT]', 'Log output format'], - callback: ->(options) { ->(f) { options[:logformat] = f } } - }, - { - args: ['-P', '--pidfile [PIDFILE]', 'PID file name'], - callback: ->(options) { ->(p) { options[:pidfile] = p } } - }, - { - args: ['-q', '--quiet', 'Run with minimal output [QUIET] (or [MUTE])'], - callback: ->(options) { ->(q) { options[:mute] = q } } - }, - { - args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'], - callback: ->(options) { ->(v) { options[:verbose] = v } } - } - ].freeze - - def self.run!(argv = ARGV, env = ENV) - new(argv, env).run! - end - - def initialize(argv = ARGV, env = ENV) - @argv = argv - @env = env - end - - def run! - pre_run - run_forever - end - - def pre_run - parse_options - pre_setup - setup_env - end - - def parse_options - OptionParser.new do |opts| - opts.banner = BANNER - OPTIONS.each do |opt| - opts.on(*opt[:args], &(opt[:callback].call(options))) - end - end.parse!(argv.dup) - end - - def pre_setup - if options[:initializer_path] - load options[:initializer_path].to_s.strip - else - false - end - end - - def setup_env - require 'resque' - require 'resque/scheduler' - - # Need to set this here for conditional Process.daemon redirect of - # stderr/stdout to /dev/null - Resque::Scheduler.mute = !!options[:mute] - - if options[:background] - unless Process.respond_to?('daemon') - abort 'background option is set, which requires ruby >= 1.9' - end - - Process.daemon(true, !Resque::Scheduler.mute) - Resque.redis.client.reconnect - end - - File.open(options[:pidfile], 'w') do |f| - f.puts $PROCESS_ID - end if options[:pidfile] - - Resque::Scheduler.configure do |c| - # These settings are somewhat redundant given the defaults present - # in the attr reader methods. They are left here for clarity and - # to serve as an example of how to use `.configure`. - - c.app_name = options[:app_name] - c.dynamic = !!options[:dynamic] - c.env = options[:env] - c.logfile = options[:logfile] - c.logformat = options[:logformat] - c.poll_sleep_amount = Float(options[:poll_sleep_amount] || '5') - c.verbose = !!options[:verbose] - end - end - - def run_forever - Resque::Scheduler.run - end - - private - - attr_reader :argv, :env - - def options - @options ||= { - app_name: env['APP_NAME'], - background: env['BACKGROUND'], - dynamic: env['DYNAMIC_SCHEDULE'], - env: env['RAILS_ENV'], - initializer_path: env['INITIALIZER_PATH'], - logfile: env['LOGFILE'], - logformat: env['LOGFORMAT'], - mute: env['MUTE'] || env['QUIET'], - pidfile: env['PIDFILE'], - poll_sleep_amount: env['RESQUE_SCHEDULER_INTERVAL'], - verbose: env['VERBOSE'] - } - end - end -end diff --git a/lib/resque_scheduler/logger_builder.rb b/lib/resque_scheduler/logger_builder.rb deleted file mode 100644 index 85e2d41e..00000000 --- a/lib/resque_scheduler/logger_builder.rb +++ /dev/null @@ -1,70 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - # Just builds a logger, with specified verbosity and destination. - # The simplest example: - # - # ResqueScheduler::LoggerBuilder.new.build - class LoggerBuilder - # Initializes new instance of the builder - # - # Pass :opts Hash with - # - :mute if logger needs to be silent for all levels. Default - false - # - :verbose if there is a need in debug messages. Default - false - # - :log_dev to output logs into a desired file. Default - STDOUT - # - :format log format, either 'text' or 'json'. Default - 'text' - # - # Example: - # - # LoggerBuilder.new( - # :mute => false, :verbose => true, :log_dev => 'log/scheduler.log' - # ) - def initialize(opts = {}) - @muted = !!opts[:mute] - @verbose = !!opts[:verbose] - @log_dev = opts[:log_dev] || $stdout - @format = opts[:format] || 'text' - end - - # Returns an instance of Logger - def build - logger = Logger.new(@log_dev) - logger.level = level - logger.formatter = send(:"#{@format}_formatter") - logger - end - - private - - def level - if @verbose && !@muted - Logger::DEBUG - elsif !@muted - Logger::INFO - else - Logger::FATAL - end - end - - def text_formatter - proc do |severity, datetime, progname, msg| - "resque-scheduler: [#{severity}] #{datetime.iso8601}: #{msg}\n" - end - end - - def json_formatter - proc do |severity, datetime, progname, msg| - require 'json' - JSON.dump( - - name: 'resque-scheduler', - progname: progname, - level: severity, - timestamp: datetime.iso8601, - msg: msg - - ) + "\n" - end - end - end -end diff --git a/lib/resque_scheduler/plugin.rb b/lib/resque_scheduler/plugin.rb deleted file mode 100644 index db8f7fc2..00000000 --- a/lib/resque_scheduler/plugin.rb +++ /dev/null @@ -1,29 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - module Plugin - def self.hooks(job, pattern) - job.methods.grep(/^#{pattern}/).sort - end - - def self.run_hooks(job, pattern, *args) - results = hooks(job, pattern).map do |hook| - job.send(hook, *args) - end - - results.all? { |result| result != false } - end - - def self.run_before_delayed_enqueue_hooks(klass, *args) - run_hooks(klass, 'before_delayed_enqueue', *args) - end - - def self.run_before_schedule_hooks(klass, *args) - run_hooks(klass, 'before_schedule', *args) - end - - def self.run_after_schedule_hooks(klass, *args) - run_hooks(klass, 'after_schedule', *args) - end - end -end diff --git a/lib/resque_scheduler/server.rb b/lib/resque_scheduler/server.rb deleted file mode 100644 index 8b97f121..00000000 --- a/lib/resque_scheduler/server.rb +++ /dev/null @@ -1,195 +0,0 @@ -# vim:fileencoding=utf-8 -require 'resque_scheduler' -require 'resque/server' -require 'json' - -# Extend Resque::Server to add tabs -module ResqueScheduler - module Server - def self.included(base) - base.class_eval do - helpers do - def format_time(t) - t.strftime('%Y-%m-%d %H:%M:%S %z') - end - - def queue_from_class_name(class_name) - Resque.queue_from_class( - ResqueScheduler::Util.constantize(class_name) - ) - end - - def find_job(worker) - worker = worker.downcase - results = Array.new - - # Check working jobs - working = [*Resque.working] - work = working.select do |w| - w.job && w.job['payload'] && - w.job['payload']['class'].downcase.include?(worker) - end - work.each do |w| - results += [ - w.job['payload'].merge( - 'queue' => w.job['queue'], 'where_at' => 'working' - ) - ] - end - - # Check delayed Jobs - dels = Array.new - schedule_size = Resque.delayed_queue_schedule_size - Resque.delayed_queue_peek(0, schedule_size).each do |d| - Resque.delayed_timestamp_peek( - d, 0, Resque.delayed_timestamp_size(d)).each do |j| - dels << j.merge!('timestamp' => d) - end - end - results += dels.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('where_at' => 'delayed') - end - - # Check Queues - Resque.queues.each do |queue| - queued = Resque.peek(queue, 0, Resque.size(queue)) - queued = [queued] unless queued.is_a?(Array) - results += queued.select do |j| - j['class'].downcase.include?(worker) && - j.merge!('queue' => queue, 'where_at' => 'queued') - end - end - results - end - - def schedule_interval(config) - if config['every'] - schedule_interval_every(config['every']) - elsif config['cron'] - 'cron: ' + config['cron'].to_s - else - 'Not currently scheduled' - end - end - - def schedule_interval_every(every) - every = [*every] - s = 'every: ' << every.first - - return s unless every.length > 1 - - s << ' (' - meta = every.last.map do |key, value| - "#{key.to_s.gsub(/_/, ' ')} #{value}" - end - s << meta.join(', ') << ')' - end - - def schedule_class(config) - if config['class'].nil? && !config['custom_job_class'].nil? - config['custom_job_class'] - else - config['class'] - end - end - - def scheduler_template(name) - File.read( - File.expand_path("../server/views/#{name}.erb", __FILE__) - ) - end - end - - get '/schedule' do - Resque.reload_schedule! if Resque::Scheduler.dynamic - erb scheduler_template('scheduler') - end - - post '/schedule/requeue' do - @job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[@job_name] - @parameters = config['parameters'] || config[:parameters] - if @parameters - erb scheduler_template('requeue-params') - else - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - end - - post '/schedule/requeue_with_params' do - job_name = params['job_name'] || params[:job_name] - config = Resque.schedule[job_name] - # Build args hash from post data (removing the job name) - submitted_args = params.reject do |key, value| - key == 'job_name' || key == :job_name - end - - # Merge constructed args hash with existing args hash for - # the job, if it exists - config_args = config['args'] || config[:args] || {} - config_args = config_args.merge(submitted_args) - - # Insert the args hash into config and queue the resque job - config = config.merge('args' => config_args) - Resque::Scheduler.enqueue_from_config(config) - redirect u('/overview') - end - - get '/delayed' do - erb scheduler_template('delayed') - end - - get '/delayed/jobs/:klass' do - begin - klass = ResqueScheduler::Util.constantize(params[:klass]) - @args = JSON.load(URI.decode(params[:args])) - @timestamps = Resque.scheduled_at(klass, *@args) - rescue - @timestamps = [] - end - - erb scheduler_template('delayed_schedules') - end - - post '/delayed/search' do - @jobs = find_job(params[:search]) - erb scheduler_template('search') - end - - get '/delayed/:timestamp' do - erb scheduler_template('delayed_timestamp') - end - - post '/delayed/queue_now' do - timestamp = params['timestamp'].to_i - if timestamp > 0 - Resque::Scheduler.enqueue_delayed_items_for_timestamp(timestamp) - end - redirect u('/overview') - end - - post '/delayed/cancel_now' do - klass = ResqueScheduler::Util.constantize params['klass'] - timestamp = params['timestamp'] - args = Resque.decode params['args'] - Resque.remove_delayed_job_from_timestamp(timestamp, klass, *args) - redirect u('/delayed') - end - - post '/delayed/clear' do - Resque.reset_delayed_queue - redirect u('delayed') - end - end - end - end -end - -Resque::Server.tabs << 'Schedule' -Resque::Server.tabs << 'Delayed' - -Resque::Server.class_eval do - include ResqueScheduler::Server -end diff --git a/lib/resque_scheduler/util.rb b/lib/resque_scheduler/util.rb deleted file mode 100644 index fe81a177..00000000 --- a/lib/resque_scheduler/util.rb +++ /dev/null @@ -1,36 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - class Util - # In order to upgrade to resque(1.25) which has deprecated following - # methods, we just added these usefull helpers back to use in Resque - # Scheduler. refer to: https://github.com/resque/resque-scheduler/pull/273 - - def self.constantize(camel_cased_word) - camel_cased_word = camel_cased_word.to_s - - if camel_cased_word.include?('-') - camel_cased_word = classify(camel_cased_word) - end - - names = camel_cased_word.split('::') - names.shift if names.empty? || names.first.empty? - - constant = Object - names.each do |name| - args = Module.method(:const_get).arity != 1 ? [false] : [] - - if constant.const_defined?(name, *args) - constant = constant.const_get(name) - else - constant = constant.const_missing(name) - end - end - constant - end - - def self.classify(dashed_word) - dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join - end - end -end diff --git a/lib/resque_scheduler/version.rb b/lib/resque_scheduler/version.rb deleted file mode 100644 index 241c0f71..00000000 --- a/lib/resque_scheduler/version.rb +++ /dev/null @@ -1,5 +0,0 @@ -# vim:fileencoding=utf-8 - -module ResqueScheduler - VERSION = '2.5.4' -end diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index cb610d5a..e160d6e1 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -1,11 +1,11 @@ # vim:fileencoding=utf-8 lib = File.expand_path('../lib', __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) -require 'resque_scheduler/version' +require 'resque/scheduler/version' Gem::Specification.new do |spec| spec.name = 'resque-scheduler' - spec.version = ResqueScheduler::VERSION + spec.version = Resque::Scheduler::VERSION spec.authors = ['Ben VandenBos'] spec.email = ['bvandenbos@gmail.com'] spec.homepage = 'http://github.com/resque/resque-scheduler' @@ -26,8 +26,10 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'pry' spec.add_development_dependency 'rack-test' spec.add_development_dependency 'rake' + spec.add_development_dependency 'redcarpet' spec.add_development_dependency 'rubocop' spec.add_development_dependency 'simplecov' + spec.add_development_dependency 'yard' spec.add_runtime_dependency 'redis', '~> 3.0' spec.add_runtime_dependency 'resque', '~> 1.25' diff --git a/tasks/resque_scheduler.rake b/tasks/resque_scheduler.rake index a6f2b4b8..fb0a9300 100644 --- a/tasks/resque_scheduler.rake +++ b/tasks/resque_scheduler.rake @@ -1,2 +1,2 @@ $LOAD_PATH.unshift File.dirname(__FILE__) + '/../lib' -require 'resque_scheduler/tasks' +require 'resque/scheduler/tasks' diff --git a/test/cli_test.rb b/test/cli_test.rb index 5a0c8e06..d3d0c36e 100644 --- a/test/cli_test.rb +++ b/test/cli_test.rb @@ -3,7 +3,7 @@ context 'Cli' do def new_cli(argv = [], env = {}) - ResqueScheduler::Cli.new(argv, env) + Resque::Scheduler::Cli.new(argv, env) end test 'does not require any positional arguments' do @@ -248,6 +248,6 @@ def new_cli(argv = [], env = {}) test 'runs Resque::Scheduler' do Resque::Scheduler.expects(:run) - ResqueScheduler::Cli.run!([], {}) + Resque::Scheduler::Cli.run!([], {}) end end diff --git a/test/scheduler_args_test.rb b/test/scheduler_args_test.rb index f32702b9..600969ad 100644 --- a/test/scheduler_args_test.rb +++ b/test/scheduler_args_test.rb @@ -5,8 +5,11 @@ context 'scheduling jobs with arguments' do setup do Resque::Scheduler.clear_schedule! - Resque::Scheduler.dynamic = false - Resque::Scheduler.mute = true + Resque::Scheduler.configure do |c| + c.dynamic = false + c.mute = true + c.poll_sleep_amount = nil + end end test 'enqueue_from_config puts stuff in resque without class loaded' do diff --git a/test/scheduler_locking_test.rb b/test/scheduler_locking_test.rb index aefb5ca6..3bc8f7cd 100644 --- a/test/scheduler_locking_test.rb +++ b/test/scheduler_locking_test.rb @@ -9,7 +9,7 @@ def lock_is_not_held(lock) context '#master_lock_key' do setup do - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -25,7 +25,7 @@ def lock_is_not_held(lock) context 'with a prefix set via ENV' do setup do ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'my.prefix' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -43,7 +43,7 @@ def lock_is_not_held(lock) context 'with a namespace set for resque' do setup do Resque.redis.namespace = 'my.namespace' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -61,7 +61,7 @@ def lock_is_not_held(lock) setup do Resque.redis.namespace = 'my.namespace' ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'my.prefix' - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do @@ -79,9 +79,9 @@ def lock_is_not_held(lock) end end -context 'Resque::SchedulerLocking' do +context 'Resque::Scheduler::Locking' do setup do - @subject = Class.new { extend Resque::SchedulerLocking } + @subject = Class.new { extend Resque::Scheduler::Locking } end teardown do diff --git a/test/scheduler_task_test.rb b/test/scheduler_task_test.rb index c6545098..96dfd939 100644 --- a/test/scheduler_task_test.rb +++ b/test/scheduler_task_test.rb @@ -3,7 +3,10 @@ context 'Resque::Scheduler' do setup do - Resque::Scheduler.dynamic = false + Resque::Scheduler.configure do |c| + c.dynamic = false + c.poll_sleep_amount = 0.1 + end Resque.redis.flushall Resque::Scheduler.mute = true Resque::Scheduler.clear_schedule! @@ -21,7 +24,7 @@ test 'sending TERM to scheduler breaks out of poll_sleep' do Resque::Scheduler.expects(:release_master_lock!) fork do - sleep(0.5) + sleep(0.05) system("kill -TERM #{Process.ppid}") exit! end diff --git a/test/scheduler_test.rb b/test/scheduler_test.rb index e465b44b..6a753c2d 100644 --- a/test/scheduler_test.rb +++ b/test/scheduler_test.rb @@ -378,7 +378,7 @@ test 'adheres to lint' do assert_nothing_raised do Resque::Plugin.lint(Resque::Scheduler) - Resque::Plugin.lint(ResqueScheduler) + Resque::Plugin.lint(Resque::Scheduler::Extension) end end diff --git a/test/test_helper.rb b/test/test_helper.rb index aaf10cc0..6fe86d52 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -7,8 +7,8 @@ require 'resque' $LOAD_PATH.unshift File.dirname(File.expand_path(__FILE__)) + '/../lib' -require 'resque_scheduler' -require 'resque_scheduler/server' +require 'resque-scheduler' +require 'resque/scheduler/server' unless ENV['RESQUE_SCHEDULER_DISABLE_TEST_REDIS_SERVER'] # Start our own Redis when the tests start. RedisInstance will take care of diff --git a/test/util_test.rb b/test/util_test.rb index f3782eee..8fdad2c9 100644 --- a/test/util_test.rb +++ b/test/util_test.rb @@ -2,10 +2,10 @@ context 'Util' do def util - ResqueScheduler::Util + Resque::Scheduler::Util end test 'constantizing' do - assert util.constantize('resque-scheduler') == ResqueScheduler + assert util.constantize('Resque::Scheduler') == Resque::Scheduler end end