From 52c03c78ee5d95740d5a847589909f28e5c85e23 Mon Sep 17 00:00:00 2001 From: SmalRat <30555080+SmalRat@users.noreply.github.com> Date: Tue, 2 Jul 2024 11:28:11 +0300 Subject: [PATCH 1/2] Added MetaTask, TaskDAG and TrackedTaskDAG wrappers for tasks --- Project.toml | 7 ++- src/MetaTask.jl | 22 +++++++++ src/TaskDAG.jl | 109 ++++++++++++++++++++++++++++++++++++++++++ src/TrackedTaskDAG.jl | 66 +++++++++++++++++++++++++ 4 files changed, 200 insertions(+), 4 deletions(-) create mode 100644 src/MetaTask.jl create mode 100644 src/TaskDAG.jl create mode 100644 src/TrackedTaskDAG.jl diff --git a/Project.toml b/Project.toml index c09bb45..1b6ec3f 100644 --- a/Project.toml +++ b/Project.toml @@ -1,8 +1,6 @@ name = "FrameworkDemo" uuid = "cfbf7e84-66d2-421e-b147-9edb7a8672d2" -authors = ["SmalRat <30555080+SmalRat@users.noreply.github.com>", - "Mateusz Jakub Fila ", - "hegner "] +authors = ["SmalRat <30555080+SmalRat@users.noreply.github.com>", "Mateusz Jakub Fila ", "hegner "] version = "0.1.0" [deps] @@ -21,6 +19,7 @@ LightGraphs = "093fc24a-ae57-5d10-9952-331d41423f4d" MetaGraphs = "626554b9-1ddb-594c-aa3c-2596fe9399a5" Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80" TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63" +UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] Dagger = "0.18.11" @@ -29,4 +28,4 @@ Dagger = "0.18.11" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test"] \ No newline at end of file +test = ["Test"] diff --git a/src/MetaTask.jl b/src/MetaTask.jl new file mode 100644 index 0000000..437d226 --- /dev/null +++ b/src/MetaTask.jl @@ -0,0 +1,22 @@ +import Dagger + +abstract type AbstractMetaTask +end + +struct MetaTask <: AbstractMetaTask + uuid::String + name::String + task::Dagger.DTask +end + +function get_uuid(meta_task::AbstractMetaTask) + return meta_task.uuid +end + +function get_name(meta_task::AbstractMetaTask) + return "tracked_" * meta_task.name +end + +function wait(meta_task::AbstractMetaTask) + wait(meta_task.task) +end \ No newline at end of file diff --git a/src/TaskDAG.jl b/src/TaskDAG.jl new file mode 100644 index 0000000..7575cbb --- /dev/null +++ b/src/TaskDAG.jl @@ -0,0 +1,109 @@ +import UUIDs +import MetaGraphs +import Dagger +import Base.repr + +include("MetaTask.jl") + +@enum TaskDAGState begin + ready + on_schedule + running + completed + failed +end + +mutable struct TaskDAG <: AbstractMetaTask + uuid::String + name::String + state::TaskDAGState + dag::MetaGraphs.MetaDiGraph + leafs_promises::Vector{Dagger.DTask} + + function TaskDAG(name::String, dag::MetaGraphs.MetaDiGraph) + new(string(UUIDs.uuid4()), name, ready, dag) + end +end + +function get_uuid(task_dag::TaskDAG) + return task_dag.uuid +end + +function get_name(task_dag::TaskDAG) + return task_dag.name +end + +function get_DAG(task_dag::TaskDAG) + return task_dag.dag +end + +function Base.repr(task_dag::TaskDAG) + return get_name(task_dag) * ": TaskDAG" +end + +function is_ready(task_dag::TaskDAG) + return task_dag.state == ready +end + +function is_running(task_dag::TaskDAG) + return task_dag.state == running +end + +function is_completed(task_dag::TaskDAG) + return task_dag.state == completed +end + +function start_DAG(task_dag::TaskDAG) + task_dag.state = on_schedule + + inc_e_src_map = get_ine_map(task_dag.dag) + + for vertex_id in MetaGraphs.topological_sort(task_dag.dag) + incoming_data = get_deps_promises(vertex_id, inc_e_src_map, task_dag.dag) + MetaGraphs.set_prop!(task_dag.dag, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[MetaGraphs.get_prop(task_dag.dag, vertex_id, :type)](task_dag.name, task_dag.uuid, vertex_id, incoming_data...)) + end + + task_dag.leafs_promises = _get_leafs_promises(task_dag) # assume the graph will not be modified + task_dag.state = running +end + +function _get_leafs_promises(task_dag::TaskDAG) + final_vertices = [] + out_e_src_map = get_oute_map(task_dag.dag) + + for vertex_id in MetaGraphs.vertices(task_dag.dag) + if !haskey(out_e_src_map, vertex_id) + out_e_src_map[vertex_id] = [] + end + end + + for vertex_id in keys(out_e_src_map) + if isempty(out_e_src_map[vertex_id]) + push!(final_vertices, vertex_id) + end + end + + return get_vertices_promises(final_vertices, task_dag.dag) +end + +function get_leafs_promises(task_dag::TaskDAG) + if !(task_dag.state in (running, completed)) + throw(error("Task DAG was not started yet")) + end + return task_dag.leafs_promises +end + +function wait(task_dag::TaskDAG) + if (task_dag.state !== ready) + for promise in task_dag.leafs_promises + wait(promise) + end + task_dag.state = completed + else + throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `TaskDAG`")) + end +end + +function fetch(task_dag::TaskDAG, vertex_id::Int) + fetch(MetaGraphs.get_prop(task_dag.dag, vertex_id, :res_data)) +end \ No newline at end of file diff --git a/src/TrackedTaskDAG.jl b/src/TrackedTaskDAG.jl new file mode 100644 index 0000000..3d72856 --- /dev/null +++ b/src/TrackedTaskDAG.jl @@ -0,0 +1,66 @@ +import UUIDs +import MetaGraphs +import Dagger +import Base.repr + +include("TaskDAG.jl") + +mutable struct TrackedTaskDAG <: AbstractMetaTask + uuid::String + state::TaskDAGState + task_dag::TaskDAG + notify_task::Union{Dagger.DTask, Nothing} + + function TrackedTaskDAG(name::String, dag::MetaGraphs.MetaDiGraph) + task_dag = TaskDAG(name, dag) + new(string(UUIDs.uuid4()), ready, task_dag, nothing) + end +end + +function get_uuid(tracked_task_dag::TrackedTaskDAG) + return tracked_task_dag.uuid +end + +function get_name(tracked_task_dag::TrackedTaskDAG) + return "tracked_" * tracked_task_dag.task_dag.name +end + +function get_task_DAG(tracked_task_dag::TrackedTaskDAG) + return tracked_task_dag.task_dag +end + +function Base.repr(task_dag::TrackedTaskDAG) + return get_name(task_dag) * ": TrackedTaskDAG" +end + +# Default notify function +function task_DAG_finalization(tracked_task_dag::TrackedTaskDAG, promises...) + println("Graph: $(get_name(tracked_task_dag)), entered notify, graph_id: $(get_uuid(tracked_task_dag)) !") +end + +function start_DAG(tracked_task_dag::TrackedTaskDAG) + tracked_task_dag.state = on_schedule + start_DAG(tracked_task_dag.task_dag) + tracked_task_dag.notify_task = Dagger.@spawn task_DAG_finalization(tracked_task_dag, get_leafs_promises(tracked_task_dag.task_dag)...) + tracked_task_dag.state = running +end + +function Base.wait(t::TrackedTaskDAG) + if t.state in (on_schedule, running, completed) + Dagger.wait(t.notify_task) + t.state = completed + t.task_dag.state = completed + else + throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `TrackedTaskDAG`")) + end + +end + +# TODO: return something more meaningful (for instance, dictionary of results for the specified vertices) +function Base.fetch(t::TrackedTaskDAG) + res = fetch(t.notify_task) + if (t.state !== completed) + t.state = completed + end + return res +end From 690ffed5033bc956b685caf8f0a2dea67e470c01 Mon Sep 17 00:00:00 2001 From: SmalRat <30555080+SmalRat@users.noreply.github.com> Date: Tue, 2 Jul 2024 11:52:29 +0300 Subject: [PATCH 2/2] Switched to the usage of the new scheduling wrappers --- examples/schedule.jl | 102 ++++++++++++++++++++++++++----------------- src/FrameworkDemo.jl | 2 + 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/examples/schedule.jl b/examples/schedule.jl index 792d2f0..4ccc4f0 100644 --- a/examples/schedule.jl +++ b/examples/schedule.jl @@ -1,14 +1,14 @@ -using Distributed +import Distributed if abspath(PROGRAM_FILE) == @__FILE__ - new_procs = addprocs(12) # Set the number of workers + new_procs = Distributed.addprocs(12, lazy=false) # Set the number of workers end -using Dagger -using Graphs -using MetaGraphs -using FrameworkDemo -using FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package. +import Dagger +import Graphs +import MetaGraphs +import FrameworkDemo +import FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package. # Defining constants @@ -24,41 +24,61 @@ OUTPUT_GRAPH_IMAGE_PATH = "$output_dir/" MAX_GRAPHS_RUN = 3 -function execution(graphs_map) - graphs_being_run = Set{Int}() - graphs_dict = Dict{Int, String}() - graphs_tasks = Dict{Int,Dagger.DTask}() - graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH) - notifications = RemoteChannel(()->Channel{Int}(32)) - # notifications = Channel{Int}(32) - for (i, (g_name, g)) in enumerate(graphs) - graphs_dict[i] = g_name - while !(length(graphs_being_run) < MAX_GRAPHS_RUN) - finished_graph_id = take!(notifications) - delete!(graphs_being_run, finished_graph_id) - delete!(graphs_tasks, i) - println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])") - end - graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i) - push!(graphs_being_run, i) - println("Dispatcher: scheduled graph $i: $g_name") - end - results = [] - for (g_name, g) in graphs - g_map = Dict{Int, Any}() - for vertex_id in Graphs.vertices(g) - future = get_prop(g, vertex_id, :res_data) - g_map[vertex_id] = fetch(future) - end - push!(results, (g_name, g_map)) +function execution(graphs_map::Dict{String, String}) + notifications = Channel{String}(32) + dags = Dict{String, FrameworkDemo.TrackedTaskDAG}() + running_dags = Set{String}() + completed_dags = Set{String}() + + schedule_graphs(notifications, graphs_map, dags, running_dags, completed_dags) + wait_dags_to_finish(notifications, dags, running_dags, completed_dags) +end + +function parse_graph(graph_name::String, graph_path::String, output_graph_path::String, output_graph_image_path::String) + parsed_graph_dot = FrameworkDemo.timestamp_string("$output_graph_path$graph_name") * ".dot" + parsed_graph_image = FrameworkDemo.timestamp_string("$output_graph_image_path$graph_name") * ".png" + G = FrameworkDemo.parse_graphml([graph_path]) + + open(parsed_graph_dot, "w") do f + MetaGraphs.savedot(f, G) end - for (g_name, res) in results - for (id, value) in res - println("Graph: $g_name, Final result for vertex $id: $value") + FrameworkDemo.dot_to_png(parsed_graph_dot, parsed_graph_image) + return G +end + +function schedule_graphs(notifications::Channel{String}, graphs_map::Dict{String, String}, + dags::Dict{String, FrameworkDemo.TrackedTaskDAG}, running_dags::Set{String}, completed_dags::Set{String}) + + for (g_name, g_path) in graphs_map + g = parse_graph(g_name, g_path, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH) + tracked_task_dag = FrameworkDemo.TrackedTaskDAG(g_name, g) + dags[FrameworkDemo.get_uuid(tracked_task_dag)] = tracked_task_dag + + while length(running_dags) >= MAX_GRAPHS_RUN + wait_dags_to_finish(notifications, dags, running_dags, completed_dags, length(running_dags) - MAX_GRAPHS_RUN + 1) end + schedule_DAG(tracked_task_dag, notifications, running_dags) + end +end + +function schedule_DAG(tracked_task_dag::FrameworkDemo.TrackedTaskDAG, notifications::Channel{String}, running_dags::Set{String}) + uuid = FrameworkDemo.get_uuid(tracked_task_dag) + FrameworkDemo.start_DAG(tracked_task_dag) + push!(running_dags, uuid) + println("Dispatcher: graph scheduled - $uuid: $(FrameworkDemo.get_name(tracked_task_dag))") + Threads.@spawn begin + wait(tracked_task_dag) + put!(notifications, string(FrameworkDemo.get_uuid(tracked_task_dag))) end - for (_, task) in graphs_tasks - wait(task) +end + +function wait_dags_to_finish(notifications::Channel{String}, dags::Dict{String, FrameworkDemo.TrackedTaskDAG}, running_dags::Set{String}, + completed_dags::Set{String}, num=length(running_dags)) + for i in 1:num + uuid = take!(notifications) + delete!(running_dags, uuid) + push!(completed_dags, uuid) + println("Dispatcher: graph finished - $uuid: $(FrameworkDemo.get_name(dags[uuid]))") end end @@ -90,6 +110,6 @@ graphs_map = Dict{String, String}( if abspath(PROGRAM_FILE) == @__FILE__ mkpath(output_dir) main(graphs_map) - rmprocs!(Dagger.Sch.eager_context(), workers()) - rmprocs(workers()) + Dagger.rmprocs!(Dagger.Sch.eager_context(), Distributed.workers()) + Distributed.rmprocs(Distributed.workers()) end diff --git a/src/FrameworkDemo.jl b/src/FrameworkDemo.jl index ee3e414..7d4c21e 100644 --- a/src/FrameworkDemo.jl +++ b/src/FrameworkDemo.jl @@ -5,6 +5,8 @@ include("parsing.jl") include("scheduling.jl") include("visualization.jl") +include("TrackedTaskDAG.jl") + # to be removed include("ModGraphVizSimple.jl")