-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Datadeps handling for algorithm tasks #19
base: main
Are you sure you want to change the base?
Changes from all commits
71afdaa
eb8a936
dc65be4
234134e
f888b36
e98763a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,52 @@ | ||
import Dagger | ||
using Distributed | ||
using Graphs | ||
using MetaGraphs | ||
|
||
"""
Mutable container pairing a payload (`data`) with its nominal size.

The `data` field is intentionally untyped: it starts out as `nothing`
and is later overwritten with a Dagger chunk (see `populate_data_object!`).
"""
mutable struct DataObject
    data
    size::Float64
end
|
||
"""
    populate_data_object!(object::DataObject, data)

Wrap `data` in a Dagger chunk bound to the current thunk processor and
the local worker's scope, and store that chunk in `object.data`.
"""
function populate_data_object!(object::DataObject, data)
    # Pin the chunk to the processor executing this thunk, scoped to this worker.
    processor = Dagger.thunk_processor()
    worker_scope = Dagger.scope(worker=myid())
    object.data = Dagger.tochunk(data, processor, worker_scope)
    # NOTE(review): explicit chunk creation was reported problematic with
    # addprocs(); consider letting Dagger manage placement — TODO confirm.
end
|
||
# Algorithms

"""
    make_algorithm(graph::MetaDiGraph, vertex_id::Int)

Build a closure mocking the algorithm at `vertex_id`: it fills each output
`DataObject` with a dummy payload and then sleeps for the vertex's average
runtime (`:runtime_average_s`).
"""
function make_algorithm(graph::MetaDiGraph, vertex_id::Int)
    runtime = get_prop(graph, vertex_id, :runtime_average_s)

    function algorithm(inputs, outputs)
        println("Gaudi algorithm for vertex $vertex_id !")

        for out in outputs
            # NOTE(review): `out.size` comes from :size_average_B (bytes), so
            # the extra 1e3 factor looks suspicious — confirm intended units.
            payload_bytes = round(Int, out.size * 1e3)
            populate_data_object!(out, ' '^payload_bytes)
        end

        sleep(runtime)  # simulate compute time
    end

    return algorithm
end
|
||
function dataobject_algorithm(graph_name, graph_id, vertex_id, data...) | ||
println("Graph: $graph_name, Dataobject algorithm for vertex $vertex_id !") | ||
sleep(0.1) | ||
return vertex_id | ||
|
||
"""
    get_transform(graph::MetaDiGraph, vertex_id::Int)

Return a varargs wrapper that splits its positional arguments into the
first `N_inputs` inputs and the remaining outputs, then applies the
transform registered for this vertex's `:type` in `AVAILABLE_TRANSFORMS`.
"""
function get_transform(graph::MetaDiGraph, vertex_id::Int)
    vertex_type = get_prop(graph, vertex_id, :type)

    function f(data...; N_inputs)
        ins = data[1:N_inputs]
        outs = data[N_inputs+1:end]
        vertex_transform = AVAILABLE_TRANSFORMS[vertex_type](graph, vertex_id)
        return vertex_transform(ins, outs)
    end

    return f
end
|
||
function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...) | ||
|
@@ -28,7 +62,7 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_ | |
parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot" | ||
parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png" | ||
G = parse_graphml([graph_path]) | ||
|
||
open(parsed_graph_dot, "w") do f | ||
MetaGraphs.savedot(f, G) | ||
end | ||
|
@@ -45,7 +79,7 @@ function get_ine_map(G) | |
for edge in Graphs.edges(G) | ||
src_vertex = src(edge) | ||
dest_vertex = dst(edge) | ||
|
||
if haskey(incoming_edges_sources_map, dest_vertex) | ||
push!(incoming_edges_sources_map[dest_vertex], src_vertex) | ||
else | ||
|
@@ -82,33 +116,39 @@ function get_vertices_promises(vertices::Vector, G::MetaDiGraph) | |
return promises | ||
end | ||
|
||
function get_deps_promises(vertex_id, map, G) | ||
incoming_data = [] | ||
if haskey(map, vertex_id) | ||
for src in map[vertex_id] | ||
push!(incoming_data, get_prop(G, src, :res_data)) | ||
end | ||
end | ||
return incoming_data | ||
# Collect the :res_data promises of all predecessor vertices of `vertex_id`.
function get_in_promises(G, vertex_id)
    return [get_prop(G, v, :res_data) for v in inneighbors(G, vertex_id)]
