Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduling wrappers #18

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
name = "FrameworkDemo"
uuid = "cfbf7e84-66d2-421e-b147-9edb7a8672d2"
authors = ["SmalRat <[email protected]>",
"Mateusz Jakub Fila <[email protected]>",
"hegner <[email protected]>"]
authors = ["SmalRat <[email protected]>", "Mateusz Jakub Fila <[email protected]>", "hegner <[email protected]>"]
version = "0.1.0"

[deps]
Expand All @@ -21,6 +19,7 @@ LightGraphs = "093fc24a-ae57-5d10-9952-331d41423f4d"
MetaGraphs = "626554b9-1ddb-594c-aa3c-2596fe9399a5"
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
Dagger = "0.18.11"
Expand All @@ -29,4 +28,4 @@ Dagger = "0.18.11"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test"]
test = ["Test"]
102 changes: 61 additions & 41 deletions examples/schedule.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using Distributed
import Distributed

if abspath(PROGRAM_FILE) == @__FILE__
new_procs = addprocs(12) # Set the number of workers
new_procs = Distributed.addprocs(12, lazy=false) # Set the number of workers
end

using Dagger
using Graphs
using MetaGraphs
using FrameworkDemo
using FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package.
import Dagger
import Graphs
import MetaGraphs
import FrameworkDemo
import FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package.


# Defining constants
Expand All @@ -24,41 +24,61 @@ OUTPUT_GRAPH_IMAGE_PATH = "$output_dir/"

MAX_GRAPHS_RUN = 3

function execution(graphs_map)
graphs_being_run = Set{Int}()
graphs_dict = Dict{Int, String}()
graphs_tasks = Dict{Int,Dagger.DTask}()
graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH)
notifications = RemoteChannel(()->Channel{Int}(32))
# notifications = Channel{Int}(32)
for (i, (g_name, g)) in enumerate(graphs)
graphs_dict[i] = g_name
while !(length(graphs_being_run) < MAX_GRAPHS_RUN)
finished_graph_id = take!(notifications)
delete!(graphs_being_run, finished_graph_id)
delete!(graphs_tasks, i)
println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])")
end
graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i)
push!(graphs_being_run, i)
println("Dispatcher: scheduled graph $i: $g_name")
end
results = []
for (g_name, g) in graphs
g_map = Dict{Int, Any}()
for vertex_id in Graphs.vertices(g)
future = get_prop(g, vertex_id, :res_data)
g_map[vertex_id] = fetch(future)
end
push!(results, (g_name, g_map))
function execution(graphs_map::Dict{String, String})
notifications = Channel{String}(32)
dags = Dict{String, FrameworkDemo.TrackedTaskDAG}()
running_dags = Set{String}()
completed_dags = Set{String}()

schedule_graphs(notifications, graphs_map, dags, running_dags, completed_dags)
wait_dags_to_finish(notifications, dags, running_dags, completed_dags)
end

function parse_graph(graph_name::String, graph_path::String, output_graph_path::String, output_graph_image_path::String)
parsed_graph_dot = FrameworkDemo.timestamp_string("$output_graph_path$graph_name") * ".dot"
parsed_graph_image = FrameworkDemo.timestamp_string("$output_graph_image_path$graph_name") * ".png"
G = FrameworkDemo.parse_graphml([graph_path])

open(parsed_graph_dot, "w") do f
MetaGraphs.savedot(f, G)
end
for (g_name, res) in results
for (id, value) in res
println("Graph: $g_name, Final result for vertex $id: $value")
FrameworkDemo.dot_to_png(parsed_graph_dot, parsed_graph_image)
return G
end

function schedule_graphs(notifications::Channel{String}, graphs_map::Dict{String, String},
dags::Dict{String, FrameworkDemo.TrackedTaskDAG}, running_dags::Set{String}, completed_dags::Set{String})

for (g_name, g_path) in graphs_map
g = parse_graph(g_name, g_path, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH)
tracked_task_dag = FrameworkDemo.TrackedTaskDAG(g_name, g)
dags[FrameworkDemo.get_uuid(tracked_task_dag)] = tracked_task_dag

