Skip to content

Commit

Permalink
Breaking up the #find_job helper method a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
meatballhat committed Feb 18, 2014
1 parent 05ac361 commit 5285799
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions lib/resque/scheduler/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,37 +124,14 @@ def queue_from_class_name(class_name)

def find_job(worker)
worker = worker.downcase
results = []
results = working_jobs_for_worker(worker)

# Check working jobs
working = [*Resque.working]
work = working.select do |w|
w.job && w.job['payload'] &&
w.job['payload']['class'].downcase.include?(worker)
end
work.each do |w|
results += [
w.job['payload'].merge(
'queue' => w.job['queue'], 'where_at' => 'working'
)
]
end

# Check delayed Jobs
dels = []
schedule_size = Resque.delayed_queue_schedule_size
Resque.delayed_queue_peek(0, schedule_size).each do |d|
Resque.delayed_timestamp_peek(
d, 0, Resque.delayed_timestamp_size(d)).each do |j|
dels << j.merge!('timestamp' => d)
end
end
dels = delayed_jobs_for_worker(worker)
results += dels.select do |j|
j['class'].downcase.include?(worker) &&
j.merge!('where_at' => 'delayed')
end

# Check Queues
Resque.queues.each do |queue|
queued = Resque.peek(queue, 0, Resque.size(queue))
queued = [queued] unless queued.is_a?(Array)
Expand All @@ -163,6 +140,7 @@ def find_job(worker)
j.merge!('queue' => queue, 'where_at' => 'queued')
end
end

results
end

Expand Down Expand Up @@ -202,6 +180,37 @@ def scheduler_template(name)
File.expand_path("../server/views/#{name}.erb", __FILE__)
)
end

private

def working_jobs_for_worker(worker)
[].tap do |results|
working = [*Resque.working]
work = working.select do |w|
w.job && w.job['payload'] &&
w.job['payload']['class'].downcase.include?(worker)
end
work.each do |w|
results += [
w.job['payload'].merge(
'queue' => w.job['queue'], 'where_at' => 'working'
)
]
end
end
end

def delayed_jobs_for_worker(worker)
[].tap do |dels|
schedule_size = Resque.delayed_queue_schedule_size
Resque.delayed_queue_peek(0, schedule_size).each do |d|
Resque.delayed_timestamp_peek(
d, 0, Resque.delayed_timestamp_size(d)).each do |j|
dels << j.merge!('timestamp' => d)
end
end
end
end
end
end
end
Expand Down

0 comments on commit 5285799

Please sign in to comment.