From 0aba925ae94340896f157362998a22fbb28d656b Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Tue, 18 Feb 2014 10:15:52 -0500 Subject: [PATCH] Splitting the resque extension's methods up a bit --- lib/resque/scheduler.rb | 1 + lib/resque/scheduler/delaying_extensions.rb | 270 ++++++++++++ lib/resque/scheduler/extension.rb | 403 +----------------- lib/resque/scheduler/scheduling_extensions.rb | 150 +++++++ 4 files changed, 426 insertions(+), 398 deletions(-) create mode 100644 lib/resque/scheduler/delaying_extensions.rb create mode 100644 lib/resque/scheduler/scheduling_extensions.rb diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index d05b8dc3..6cdb7b9e 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -10,6 +10,7 @@ module Resque module Scheduler autoload :Cli, 'resque/scheduler/cli' autoload :Extension, 'resque/scheduler/extension' + autoload :Util, 'resque/scheduler/util' private diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb new file mode 100644 index 00000000..b7e588dd --- /dev/null +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -0,0 +1,270 @@ +# vim:fileencoding=utf-8 +require 'resque' +require_relative 'plugin' +require_relative '../scheduler' + +module Resque + module Scheduler + module DelayingExtensions + # This method is nearly identical to +enqueue+ only it also + # takes a timestamp which will be used to schedule the job + # for queueing. Until timestamp is in the past, the job will + # sit in the schedule list. + def enqueue_at(timestamp, klass, *args) + validate(klass) + enqueue_at_with_queue( + queue_from_class(klass), timestamp, klass, *args + ) + end + + # Identical to +enqueue_at+, except you can also specify + # a queue in which the job will be placed after the + # timestamp has passed. It respects Resque.inline option, by + # creating the job right away instead of adding to the queue. + def enqueue_at_with_queue(queue, timestamp, klass, *args) + return false unless plugin.run_before_schedule_hooks(klass, *args) + + if Resque.inline? || timestamp.to_i < Time.now.to_i + # Just create the job and let resque perform it right away with + # inline. If the class is a custom job class, call self#scheduled + # on it. This allows you to do things like + # Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1). + # Otherwise, pass off to Resque. + if klass.respond_to?(:scheduled) + klass.scheduled(queue, klass.to_s, *args) + else + Resque::Job.create(queue, klass, *args) + end + else + delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) + end + + plugin.run_after_schedule_hooks(klass, *args) + end + + # Identical to enqueue_at but takes number_of_seconds_from_now + # instead of a timestamp. + def enqueue_in(number_of_seconds_from_now, klass, *args) + enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + + # Identical to +enqueue_in+, except you can also specify + # a queue in which the job will be placed after the + # number of seconds has passed. + def enqueue_in_with_queue(queue, number_of_seconds_from_now, + klass, *args) + enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, + klass, *args) + end + + # Used internally to stuff the item into the schedule sorted list. + # +timestamp+ can be either in seconds or a datetime object Insertion + # if O(log(n)). Returns true if it's the first job to be scheduled at + # that time, else false + def delayed_push(timestamp, item) + # First add this item to the list for this timestamp + redis.rpush("delayed:#{timestamp.to_i}", encode(item)) + + # Store the timestamps at with this item occurs + redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") + + # Now, add this timestamp to the zsets. The score and the value are + # the same since we'll be querying by timestamp, and we don't have + # anything else to store. + redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i + end + + # Returns an array of timestamps based on start and count + def delayed_queue_peek(start, count) + result = redis.zrange(:delayed_queue_schedule, start, + start + count - 1) + Array(result).map(&:to_i) + end + + # Returns the size of the delayed queue schedule + def delayed_queue_schedule_size + redis.zcard :delayed_queue_schedule + end + + # Returns the number of jobs for a given timestamp in the delayed queue + # schedule + def delayed_timestamp_size(timestamp) + redis.llen("delayed:#{timestamp.to_i}").to_i + end + + # Returns an array of delayed items for the given timestamp + def delayed_timestamp_peek(timestamp, start, count) + if 1 == count + r = list_range "delayed:#{timestamp.to_i}", start, count + r.nil? ? [] : [r] + else + list_range "delayed:#{timestamp.to_i}", start, count + end + end + + # Returns the next delayed queue timestamp + # (don't call directly) + def next_delayed_timestamp(at_time = nil) + items = redis.zrangebyscore( + :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, + limit: [0, 1] + ) + timestamp = items.nil? ? nil : Array(items).first + timestamp.to_i unless timestamp.nil? + end + + # Returns the next item to be processed for a given timestamp, nil if + # done. (don't call directly) + # +timestamp+ can either be in seconds or a datetime + def next_item_for_timestamp(timestamp) + key = "delayed:#{timestamp.to_i}" + + encoded_item = redis.lpop(key) + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + + # If the list is empty, remove it. + clean_up_timestamp(key, timestamp) + item + end + + # Clears all jobs created with enqueue_at or enqueue_in + def reset_delayed_queue + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| + key = "delayed:#{item}" + items = redis.lrange(key, 0, -1) + redis.pipelined do + items.each { |ts_item| redis.del("timestamps:#{ts_item}") } + end + redis.del key + end + + redis.del :delayed_queue_schedule + end + + # Given an encoded item, remove it from the delayed_queue + def remove_delayed(klass, *args) + search = encode(job_to_hash(klass, args)) + timestamps = redis.smembers("timestamps:#{search}") + + replies = redis.pipelined do + timestamps.each do |key| + redis.lrem(key, 0, search) + redis.srem("timestamps:#{search}", key) + end + end + + return 0 if replies.nil? || replies.empty? + replies.each_slice(2).map(&:first).inject(:+) + end + + # Given an encoded item, enqueue it now + def enqueue_delayed(klass, *args) + hash = job_to_hash(klass, args) + remove_delayed(klass, *args).times do + Resque::Scheduler.enqueue_from_config(hash) + end + end + + # Given a block, remove jobs that return true from a block + # + # This allows for removal of delayed jobs that have arguments matching + # certain criteria + def remove_delayed_selection + fail ArgumentError, 'Please supply a block' unless block_given? + + destroyed = 0 + # There is no way to search Redis list entries for a partial match, + # so we query for all delayed job tasks and do our matching after + # decoding the payload data + jobs = Resque.redis.keys('delayed:*') + jobs.each do |job| + index = Resque.redis.llen(job) - 1 + while index >= 0 + payload = Resque.redis.lindex(job, index) + decoded_payload = decode(payload) + if yield(decoded_payload['args']) + removed = redis.lrem job, 0, payload + destroyed += removed + index -= removed + else + index -= 1 + end + end + end + destroyed + end + + # Given a timestamp and job (klass + args) it removes all instances and + # returns the count of jobs removed. + # + # O(N) where N is the number of jobs scheduled to fire at the given + # timestamp + def remove_delayed_job_from_timestamp(timestamp, klass, *args) + key = "delayed:#{timestamp.to_i}" + encoded_job = encode(job_to_hash(klass, args)) + + redis.srem("timestamps:#{encoded_job}", key) + count = redis.lrem(key, 0, encoded_job) + clean_up_timestamp(key, timestamp) + + count + end + + def count_all_scheduled_jobs + total_jobs = 0 + Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts| + total_jobs += redis.llen("delayed:#{ts}").to_i + end + total_jobs + end + + # Discover if a job has been delayed. + # Examples + # Resque.delayed?(MyJob) + # Resque.delayed?(MyJob, id: 1) + # Returns true if the job has been delayed + def delayed?(klass, *args) + !scheduled_at(klass, *args).empty? + end + + # Returns delayed jobs schedule timestamp for +klass+, +args+. + def scheduled_at(klass, *args) + search = encode(job_to_hash(klass, args)) + redis.smembers("timestamps:#{search}").map do |key| + key.tr('delayed:', '').to_i + end + end + + private + + def job_to_hash(klass, args) + { class: klass.to_s, args: args, queue: queue_from_class(klass) } + end + + def job_to_hash_with_queue(queue, klass, args) + { class: klass.to_s, args: args, queue: queue } + end + + def clean_up_timestamp(key, timestamp) + # If the list is empty, remove it. + + # Use a watch here to ensure nobody adds jobs to this delayed + # queue while we're removing it. + redis.watch key + if 0 == redis.llen(key).to_i + redis.multi do + redis.del key + redis.zrem :delayed_queue_schedule, timestamp.to_i + end + else + redis.unwatch + end + end + + def plugin + Resque::Scheduler::Plugin + end + end + end +end diff --git a/lib/resque/scheduler/extension.rb b/lib/resque/scheduler/extension.rb index 7f922b04..f2728fa9 100644 --- a/lib/resque/scheduler/extension.rb +++ b/lib/resque/scheduler/extension.rb @@ -1,406 +1,13 @@ # vim:fileencoding=utf-8 -require 'rubygems' -require 'resque' -require_relative 'version' -require_relative 'util' -require_relative '../scheduler' -require_relative 'plugin' + +require_relative 'scheduling_extensions' +require_relative 'delaying_extensions' module Resque module Scheduler module Extension - # Accepts a new schedule configuration of the form: - # - # { - # "MakeTea" => { - # "every" => "1m" }, - # "some_name" => { - # "cron" => "5/* * * *", - # "class" => "DoSomeWork", - # "args" => "work on this string", - # "description" => "this thing works it"s butter off" }, - # ... - # } - # - # Hash keys can be anything and are used to describe and reference - # the scheduled job. If the "class" argument is missing, the key - # is used implicitly as "class" argument - in the "MakeTea" example, - # "MakeTea" is used both as job name and resque worker class. - # - # Any jobs that were in the old schedule, but are not - # present in the new schedule, will be removed. - # - # :cron can be any cron scheduling string - # - # :every can be used in lieu of :cron. see rufus-scheduler's 'every' - # usage for valid syntax. If :cron is present it will take precedence - # over :every. - # - # :class must be a resque worker class. If it is missing, the job name - # (hash key) will be used as :class. - # - # :args can be any yaml which will be converted to a ruby literal and - # passed in a params. (optional) - # - # :rails_envs is the list of envs where the job gets loaded. Envs are - # comma separated (optional) - # - # :description is just that, a description of the job (optional). If - # params is an array, each element in the array is passed as a separate - # param, otherwise params is passed in as the only parameter to perform. - def schedule=(schedule_hash) - # clean the schedules as it exists in redis - clean_schedules - - schedule_hash = prepare_schedule(schedule_hash) - - # store all schedules in redis, so we can retrieve them back - # everywhere. - schedule_hash.each do |name, job_spec| - set_schedule(name, job_spec) - end - - # ensure only return the successfully saved data! - reload_schedule! - end - - # Returns the schedule hash - def schedule - @schedule ||= all_schedules - @schedule || {} - end - - # reloads the schedule from redis - def reload_schedule! - @schedule = all_schedules - end - - # gets the schedules as it exists in redis - def all_schedules - return nil unless redis.exists(:schedules) - - redis.hgetall(:schedules).tap do |h| - h.each do |name, config| - h[name] = decode(config) - end - end - end - - # clean the schedules as it exists in redis, useful for first setup? - def clean_schedules - if redis.exists(:schedules) - redis.hkeys(:schedules).each do |key| - remove_schedule(key) unless schedule_persisted?(key) - end - end - @schedule = nil - true - end - - # Create or update a schedule with the provided name and configuration. - # - # Note: values for class and custom_job_class need to be strings, - # not constants. - # - # Resque.set_schedule('some_job', {:class => 'SomeJob', - # :every => '15mins', - # :queue => 'high', - # :args => '/tmp/poop'}) - def set_schedule(name, config) - existing_config = fetch_schedule(name) - persist = config.delete(:persist) || config.delete('persist') - unless existing_config && existing_config == config - redis.pipelined do - redis.hset(:schedules, name, encode(config)) - redis.sadd(:schedules_changed, name) - redis.sadd(:persisted_schedules, name) if persist - end - end - config - end - - # retrive the schedule configuration for the given name - def fetch_schedule(name) - decode(redis.hget(:schedules, name)) - end - - def schedule_persisted?(name) - redis.sismember(:persisted_schedules, name) - end - - # remove a given schedule by name - def remove_schedule(name) - redis.pipelined do - redis.hdel(:schedules, name) - redis.srem(:persisted_schedules, name) - redis.sadd(:schedules_changed, name) - end - end - - # This method is nearly identical to +enqueue+ only it also - # takes a timestamp which will be used to schedule the job - # for queueing. Until timestamp is in the past, the job will - # sit in the schedule list. - def enqueue_at(timestamp, klass, *args) - validate(klass) - enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args) - end - - # Identical to +enqueue_at+, except you can also specify - # a queue in which the job will be placed after the - # timestamp has passed. It respects Resque.inline option, by - # creating the job right away instead of adding to the queue. - def enqueue_at_with_queue(queue, timestamp, klass, *args) - return false unless Plugin.run_before_schedule_hooks(klass, *args) - - if Resque.inline? || timestamp.to_i < Time.now.to_i - # Just create the job and let resque perform it right away with - # inline. If the class is a custom job class, call self#scheduled on - # it. This allows you to do things like Resque.enqueue_at(timestamp, - # CustomJobClass, :opt1 => val1). Otherwise, pass off to Resque. - if klass.respond_to?(:scheduled) - klass.scheduled(queue, klass.to_s, *args) - else - Resque::Job.create(queue, klass, *args) - end - else - delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args)) - end - - Plugin.run_after_schedule_hooks(klass, *args) - end - - # Identical to enqueue_at but takes number_of_seconds_from_now - # instead of a timestamp. - def enqueue_in(number_of_seconds_from_now, klass, *args) - enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) - end - - # Identical to +enqueue_in+, except you can also specify - # a queue in which the job will be placed after the - # number of seconds has passed. - def enqueue_in_with_queue(queue, number_of_seconds_from_now, - klass, *args) - enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, - klass, *args) - end - - # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object Insertion if - # O(log(n)). Returns true if it's the first job to be scheduled at that - # time, else false - def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", encode(item)) - - # Store the timestamps at with this item occurs - redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}") - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i - end - - # Returns an array of timestamps based on start and count - def delayed_queue_peek(start, count) - result = redis.zrange(:delayed_queue_schedule, start, - start + count - 1) - Array(result).map(&:to_i) - end - - # Returns the size of the delayed queue schedule - def delayed_queue_schedule_size - redis.zcard :delayed_queue_schedule - end - - # Returns the number of jobs for a given timestamp in the delayed queue - # schedule - def delayed_timestamp_size(timestamp) - redis.llen("delayed:#{timestamp.to_i}").to_i - end - - # Returns an array of delayed items for the given timestamp - def delayed_timestamp_peek(timestamp, start, count) - if 1 == count - r = list_range "delayed:#{timestamp.to_i}", start, count - r.nil? ? [] : [r] - else - list_range "delayed:#{timestamp.to_i}", start, count - end - end - - # Returns the next delayed queue timestamp - # (don't call directly) - def next_delayed_timestamp(at_time = nil) - items = redis.zrangebyscore( - :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, - limit: [0, 1] - ) - timestamp = items.nil? ? nil : Array(items).first - timestamp.to_i unless timestamp.nil? - end - - # Returns the next item to be processed for a given timestamp, nil if - # done. (don't call directly) - # +timestamp+ can either be in seconds or a datetime - def next_item_for_timestamp(timestamp) - key = "delayed:#{timestamp.to_i}" - - encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) - - # If the list is empty, remove it. - clean_up_timestamp(key, timestamp) - item - end - - # Clears all jobs created with enqueue_at or enqueue_in - def reset_delayed_queue - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item| - key = "delayed:#{item}" - items = redis.lrange(key, 0, -1) - redis.pipelined do - items.each { |ts_item| redis.del("timestamps:#{ts_item}") } - end - redis.del key - end - - redis.del :delayed_queue_schedule - end - - # Given an encoded item, remove it from the delayed_queue - def remove_delayed(klass, *args) - search = encode(job_to_hash(klass, args)) - timestamps = redis.smembers("timestamps:#{search}") - - replies = redis.pipelined do - timestamps.each do |key| - redis.lrem(key, 0, search) - redis.srem("timestamps:#{search}", key) - end - end - - return 0 if replies.nil? || replies.empty? - replies.each_slice(2).map(&:first).inject(:+) - end - - # Given an encoded item, enqueue it now - def enqueue_delayed(klass, *args) - hash = job_to_hash(klass, args) - remove_delayed(klass, *args).times do - Resque::Scheduler.enqueue_from_config(hash) - end - end - - # Given a block, remove jobs that return true from a block - # - # This allows for removal of delayed jobs that have arguments matching - # certain criteria - def remove_delayed_selection - fail ArgumentError, 'Please supply a block' unless block_given? - - destroyed = 0 - # There is no way to search Redis list entries for a partial match, so - # we query for all delayed job tasks and do our matching after decoding - # the payload data - jobs = Resque.redis.keys('delayed:*') - jobs.each do |job| - index = Resque.redis.llen(job) - 1 - while index >= 0 - payload = Resque.redis.lindex(job, index) - decoded_payload = decode(payload) - if yield(decoded_payload['args']) - removed = redis.lrem job, 0, payload - destroyed += removed - index -= removed - else - index -= 1 - end - end - end - destroyed - end - - # Given a timestamp and job (klass + args) it removes all instances and - # returns the count of jobs removed. - # - # O(N) where N is the number of jobs scheduled to fire at the given - # timestamp - def remove_delayed_job_from_timestamp(timestamp, klass, *args) - key = "delayed:#{timestamp.to_i}" - encoded_job = encode(job_to_hash(klass, args)) - - redis.srem("timestamps:#{encoded_job}", key) - count = redis.lrem(key, 0, encoded_job) - clean_up_timestamp(key, timestamp) - - count - end - - def count_all_scheduled_jobs - total_jobs = 0 - Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp| - total_jobs += redis.llen("delayed:#{timestamp}").to_i - end - total_jobs - end - - # Discover if a job has been delayed. - # Examples - # Resque.delayed?(MyJob) - # Resque.delayed?(MyJob, id: 1) - # Returns true if the job has been delayed - def delayed?(klass, *args) - !scheduled_at(klass, *args).empty? - end - - # Returns delayed jobs schedule timestamp for +klass+, +args+. - def scheduled_at(klass, *args) - search = encode(job_to_hash(klass, args)) - redis.smembers("timestamps:#{search}").map do |key| - key.tr('delayed:', '').to_i - end - end - - private - - def job_to_hash(klass, args) - { class: klass.to_s, args: args, queue: queue_from_class(klass) } - end - - def job_to_hash_with_queue(queue, klass, args) - { class: klass.to_s, args: args, queue: queue } - end - - def clean_up_timestamp(key, timestamp) - # If the list is empty, remove it. - - # Use a watch here to ensure nobody adds jobs to this delayed - # queue while we're removing it. - redis.watch key - if 0 == redis.llen(key).to_i - redis.multi do - redis.del key - redis.zrem :delayed_queue_schedule, timestamp.to_i - end - else - redis.unwatch - end - end - - def prepare_schedule(schedule_hash) - prepared_hash = {} - schedule_hash.each do |name, job_spec| - job_spec = job_spec.dup - unless job_spec.key?('class') || job_spec.key?(:class) - job_spec['class'] = name - end - prepared_hash[name] = job_spec - end - prepared_hash - end + include SchedulingExtensions + include DelayingExtensions end end end diff --git a/lib/resque/scheduler/scheduling_extensions.rb b/lib/resque/scheduler/scheduling_extensions.rb new file mode 100644 index 00000000..8be42c4a --- /dev/null +++ b/lib/resque/scheduler/scheduling_extensions.rb @@ -0,0 +1,150 @@ +# vim:fileencoding=utf-8 + +module Resque + module Scheduler + module SchedulingExtensions + # Accepts a new schedule configuration of the form: + # + # { + # "MakeTea" => { + # "every" => "1m" }, + # "some_name" => { + # "cron" => "5/* * * *", + # "class" => "DoSomeWork", + # "args" => "work on this string", + # "description" => "this thing works it"s butter off" }, + # ... + # } + # + # Hash keys can be anything and are used to describe and reference + # the scheduled job. If the "class" argument is missing, the key + # is used implicitly as "class" argument - in the "MakeTea" example, + # "MakeTea" is used both as job name and resque worker class. + # + # Any jobs that were in the old schedule, but are not + # present in the new schedule, will be removed. + # + # :cron can be any cron scheduling string + # + # :every can be used in lieu of :cron. see rufus-scheduler's 'every' + # usage for valid syntax. If :cron is present it will take precedence + # over :every. + # + # :class must be a resque worker class. If it is missing, the job name + # (hash key) will be used as :class. + # + # :args can be any yaml which will be converted to a ruby literal and + # passed in a params. (optional) + # + # :rails_envs is the list of envs where the job gets loaded. Envs are + # comma separated (optional) + # + # :description is just that, a description of the job (optional). If + # params is an array, each element in the array is passed as a separate + # param, otherwise params is passed in as the only parameter to + # perform. + def schedule=(schedule_hash) + # clean the schedules as it exists in redis + clean_schedules + + schedule_hash = prepare_schedule(schedule_hash) + + # store all schedules in redis, so we can retrieve them back + # everywhere. + schedule_hash.each do |name, job_spec| + set_schedule(name, job_spec) + end + + # ensure only return the successfully saved data! + reload_schedule! + end + + # Returns the schedule hash + def schedule + @schedule ||= all_schedules + @schedule || {} + end + + # reloads the schedule from redis + def reload_schedule! + @schedule = all_schedules + end + + # gets the schedules as it exists in redis + def all_schedules + return nil unless redis.exists(:schedules) + + redis.hgetall(:schedules).tap do |h| + h.each do |name, config| + h[name] = decode(config) + end + end + end + + # clean the schedules as it exists in redis, useful for first setup? + def clean_schedules + if redis.exists(:schedules) + redis.hkeys(:schedules).each do |key| + remove_schedule(key) unless schedule_persisted?(key) + end + end + @schedule = nil + true + end + + # Create or update a schedule with the provided name and configuration. + # + # Note: values for class and custom_job_class need to be strings, + # not constants. + # + # Resque.set_schedule('some_job', {:class => 'SomeJob', + # :every => '15mins', + # :queue => 'high', + # :args => '/tmp/poop'}) + def set_schedule(name, config) + existing_config = fetch_schedule(name) + persist = config.delete(:persist) || config.delete('persist') + unless existing_config && existing_config == config + redis.pipelined do + redis.hset(:schedules, name, encode(config)) + redis.sadd(:schedules_changed, name) + redis.sadd(:persisted_schedules, name) if persist + end + end + config + end + + # retrive the schedule configuration for the given name + def fetch_schedule(name) + decode(redis.hget(:schedules, name)) + end + + def schedule_persisted?(name) + redis.sismember(:persisted_schedules, name) + end + + # remove a given schedule by name + def remove_schedule(name) + redis.pipelined do + redis.hdel(:schedules, name) + redis.srem(:persisted_schedules, name) + redis.sadd(:schedules_changed, name) + end + end + + private + + def prepare_schedule(schedule_hash) + prepared_hash = {} + schedule_hash.each do |name, job_spec| + job_spec = job_spec.dup + unless job_spec.key?('class') || job_spec.key?(:class) + job_spec['class'] = name + end + prepared_hash[name] = job_spec + end + prepared_hash + end + end + end +end