while length(running_dags) >= MAX_GRAPHS_RUN
wait_dags_to_finish(notifications, dags, running_dags, completed_dags, length(running_dags) - MAX_GRAPHS_RUN + 1)
end
schedule_DAG(tracked_task_dag, notifications, running_dags)
end
end

function schedule_DAG(tracked_task_dag::FrameworkDemo.TrackedTaskDAG, notifications::Channel{String}, running_dags::Set{String})
uuid = FrameworkDemo.get_uuid(tracked_task_dag)
FrameworkDemo.start_DAG(tracked_task_dag)
push!(running_dags, uuid)
println("Dispatcher: graph scheduled - $uuid: $(FrameworkDemo.get_name(tracked_task_dag))")
Threads.@spawn begin
wait(tracked_task_dag)
put!(notifications, string(FrameworkDemo.get_uuid(tracked_task_dag)))
end
for (_, task) in graphs_tasks
wait(task)
end

function wait_dags_to_finish(notifications::Channel{String}, dags::Dict{String, FrameworkDemo.TrackedTaskDAG}, running_dags::Set{String},
completed_dags::Set{String}, num=length(running_dags))
for i in 1:num
uuid = take!(notifications)
delete!(running_dags, uuid)
push!(completed_dags, uuid)
println("Dispatcher: graph finished - $uuid: $(FrameworkDemo.get_name(dags[uuid]))")
end
end

