From f892862eec1bc10a4bf24002c44c7785c1f4f0b2 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 17 May 2024 13:36:27 +0200 Subject: [PATCH 1/2] Add execution plan chaining This commit enables execution plans to be chained. Assuming there is an execution plan EP1, another execution plan EP2 can be chained onto EP1. When chained, EP2 will stay in scheduled state until EP1 goes to stopped state. An execution plan can be chained onto multiple prerequisite execution plans, in which case it will be run once all the prerequisite execution plans are stopped. It builds on mechanisms which were already present. When an execution plan is chained, it behaves in the same way as if it was scheduled for future execution. A record is created in dynflow_delayed_table and once the conditions for it to execute are right, it is dispatched by the delayed executor. Because of this, there might be small delay between when the prerequisites finishs and the chained plan is started. --- examples/execution_plan_chaining.rb | 42 +++++++++++++ lib/dynflow/debug/telemetry/persistence.rb | 2 +- .../delayed_executors/abstract_core.rb | 2 +- lib/dynflow/persistence.rb | 8 ++- lib/dynflow/persistence_adapters/abstract.rb | 2 +- lib/dynflow/persistence_adapters/sequel.rb | 18 ++++-- .../025_create_execution_plan_dependencies.rb | 21 +++++++ lib/dynflow/world.rb | 10 +++ test/future_execution_test.rb | 6 +- test/persistence_test.rb | 63 ++++++++++++++++++- test/world_test.rb | 20 ++++++ 11 files changed, 179 insertions(+), 15 deletions(-) create mode 100755 examples/execution_plan_chaining.rb create mode 100644 lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb diff --git a/examples/execution_plan_chaining.rb b/examples/execution_plan_chaining.rb new file mode 100755 index 00000000..99cabd10 --- /dev/null +++ b/examples/execution_plan_chaining.rb @@ -0,0 +1,42 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative 'example_helper' + +class DelayedAction < Dynflow::Action + def plan + plan_self + end + + def run + sleep 5 + end +end + +if $0 == __FILE__ + ExampleHelper.world.action_logger.level = 1 + ExampleHelper.world.logger.level = 0 + + plan1 = ExampleHelper.world.trigger(DelayedAction) + plan2 = ExampleHelper.world.chain(plan1.execution_plan_id, DelayedAction) + plan3 = ExampleHelper.world.chain(plan2.execution_plan_id, DelayedAction) + plan4 = ExampleHelper.world.chain(plan2.execution_plan_id, DelayedAction) + + puts <<-MSG.gsub(/^.*\|/, '') + | + | Execution Plan Chaining example + | ======================== + | + | This example shows the execution plan chaining functionality of Dynflow, which allows execution plans to wait until another execution plan finishes. + | + | Execution plans: + | #{plan1.id} runs immediately and should run successfully. + | #{plan2.id} is delayed and should run once #{plan1.id} finishes. + | #{plan3.id} and #{plan4.id} are delayed and should run once #{plan2.id} finishes. + | + | Visit #{ExampleHelper::DYNFLOW_URL} to see their status. + | + MSG + + ExampleHelper.run_web_console +end diff --git a/lib/dynflow/debug/telemetry/persistence.rb b/lib/dynflow/debug/telemetry/persistence.rb index ad5cb38c..4d692611 100644 --- a/lib/dynflow/debug/telemetry/persistence.rb +++ b/lib/dynflow/debug/telemetry/persistence.rb @@ -19,7 +19,7 @@ module Persistence :load_execution_plan, :save_execution_plan, :find_old_execution_plans, - :find_past_delayed_plans, + :find_ready_delayed_plans, :delete_delayed_plans, :save_delayed_plan, :set_delayed_plan_frozen, diff --git a/lib/dynflow/delayed_executors/abstract_core.rb b/lib/dynflow/delayed_executors/abstract_core.rb index ec1d08e6..f9ec1833 100644 --- a/lib/dynflow/delayed_executors/abstract_core.rb +++ b/lib/dynflow/delayed_executors/abstract_core.rb @@ -32,7 +32,7 @@ def time def delayed_execution_plans(time) with_error_handling([]) do - world.persistence.find_past_delayed_plans(time) + world.persistence.find_ready_delayed_plans(time) end end diff --git a/lib/dynflow/persistence.rb b/lib/dynflow/persistence.rb index 7facce49..809d9a27 100644 --- a/lib/dynflow/persistence.rb +++ b/lib/dynflow/persistence.rb @@ -97,8 +97,8 @@ def find_old_execution_plans(age) end end - def find_past_delayed_plans(time) - adapter.find_past_delayed_plans(time).map do |plan| + def find_ready_delayed_plans(time) + adapter.find_ready_delayed_plans(time).map do |plan| DelayedPlan.new_from_hash(@world, plan) end end @@ -159,5 +159,9 @@ def prune_envelopes(receiver_ids) def prune_undeliverable_envelopes adapter.prune_undeliverable_envelopes end + + def chain_execution_plan(first, second) + adapter.chain_execution_plan(first, second) + end end end diff --git a/lib/dynflow/persistence_adapters/abstract.rb b/lib/dynflow/persistence_adapters/abstract.rb index 94509b06..177c1d54 100644 --- a/lib/dynflow/persistence_adapters/abstract.rb +++ b/lib/dynflow/persistence_adapters/abstract.rb @@ -68,7 +68,7 @@ def save_execution_plan(execution_plan_id, value) raise NotImplementedError end - def find_past_delayed_plans(options = {}) + def find_ready_delayed_plans(options = {}) raise NotImplementedError end diff --git a/lib/dynflow/persistence_adapters/sequel.rb b/lib/dynflow/persistence_adapters/sequel.rb index db91b294..57b56a0a 100644 --- a/lib/dynflow/persistence_adapters/sequel.rb +++ b/lib/dynflow/persistence_adapters/sequel.rb @@ -39,7 +39,8 @@ class action_class execution_plan_uuid queue), envelope: %w(receiver_id), coordinator_record: %w(id owner_id class), delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen), - output_chunk: %w(execution_plan_uuid action_id kind timestamp) } + output_chunk: %w(execution_plan_uuid action_id kind timestamp), + execution_plan_dependency: %w(execution_plan_uuid blocked_by_uuid) } SERIALIZABLE_COLUMNS = { action: %w(input output), delayed: %w(serialized_args), @@ -139,12 +140,16 @@ def find_old_execution_plans(age) .all.map { |plan| execution_plan_column_map(load_data plan, table_name) } end - def find_past_delayed_plans(time) + def find_ready_delayed_plans(time) table_name = :delayed table(table_name) - .where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)) + .left_join(TABLES[:execution_plan_dependency], execution_plan_uuid: :execution_plan_uuid) + .left_join(TABLES[:execution_plan], uuid: :blocked_by_uuid) + .where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time)) + .where(::Sequel[{ state: nil }] | ::Sequel[{ state: 'stopped' }]) .where(:frozen => false) .order_by(:start_at) + .select_all(TABLES[table_name]) .all .map { |plan| load_data(plan, table_name) } end @@ -159,6 +164,10 @@ def save_delayed_plan(execution_plan_id, value) save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false end + def chain_execution_plan(first, second) + save :execution_plan_dependency, { execution_plan_uuid: second }, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false + end + def load_step(execution_plan_id, step_id) load :step, execution_plan_uuid: execution_plan_id, id: step_id end @@ -297,7 +306,8 @@ def abort_if_pending_migrations! envelope: :dynflow_envelopes, coordinator_record: :dynflow_coordinator_records, delayed: :dynflow_delayed_plans, - output_chunk: :dynflow_output_chunks } + output_chunk: :dynflow_output_chunks, + execution_plan_dependency: :dynflow_execution_plan_dependencies } def table(which) db[TABLES.fetch(which)] diff --git a/lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb b/lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb new file mode 100644 index 00000000..47dcad56 --- /dev/null +++ b/lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +Sequel.migration do + up do + type = database_type + create_table(:dynflow_execution_plan_dependencies) do + column_properties = if type.to_s.include?('postgres') + { type: :uuid } + else + { type: String, size: 36, fixed: true, null: false } + end + foreign_key :execution_plan_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties + foreign_key :blocked_by_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties + index :blocked_by_uuid + end + end + + down do + drop_table(:dynflow_execution_plan_dependencies) + end +end diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index a99670c7..87e1cbd4 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -201,6 +201,16 @@ def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_act Scheduled[execution_plan.id] end + def chain(plan_uuids, action_class, *args) + plan_uuids = [plan_uuids] unless plan_uuids.is_a? Array + result = delay_with_options(action_class: action_class, args: args, delay_options: { frozen: true }) + plan_uuids.each do |plan_uuid| + persistence.chain_execution_plan(plan_uuid, result.execution_plan_id) + end + persistence.set_delayed_plan_frozen(result.execution_plan_id, false) + result + end + def plan_elsewhere(action_class, *args) execution_plan = ExecutionPlan.new(self, nil) execution_plan.delay(nil, action_class, {}, *args) diff --git a/test/future_execution_test.rb b/test/future_execution_test.rb index b548f937..dc1e499b 100644 --- a/test/future_execution_test.rb +++ b/test/future_execution_test.rb @@ -75,7 +75,7 @@ module FutureExecutionTest it 'finds delayed plans' do @start_at = Time.now.utc - 100 delayed_plan - past_delayed_plans = world.persistence.find_past_delayed_plans(@start_at + 10) + past_delayed_plans = world.persistence.find_ready_delayed_plans(@start_at + 10) _(past_delayed_plans.length).must_equal 1 _(past_delayed_plans.first.execution_plan_uuid).must_equal execution_plan.id end @@ -112,8 +112,8 @@ module FutureExecutionTest it 'checks for delayed plans in regular intervals' do start_time = klok.current_time - persistence.expect(:find_past_delayed_plans, [], [start_time]) - persistence.expect(:find_past_delayed_plans, [], [start_time + options[:poll_interval]]) + persistence.expect(:find_ready_delayed_plans, [], [start_time]) + persistence.expect(:find_ready_delayed_plans, [], [start_time + options[:poll_interval]]) dummy_world.stub :persistence, persistence do _(klok.pending_pings.length).must_equal 0 delayed_executor.start.wait diff --git a/test/persistence_test.rb b/test/persistence_test.rb index aa0e87ec..0d6c1e7e 100644 --- a/test/persistence_test.rb +++ b/test/persistence_test.rb @@ -342,7 +342,7 @@ def self.it_acts_as_persistence_adapter end end - describe '#find_past_delayed_plans' do + describe '#find_ready_delayed_plans' do it 'finds plans with start_before in past' do start_time = Time.now.utc prepare_and_save_plans @@ -352,7 +352,7 @@ def self.it_acts_as_persistence_adapter adapter.save_delayed_plan('plan3', :execution_plan_uuid => 'plan3', :frozen => false, :start_at => format_time(start_time + 60)) adapter.save_delayed_plan('plan4', :execution_plan_uuid => 'plan4', :frozen => false, :start_at => format_time(start_time - 60), :start_before => format_time(start_time - 60)) - plans = adapter.find_past_delayed_plans(start_time) + plans = adapter.find_ready_delayed_plans(start_time) _(plans.length).must_equal 3 _(plans.map { |plan| plan[:execution_plan_uuid] }).must_equal %w(plan2 plan4 plan1) end @@ -366,10 +366,67 @@ def self.it_acts_as_persistence_adapter adapter.save_delayed_plan('plan2', :execution_plan_uuid => 'plan2', :frozen => true, :start_at => format_time(start_time + 60), :start_before => format_time(start_time - 60)) - plans = adapter.find_past_delayed_plans(start_time) + plans = adapter.find_ready_delayed_plans(start_time) _(plans.length).must_equal 1 _(plans.first[:execution_plan_uuid]).must_equal 'plan1' end + + it 'finds plans with null start_at' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 1 + _(plans.first[:execution_plan_uuid]).must_equal 'plan1' + end + + it 'does not find blocked plans' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + adapter.chain_execution_plan('plan2', 'plan1') + adapter.chain_execution_plan('plan3', 'plan1') + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 0 + end + + it 'finds plans which are no longer blocked' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + adapter.chain_execution_plan('plan2', 'plan1') + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 1 + _(plans.first[:execution_plan_uuid]).must_equal 'plan1' + end + + it 'does not find plans which are no longer blocked but are frozen' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => true) + adapter.chain_execution_plan('plan2', 'plan1') + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 0 + end + + it 'does not find plans which are no longer blocked but their start_at is in the future' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false, :start_at => start_time + 60) + adapter.chain_execution_plan('plan2', 'plan1') # plan2 is already stopped + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 0 + end end describe '#delete_output_chunks' do diff --git a/test/world_test.rb b/test/world_test.rb index e87135ad..e8cc0f4f 100644 --- a/test/world_test.rb +++ b/test/world_test.rb @@ -51,6 +51,26 @@ module WorldTest _(terminated_event.resolved?).must_equal true end end + + describe '#chain' do + it 'chains two execution plans' do + plan1 = world.plan(Support::DummyExample::Dummy) + plan2 = world.chain(plan1.id, Support::DummyExample::Dummy) + + ready = world.persistence.find_ready_delayed_plans(Time.now) + _(ready.count).must_equal 0 + + done = Concurrent::Promises.resolvable_future + world.execute(plan1.id, done) + done.wait + + plan1 = world.persistence.load_execution_plan(plan1.id) + _(plan1.state).must_equal :stopped + ready = world.persistence.find_ready_delayed_plans(Time.now) + _(ready.count).must_equal 1 + _(ready.first.execution_plan_uuid).must_equal plan2.execution_plan_id + end + end end end end From bc0494ffd62fbbc083476624c9e2178e633c7be8 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 17 May 2024 13:52:26 +0200 Subject: [PATCH 2/2] Make rubocop happy --- examples/execution_plan_chaining.rb | 2 +- lib/dynflow/world.rb | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/execution_plan_chaining.rb b/examples/execution_plan_chaining.rb index 99cabd10..c02cd3b1 100755 --- a/examples/execution_plan_chaining.rb +++ b/examples/execution_plan_chaining.rb @@ -13,7 +13,7 @@ def run end end -if $0 == __FILE__ +if $PROGRAM_NAME == __FILE__ ExampleHelper.world.action_logger.level = 1 ExampleHelper.world.logger.level = 0 diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index 87e1cbd4..8b059e63 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -4,6 +4,7 @@ require 'dynflow/world/invalidation' module Dynflow + # rubocop:disable Metrics/ClassLength class World include Algebrick::TypeCheck include Algebrick::Matching @@ -400,4 +401,5 @@ def spawn_and_wait(klass, name, *args) return actor end end + # rubocop:enable Metrics/ClassLength end