Skip to content

Commit

Permalink
Merge pull request #19448 from kbrock/cu_queue
Browse files Browse the repository at this point in the history
Cap&U Extract logic to determine queue items
  • Loading branch information
agrare authored Nov 5, 2019
2 parents afc0c44 + 8a6a0ea commit 37f1c09
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 47 deletions.
18 changes: 3 additions & 15 deletions app/models/metric/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ def self.historical_start_time
historical_days.days.ago.utc.beginning_of_day
end

def self.targets_archived_from
archived_for_setting = Settings.performance.targets.archived_for
archived_for_setting.to_i_with_method.seconds.ago.utc
end

def self.concurrent_requests(interval_name)
requests = ::Settings.performance.concurrent_requests[interval_name]
requests = 20 if requests < 20 && interval_name == 'realtime'
Expand Down Expand Up @@ -204,17 +199,10 @@ def self.queue_captures(targets, target_options)

targets.each do |target|
interval_name = perf_target_to_interval_name(target)

options = target_options[target]

begin
target.perf_capture_queue(interval_name, options)
if !target.kind_of?(Storage) && use_historical && target.last_perf_capture_on.nil?
target.perf_capture_queue('historical')
end
rescue => err
_log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
end
target.perf_capture_queue(interval_name, options)
rescue => err
_log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
end
end
private_class_method :queue_captures
Expand Down
66 changes: 35 additions & 31 deletions app/models/metric/ci_mixin/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.d
private :split_capture_intervals

def perf_capture_queue(interval_name, options = {})
start_time = options[:start_time]
end_time = options[:end_time]
start_time = options[:start_time]&.utc
end_time = options[:end_time]&.utc
priority = options[:priority] || Metric::Capture.const_get("#{interval_name.upcase}_PRIORITY")
task_id = options[:task_id]
zone = options[:zone] || my_zone
Expand All @@ -40,33 +40,9 @@ def perf_capture_queue(interval_name, options = {})
raise ArgumentError, "end_time cannot be specified if start_time is nil" if start_time.nil? && !end_time.nil?
raise ArgumentError, "target does not have an ExtManagementSystem" if ems.nil?

start_time = start_time.utc unless start_time.nil?
end_time = end_time.utc unless end_time.nil?

# Determine the items to queue up
# cb is the task used to group cluster realtime metrics
cb = nil
if interval_name == 'historical'
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
items = split_capture_intervals(interval_name, start_time, end_time)
else
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
items =
if last_perf_capture_on.nil?
[[interval_name, realtime_cut_off]]
elsif last_perf_capture_on < realtime_cut_off
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
else
[interval_name]
end

cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id
end
cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime'
items = queue_items_for_interval(interval_name, start_time, end_time)

# Queue up the actual items
queue_item = {
Expand All @@ -86,7 +62,7 @@ def perf_capture_queue(interval_name, options = {})
next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority
MiqQueue.put_or_update(queue_item_options) do |msg, qi|
# reason for setting MiqQueue#miq_task_id is to initializes MiqTask.started_on column when message delivered.
qi[:miq_task_id] = task_id if task_id
qi[:miq_task_id] = task_id if task_id && item_interval == "realtime"
if msg.nil?
qi[:priority] = priority
qi.delete(:state)
Expand All @@ -98,9 +74,9 @@ def perf_capture_queue(interval_name, options = {})
qi[:priority] = priority
# rerun the job (either with new task or higher priority)
qi.delete(:state)
if task_id
if task_id && item_interval == "realtime"
existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || []
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]]) if item_interval == "realtime"
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
end
qi
else
Expand All @@ -113,6 +89,34 @@ def perf_capture_queue(interval_name, options = {})
end
end

def queue_items_for_interval(interval_name, start_time, end_time)
if interval_name == 'historical'
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
split_capture_intervals(interval_name, start_time, end_time)
else
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
if last_perf_capture_on.nil?
# for initial refresh of non-Storage objects, also go back historically
if !kind_of?(Storage) && Metric::Capture.historical_days != 0
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day)
else
[[interval_name, realtime_cut_off]]
end
elsif last_perf_capture_on < realtime_cut_off
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
else
[interval_name]
end
end
end


def perf_capture_realtime(*args)
perf_capture('realtime', *args)
end
Expand Down
7 changes: 6 additions & 1 deletion app/models/metric/targets.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ def self.perf_capture_always=(options)
MiqRegion.my_region.perf_capture_always = options
end

def self.targets_archived_from
archived_for_setting = Settings.performance.targets.archived_for
archived_for_setting.to_i_with_method.seconds.ago.utc
end

def self.capture_ems_targets(ems, options = {})
case ems
when EmsCloud then capture_cloud_targets([ems], options)
Expand Down Expand Up @@ -49,7 +54,7 @@ def self.capture_cloud_targets(emses, options = {})

def self.with_archived(scope)
# We will look also for freshly archived entities, if the entity was short-lived or even sub-hour
archived_from = Metric::Capture.targets_archived_from
archived_from = targets_archived_from
scope.where(:deleted_on => nil).or(scope.where(:deleted_on => (archived_from..Time.now.utc)))
end

Expand Down

0 comments on commit 37f1c09

Please sign in to comment.