Expand Down Expand Up @@ -90,6 +110,6 @@ graphs_map = Dict{String, String}(
if abspath(PROGRAM_FILE) == @__FILE__
mkpath(output_dir)
main(graphs_map)
rmprocs!(Dagger.Sch.eager_context(), workers())
rmprocs(workers())
Dagger.rmprocs!(Dagger.Sch.eager_context(), Distributed.workers())
Distributed.rmprocs(Distributed.workers())
end
2 changes: 2 additions & 0 deletions src/FrameworkDemo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ include("parsing.jl")
include("scheduling.jl")
include("visualization.jl")

include("TrackedTaskDAG.jl")

# to be removed
include("ModGraphVizSimple.jl")

Expand Down
22 changes: 22 additions & 0 deletions src/MetaTask.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Dagger

abstract type AbstractMetaTask
end

struct MetaTask <: AbstractMetaTask
uuid::String
name::String
task::Dagger.DTask
end

function get_uuid(meta_task::AbstractMetaTask)
return meta_task.uuid
end

function get_name(meta_task::AbstractMetaTask)
return "tracked_" * meta_task.name
end

function wait(meta_task::AbstractMetaTask)
wait(meta_task.task)
end
109 changes: 109 additions & 0 deletions src/TaskDAG.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import UUIDs
import MetaGraphs
import Dagger
import Base.repr

include("MetaTask.jl")

@enum TaskDAGState begin
ready
on_schedule
running
completed
failed
end

mutable struct TaskDAG <: AbstractMetaTask
uuid::String
name::String
state::TaskDAGState
dag::MetaGraphs.MetaDiGraph
leafs_promises::Vector{Dagger.DTask}

function TaskDAG(name::String, dag::MetaGraphs.MetaDiGraph)
new(string(UUIDs.uuid4()), name, ready, dag)
end
end

function get_uuid(task_dag::TaskDAG)
return task_dag.uuid
end

function get_name(task_dag::TaskDAG)
return task_dag.name
end

function get_DAG(task_dag::TaskDAG)
return task_dag.dag
end

function Base.repr(task_dag::TaskDAG)
return get_name(task_dag) * ": TaskDAG"
end

function is_ready(task_dag::TaskDAG)
return task_dag.state == ready
end

function is_running(task_dag::TaskDAG)
return task_dag.state == running
end

function is_completed(task_dag::TaskDAG)
return task_dag.state == completed
end

function start_DAG(task_dag::TaskDAG)
task_dag.state = on_schedule

inc_e_src_map = get_ine_map(task_dag.dag)

for vertex_id in MetaGraphs.topological_sort(task_dag.dag)
incoming_data = get_deps_promises(vertex_id, inc_e_src_map, task_dag.dag)
MetaGraphs.set_prop!(task_dag.dag, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[MetaGraphs.get_prop(task_dag.dag, vertex_id, :type)](task_dag.name, task_dag.uuid, vertex_id, incoming_data...))
end

task_dag.leafs_promises = _get_leafs_promises(task_dag) # assume the graph will not be modified
task_dag.state = running
end

function _get_leafs_promises(task_dag::TaskDAG)
final_vertices = []
out_e_src_map = get_oute_map(task_dag.dag)

for vertex_id in MetaGraphs.vertices(task_dag.dag)
if !haskey(out_e_src_map, vertex_id)
out_e_src_map[vertex_id] = []
end
end

for vertex_id in keys(out_e_src_map)
if isempty(out_e_src_map[vertex_id])
push!(final_vertices, vertex_id)
end
end

return get_vertices_promises(final_vertices, task_dag.dag)
end

function get_leafs_promises(task_dag::TaskDAG)
if !(task_dag.state in (running, completed))
throw(error("Task DAG was not started yet"))
end
return task_dag.leafs_promises
end

function wait(task_dag::TaskDAG)
if (task_dag.state !== ready)
for promise in task_dag.leafs_promises
wait(promise)
end
task_dag.state = completed
else
throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `TaskDAG`"))
end
end

function fetch(task_dag::TaskDAG, vertex_id::Int)
fetch(MetaGraphs.get_prop(task_dag.dag, vertex_id, :res_data))
end
66 changes: 66 additions & 0 deletions src/TrackedTaskDAG.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import UUIDs
import MetaGraphs
import Dagger
import Base.repr

include("TaskDAG.jl")

mutable struct TrackedTaskDAG <: AbstractMetaTask
uuid::String
state::TaskDAGState
task_dag::TaskDAG
notify_task::Union{Dagger.DTask, Nothing}

function TrackedTaskDAG(name::String, dag::MetaGraphs.MetaDiGraph)
task_dag = TaskDAG(name, dag)
new(string(UUIDs.uuid4()), ready, task_dag, nothing)
end
end

function get_uuid(tracked_task_dag::TrackedTaskDAG)
return tracked_task_dag.uuid
end

function get_name(tracked_task_dag::TrackedTaskDAG)
return "tracked_" * tracked_task_dag.task_dag.name
end

function get_task_DAG(tracked_task_dag::TrackedTaskDAG)
return tracked_task_dag.task_dag
end

function Base.repr(task_dag::TrackedTaskDAG)
return get_name(task_dag) * ": TrackedTaskDAG"
end

# Default notify function
function task_DAG_finalization(tracked_task_dag::TrackedTaskDAG, promises...)
println("Graph: $(get_name(tracked_task_dag)), entered notify, graph_id: $(get_uuid(tracked_task_dag)) !")
end

function start_DAG(tracked_task_dag::TrackedTaskDAG)
tracked_task_dag.state = on_schedule
start_DAG(tracked_task_dag.task_dag)
tracked_task_dag.notify_task = Dagger.@spawn task_DAG_finalization(tracked_task_dag, get_leafs_promises(tracked_task_dag.task_dag)...)
tracked_task_dag.state = running
end

function Base.wait(t::TrackedTaskDAG)
if t.state in (on_schedule, running, completed)
Dagger.wait(t.notify_task)
t.state = completed
t.task_dag.state = completed
else
throw(ConcurrencyViolationError("Cannot `wait` on an unlaunched `TrackedTaskDAG`"))
end

end

# TODO: return something more meaningful (for instance, dictionary of results for the specified vertices)
function Base.fetch(t::TrackedTaskDAG)
res = fetch(t.notify_task)
if (t.state !== completed)
t.state = completed
end
return res
end