diff --git a/data/datadeps_demo/df.graphml b/data/datadeps_demo/df.graphml index 5cdc0af..4a00968 100644 --- a/data/datadeps_demo/df.graphml +++ b/data/datadeps_demo/df.graphml @@ -1,5 +1,6 @@ - + + @@ -96,4 +97,4 @@ - \ No newline at end of file + diff --git a/data/sequencer_demo/another_test_graph.graphml b/data/sequencer_demo/another_test_graph.graphml index ace28b3..da01d58 100644 --- a/data/sequencer_demo/another_test_graph.graphml +++ b/data/sequencer_demo/another_test_graph.graphml @@ -1,88 +1,106 @@ - - - + + + + + - MicroProducer - ProducerA - Algorithm + MicroProducer + ProducerA + Algorithm + 9.3027e-05 - - A - DataObject + + A + DataObject + 8.0 - MicroTransformer - TransformerB - Algorithm + MicroTransformer + TransformerB + Algorithm + 4.2463e-05 - - C - DataObject + + C + DataObject + 8.0 - MicroTransformer - TransformerD - Algorithm + MicroTransformer + TransformerD + Algorithm + 8.241000000000001e-06 - - E - DataObject + + E + DataObject + 8.0 - MicroTransformer - TransformerF - Algorithm + MicroTransformer + TransformerF + Algorithm + 7.264e-06 - - G - DataObject + + G + DataObject + 8.0 - MicroTransformer - TransformerH - Algorithm + MicroTransformer + TransformerH + Algorithm + 6.705e-06 - - I - DataObject + + I + DataObject + 8.0 - MicroTransformer - TransformerJ - Algorithm + MicroTransformer + TransformerJ + Algorithm + 8.59e-06 - - K - DataObject + + K + DataObject + 8.0 - MicroTransformer - TransformerL - Algorithm + MicroTransformer + TransformerL + Algorithm + 8.94e-06 - - M - DataObject + + M + DataObject + 8.0 - MicroTransformer - TransformerN - Algorithm + MicroTransformer + TransformerN + Algorithm + 8.94e-06 - - O - DataObject + + O + DataObject + 8.0 diff --git a/data/sequencer_demo/df_sequencer_demo.graphml b/data/sequencer_demo/df_sequencer_demo.graphml index f0d499f..b304f1f 100644 --- a/data/sequencer_demo/df_sequencer_demo.graphml +++ b/data/sequencer_demo/df_sequencer_demo.graphml @@ -1,5 +1,6 @@ - + + diff --git a/examples/schedule.jl b/examples/schedule.jl index 792d2f0..4534dde 100644 --- a/examples/schedule.jl +++ b/examples/schedule.jl @@ -46,7 +46,8 @@ function execution(graphs_map) results = [] for (g_name, g) in graphs g_map = Dict{Int, Any}() - for vertex_id in Graphs.vertices(g) + data_vertices = MetaGraphs.filter_vertices(g, :type, "DataObject") + for vertex_id in data_vertices future = get_prop(g, vertex_id, :res_data) g_map[vertex_id] = fetch(future) end @@ -64,10 +65,6 @@ end function main(graphs_map) FrameworkDemo.configure_LocalEventLog() - # - # OR - # - # configure_webdash_multievent() @time execution(graphs_map) diff --git a/src/scheduling.jl b/src/scheduling.jl index d006bdd..7f71027 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -3,16 +3,22 @@ using Distributed using MetaGraphs # Algorithms -function mock_Gaudi_algorithm(graph_name, graph_id, vertex_id, data...) - println("Graph: $graph_name, Gaudi algorithm for vertex $vertex_id !") - sleep(1) - return vertex_id +struct MockupAlgorithm + name::String + runtime::Float64 + input_length::UInt + MockupAlgorithm(graph::MetaDiGraph, vertex_id::Int) = begin + runtime = get_prop(graph, vertex_id, :runtime_average_s) + name = get_prop(graph, vertex_id, :node_id) + inputs = length(inneighbors(graph, vertex_id)) + new(name, runtime, inputs) + end end -function dataobject_algorithm(graph_name, graph_id, vertex_id, data...) - println("Graph: $graph_name, Dataobject algorithm for vertex $vertex_id !") - sleep(0.1) - return vertex_id +function (alg::MockupAlgorithm)(args...) + println("Executing $(alg.name)") + + return alg.name end function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...) @@ -28,7 +34,7 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_ parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot" parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png" G = parse_graphml([graph_path]) - + open(parsed_graph_dot, "w") do f MetaGraphs.savedot(f, G) end @@ -38,92 +44,46 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_ return graphs end -# Function to get the map of incoming edges to a vertex (i.e. the sources of the incoming edges) -function get_ine_map(G) - incoming_edges_sources_map = Dict{eltype(G), Vector{eltype(G)}}() - - for edge in Graphs.edges(G) - src_vertex = src(edge) - dest_vertex = dst(edge) - - if haskey(incoming_edges_sources_map, dest_vertex) - push!(incoming_edges_sources_map[dest_vertex], src_vertex) - else - incoming_edges_sources_map[dest_vertex] = [src_vertex] - end - end - - return incoming_edges_sources_map -end - -# Function to get the map of outgoing edges from a vertex (i.e. the destinations of the outgoing edges) -function get_oute_map(G) - outgoing_edges_destinations_map = Dict{eltype(G), Vector{eltype(G)}}() - - for edge in Graphs.edges(G) - src_vertex = src(edge) - dest_vertex = dst(edge) - - if haskey(outgoing_edges_destinations_map, src_vertex) - push!(outgoing_edges_destinations_map[src_vertex], dest_vertex) - else - outgoing_edges_destinations_map[src_vertex] = [dest_vertex] - end - end - - return outgoing_edges_destinations_map -end - -function get_vertices_promises(vertices::Vector, G::MetaDiGraph) - promises = [] - for vertex in vertices - push!(promises, get_prop(G, vertex, :res_data)) - end - return promises +function get_promises(graph::MetaDiGraph, vertices::Vector) + return [get_prop(graph, v, :res_data) for v in vertices] end -function get_deps_promises(vertex_id, map, G) - incoming_data = [] - if haskey(map, vertex_id) - for src in map[vertex_id] - push!(incoming_data, get_prop(G, src, :res_data)) - end - end - return incoming_data +function is_terminating_alg(graph::AbstractGraph, vertex_id::Int) + successor_dataobjects = outneighbors(graph, vertex_id) + is_terminating(vertex) = isempty(outneighbors(graph, vertex)) + all(is_terminating, successor_dataobjects) end -function schedule_graph(G::MetaDiGraph) - inc_e_src_map = get_ine_map(G) - - for vertex_id in MetaGraphs.topological_sort(G) - incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G) - set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](name, graph_id, vertex_id, incoming_data...)) - end +function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int) + incoming_data = get_promises(graph, inneighbors(graph, vertex_id)) + algorithm = MockupAlgorithm(graph, vertex_id) + Dagger.@spawn algorithm(incoming_data...) end -function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int) - final_vertices = [] - inc_e_src_map = get_ine_map(G) +function schedule_graph(graph::MetaDiGraph) + alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm") + sorted_vertices = MetaGraphs.topological_sort(graph) - for vertex_id in MetaGraphs.topological_sort(G) - incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G) - set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](graph_name, graph_id, vertex_id, incoming_data...)) - end + terminating_results = [] - out_e_src_map = get_oute_map(G) - for vertex_id in MetaGraphs.vertices(G) - if !haskey(out_e_src_map, vertex_id) - out_e_src_map[vertex_id] = [] + for vertex_id in intersect(sorted_vertices, alg_vertices) + res = schedule_algorithm(graph, vertex_id) + set_prop!(graph, vertex_id, :res_data, res) + for v in outneighbors(graph, vertex_id) + set_prop!(graph, v, :res_data, res) end - end - for vertex_id in keys(out_e_src_map) - if out_e_src_map[vertex_id] == [] # TODO: a native method to check for emptiness should exist - push!(final_vertices, vertex_id) - end + is_terminating_alg(graph, vertex_id) && push!(terminating_results, res) end - Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...) + return terminating_results end -AVAILABLE_TRANSFORMS = Dict{String, Function}("Algorithm" => mock_Gaudi_algorithm, "DataObject" => dataobject_algorithm) +function schedule_graph_with_notify(graph::MetaDiGraph, + notifications::RemoteChannel, + graph_name::String, + graph_id::Int) + terminating_results = schedule_graph(graph) + + Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...) +end