end
|
||
# Collect the :res_data promises of all successor vertices of `vertex_id`.
function get_out_promises(G, vertex_id)
    return [get_prop(G, v, :res_data) for v in outneighbors(G, vertex_id)]
end
|
||
"""
    schedule_graph(G::MetaDiGraph)

Schedule every algorithm vertex of `G` under Dagger's data-dependency
tracking.

Each "DataObject" vertex gets a fresh `DataObject` (sized from its
`:size_average_B` property) stored in `:res_data`. Each remaining vertex
is spawned inside `Dagger.spawn_datadeps()` with its incoming promises
marked `In` and its outgoing promises marked `Out`; the spawned task is
stored in the vertex's `:res_data` property.
"""
function schedule_graph(G::MetaDiGraph)
    # Dead local `inc_e_src_map = get_ine_map(G)` removed: the datadeps-based
    # scheduling below derives dependencies from In/Out annotations instead.
    data_vertices = MetaGraphs.filter_vertices(G, :type, "DataObject")
    sorted_vertices = MetaGraphs.topological_sort(G)

    # Materialize one DataObject per data vertex, sized from graph metadata.
    for data_id in data_vertices
        size = get_prop(G, data_id, :size_average_B)
        set_prop!(G, data_id, :res_data, DataObject(nothing, size))
    end

    # Spawn algorithm vertices in topological order; Dagger resolves the
    # actual execution order from the In/Out dependencies on the DataObjects.
    Dagger.spawn_datadeps() do
        for vertex_id in setdiff(sorted_vertices, data_vertices)
            incoming_data = get_in_promises(G, vertex_id)
            outgoing_data = get_out_promises(G, vertex_id)
            transform = get_transform(G, vertex_id)
            N_inputs = length(incoming_data)
            res = Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs)
            set_prop!(G, vertex_id, :res_data, res)
        end
    end
end
|
||
function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int) | ||
final_vertices = [] | ||
inc_e_src_map = get_ine_map(G) | ||
|
||
for vertex_id in MetaGraphs.topological_sort(G) | ||
incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G) | ||
set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](graph_name, graph_id, vertex_id, incoming_data...)) | ||
end | ||
schedule_graph(G) | ||
|
||
out_e_src_map = get_oute_map(G) | ||
for vertex_id in MetaGraphs.vertices(G) | ||
|
@@ -125,5 +165,3 @@ function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel | |
|
||
Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...) | ||
end | ||
|
||
AVAILABLE_TRANSFORMS = Dict{String, Function}("Algorithm" => mock_Gaudi_algorithm, "DataObject" => dataobject_algorithm) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about getting the size of data to create from the data object instead of having it as another argument?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The part with making a chunk seems to be problematic with multiple processes when
addprocs
is used (running with `julia -p`
strangely seems to be fine). Is it necessary to have data as a chunk if what the algorithm receives is a whole
DataObject
? If I understood correctly the motivation was to place the chunk on the local worker, but as is we have to transfer a whole DataObject
as it may live somewhere else. I tried removing explicit chunk creation and relying on Dagger to handle it, and ended up with something like:
which seems to work without problems with
addprocs
and-p
. I think we could aim now for minimal version and optimize transfers laterThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it, the Chunk acts as a reference to some data, so moving around a DataObject with a Chunk in it is just moving around a struct that contains a reference rather than moving around the data itself. The actual data should only get transferred when you collect the Chunk in an algorithm.
I agree we can do without it for now.