Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datadeps handling for algorithm tasks #19

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 69 additions & 51 deletions data/sequencer_demo/another_test_graph.graphml
Original file line number Diff line number Diff line change
@@ -1,88 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd">
<key id="key0" for="node" attr.name="class" attr.type="string" />
<key id="key1" for="node" attr.name="node_id" attr.type="string" />
<key id="key2" for="node" attr.name="type" attr.type="string" />
<key id="d0" for="node" attr.name="class" attr.type="string" />
<key id="d1" for="node" attr.name="node_id" attr.type="string" />
<key id="d2" for="node" attr.name="type" attr.type="string" />
<key id="d3" for="node" attr.name="runtime_average_s" attr.type="double" />
<key id="d4" for="node" attr.name="size_kb" attr.type="double" />
<graph id="G" edgedefault="directed" parse.nodeids="free" parse.edgeids="canonical" parse.order="nodesfirst">
<node id="n0">
<data key="key0">MicroProducer</data>
<data key="key1">ProducerA</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroProducer</data>
<data key="d1">ProducerA</data>
<data key="d2">Algorithm</data>
<data key="d3">9.3027e-05</data>
</node>
<node id="n1">
<data key="key0"></data>
<data key="key1">A</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">A</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n2">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerB</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerB</data>
<data key="d2">Algorithm</data>
<data key="d3">4.2463e-05</data>
</node>
<node id="n3">
<data key="key0"></data>
<data key="key1">C</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">C</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n4">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerD</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerD</data>
<data key="d2">Algorithm</data>
<data key="d3">8.241000000000001e-06</data>
</node>
<node id="n5">
<data key="key0"></data>
<data key="key1">E</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">E</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n6">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerF</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerF</data>
<data key="d2">Algorithm</data>
<data key="d3">7.264e-06</data>
</node>
<node id="n7">
<data key="key0"></data>
<data key="key1">G</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">G</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n8">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerH</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerH</data>
<data key="d2">Algorithm</data>
<data key="d3">6.705e-06</data>
</node>
<node id="n9">
<data key="key0"></data>
<data key="key1">I</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">I</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n10">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerJ</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerJ</data>
<data key="d2">Algorithm</data>
<data key="d3">8.59e-06</data>
</node>
<node id="n11">
<data key="key0"></data>
<data key="key1">K</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">K</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n12">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerL</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerL</data>
<data key="d2">Algorithm</data>
<data key="d3">8.94e-06</data>
</node>
<node id="n13">
<data key="key0"></data>
<data key="key1">M</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">M</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>
<node id="n14">
<data key="key0">MicroTransformer</data>
<data key="key1">TransformerN</data>
<data key="key2">Algorithm</data>
<data key="d0">MicroTransformer</data>
<data key="d1">TransformerN</data>
<data key="d2">Algorithm</data>
<data key="d3">8.94e-06</data>
</node>
<node id="n15">
<data key="key0"></data>
<data key="key1">O</data>
<data key="key2">DataObject</data>
<data key="d0"></data>
<data key="d1">O</data>
<data key="d2">DataObject</data>
<data key="d4">8.0</data>
</node>

<edge id="e0" source="n0" target="n1">
Expand Down
1 change: 1 addition & 0 deletions data/sequencer_demo/df_sequencer_demo.graphml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version='1.0' encoding='utf-8'?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd"><key id="d4" for="node" attr.name="size_average_B" attr.type="double"/>
<key id="d4" for="node" attr.name="size_kb" attr.type="double"/>
<key id="d3" for="node" attr.name="class" attr.type="string"/>
<key id="d2" for="node" attr.name="node_id" attr.type="string"/>
<key id="d1" for="node" attr.name="runtime_average_s" attr.type="double"/>
Expand Down
14 changes: 8 additions & 6 deletions examples/schedule.jl
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Distributed

if abspath(PROGRAM_FILE) == @__FILE__
new_procs = addprocs(12) # Set the number of workers
end
## doesn't work with datadeps! threads work though
# if abspath(PROGRAM_FILE) == @__FILE__
# new_procs = addprocs(12) # Set the number of workers
# end
##

using Dagger
using Graphs
using MetaGraphs
using FrameworkDemo
using FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package.


# Defining constants
output_dir = "results"
graph1_path = "./data/sequencer_demo/df_sequencer_demo.graphml"
Expand Down Expand Up @@ -46,7 +47,8 @@ function execution(graphs_map)
results = []
for (g_name, g) in graphs
g_map = Dict{Int, Any}()
for vertex_id in Graphs.vertices(g)
data_vertices = MetaGraphs.filter_vertices(g, :type, "DataObject")
for vertex_id in data_vertices
future = get_prop(g, vertex_id, :res_data)
g_map[vertex_id] = fetch(future)
end
Expand All @@ -67,7 +69,7 @@ function main(graphs_map)
#
# OR
#
# configure_webdash_multievent()
# FrameworkDemo.configure_webdash_multievent()

@time execution(graphs_map)

Expand Down
99 changes: 70 additions & 29 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
@@ -1,18 +1,55 @@
import Dagger
using Distributed
using Graphs
using MetaGraphs

"""
Mutable container passed between scheduled algorithm tasks.

`data` starts out as `nothing` (see `schedule_graph`, which constructs
`DataObject(nothing, size)`) and is later filled by `populate_data_object!`,
which stores the result of `Dagger.tochunk` — i.e. a chunk reference to the
payload rather than the payload itself.

NOTE(review): `size` is read from the graph's `:size_kb` property, but a
review comment suggests the value is actually measured in bytes — confirm
the unit before relying on it.
"""
mutable struct DataObject
    data              # untyped on purpose: `nothing` before population, a Dagger chunk after
    size::Float64     # payload size; unit (kB vs bytes) under review — see note above
end

"""
    populate_data_object!(object::DataObject, data)

Wrap `data` in a Dagger chunk placed on the current thunk processor and
scoped to the worker executing this call, then store that chunk in
`object.data`. Returns the chunk (the value of the assignment).
"""
function populate_data_object!(object::DataObject, data)
    processor = Dagger.thunk_processor()
    local_scope = Dagger.scope(worker = myid())
    object.data = Dagger.tochunk(data, processor, local_scope)
end
Comment on lines +11 to +18
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about getting the size of data to create from the data object instead of having it as another argument?

Suggested change
function populate_data_object!(object::DataObject, data)
proc = Dagger.thunk_processor()
scope = Dagger.scope(worker=myid())
chunk = Dagger.tochunk(data, proc, scope)
object.data = chunk
end
function populate_data_object!(object::DataObject)
data = ' '^round(Int, object.size)
proc = Dagger.thunk_processor()
scope = Dagger.scope(worker=myid())
chunk = Dagger.tochunk(data, proc, scope)
object.data = chunk
end

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part with making a chunk seems to be problematic with multiple processes when addprocs is used (running with julia -p strangely seems to be fine)

Is it necessary to have data as a chunk if what the algorithm receives is a whole DataObject? If I understood correctly the motivation was to place the chunk on the local worker, but as is we have to transfer a whole DataObject as it may live somewhere else.

I tried removing explicit chunk creation and relay on Dagger to handle it, and ended up with something like:

mutable struct DataObject
    data
    size::Float64
end

function populate_data_object!(object::DataObject)
    object.data = " "^round(Int, object.size)
end

which seems to work without problems with addprocs and -p. I think we could aim now for minimal version and optimize transfers later

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, the Chunk acts as a reference to some data, so moving around a DataObject with a Chunk in it is just moving around a struct that contains a reference rather than moving around the data itself. The actual data should only get transferred when you collect the Chunk in an algorithm.

I agree we can do without it for now.


# Algorithms
function mock_Gaudi_algorithm(graph_name, graph_id, vertex_id, data...)
println("Graph: $graph_name, Gaudi algorithm for vertex $vertex_id !")
sleep(1)
return vertex_id
function _algorithm(graph::MetaDiGraph, vertex_id::Int)
joott marked this conversation as resolved.
Show resolved Hide resolved
runtime = get_prop(graph, vertex_id, :runtime_average_s)

function algorithm(inputs, outputs)
println("Gaudi algorithm for vertex $vertex_id !")

for output in outputs
bytes = round(Int, output.size * 1e3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 1e3 is not right since what we put in the size is measured in bytes.
In another comment I propose to move that part inside populate_data_object!

populate_data_object!(output, ' '^bytes)
end

sleep(runtime)
end

return algorithm
end

function dataobject_algorithm(graph_name, graph_id, vertex_id, data...)
println("Graph: $graph_name, Dataobject algorithm for vertex $vertex_id !")
sleep(0.1)
return vertex_id
# Registry mapping a vertex `:type` string to the factory that builds its
# transform closure (consumed by `get_transform`).
# Declared `const`: a non-const global is `Any`-typed in Julia, which defeats
# type inference at every lookup site; `const` fixes the binding's type.
const AVAILABLE_TRANSFORMS = Dict{String, Function}(
    "Algorithm" => _algorithm,
)
joott marked this conversation as resolved.
Show resolved Hide resolved

"""
    get_transform(graph::MetaDiGraph, vertex_id::Int)

Build the callable that executes the transform for `vertex_id`.

The returned function `f(data...; N_inputs)` splits its varargs into the
first `N_inputs` inputs and the remaining outputs, and forwards them to the
transform selected by the vertex's `:type` property.

Throws a `KeyError` if the vertex's `:type` has no entry in
`AVAILABLE_TRANSFORMS` (now raised here, at construction time, rather than
on first invocation).
"""
function get_transform(graph::MetaDiGraph, vertex_id::Int)
    type = get_prop(graph, vertex_id, :type)
    # Resolve and construct the transform once; the original rebuilt it on
    # every call of `f` even though `type`, `graph` and `vertex_id` are fixed.
    transform = AVAILABLE_TRANSFORMS[type](graph, vertex_id)

    function f(data...; N_inputs)
        inputs = data[1:N_inputs]
        outputs = data[N_inputs+1:end]
        return transform(inputs, outputs)
    end

    return f
end

function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...)
Expand All @@ -28,7 +65,7 @@ function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_
parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot"
parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png"
G = parse_graphml([graph_path])

open(parsed_graph_dot, "w") do f
MetaGraphs.savedot(f, G)
end
Expand All @@ -45,7 +82,7 @@ function get_ine_map(G)
for edge in Graphs.edges(G)
src_vertex = src(edge)
dest_vertex = dst(edge)

if haskey(incoming_edges_sources_map, dest_vertex)
push!(incoming_edges_sources_map[dest_vertex], src_vertex)
else
Expand Down Expand Up @@ -82,33 +119,39 @@ function get_vertices_promises(vertices::Vector, G::MetaDiGraph)
return promises
end

function get_deps_promises(vertex_id, map, G)
incoming_data = []
if haskey(map, vertex_id)
for src in map[vertex_id]
push!(incoming_data, get_prop(G, src, :res_data))
end
end
return incoming_data
"""
    get_in_promises(G, vertex_id)

Return the `:res_data` property of every in-neighbor of `vertex_id` in `G`,
in the order reported by `inneighbors`.
"""
function get_in_promises(G, vertex_id)
    return map(v -> get_prop(G, v, :res_data), inneighbors(G, vertex_id))
end

"""
    get_out_promises(G, vertex_id)

Return the `:res_data` property of every out-neighbor of `vertex_id` in `G`,
in the order reported by `outneighbors`.
"""
function get_out_promises(G, vertex_id)
    # Loop variable renamed from `src`: these vertices are destinations of
    # the outgoing edges, not sources.
    return map(v -> get_prop(G, v, :res_data), outneighbors(G, vertex_id))
end

function schedule_graph(G::MetaDiGraph)
inc_e_src_map = get_ine_map(G)
data_vertices = MetaGraphs.filter_vertices(G, :type, "DataObject")
sorted_vertices = MetaGraphs.topological_sort(G)

for vertex_id in MetaGraphs.topological_sort(G)
incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G)
set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](name, graph_id, vertex_id, incoming_data...))
for data_id in data_vertices
size = get_prop(G, data_id, :size_kb)
set_prop!(G, data_id, :res_data, DataObject(nothing, size))
end

Dagger.spawn_datadeps() do
for vertex_id in setdiff(sorted_vertices, data_vertices)
incoming_data = get_in_promises(G, vertex_id)
outgoing_data = get_out_promises(G, vertex_id)
transform = get_transform(G, vertex_id)
N_inputs = length(incoming_data)
res = Dagger.@spawn transform(In.(incoming_data)..., Out.(outgoing_data)...; N_inputs)
Comment on lines +140 to +142
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the whole chain to execute algorithm is a bit too complicated.
In principle all the parameters from DAG could be extracted and "algorithm" created before anything is submitted to Dagger. I think at this point it may by clearer to replace some closures with callable structs
I'd replace get_transform, AVAILABLE_TRANSFORMS and _algorithm with something like this

struct MockupAlgorithm
    name::String
    runtime::Float64
    input_length::UInt
    MockupAlgorithm(graph::MetaDiGraph, vertex_id::Int) = begin
        runtime = get_prop(graph, vertex_id, :runtime_average_s)
        name = get_prop(graph, vertex_id, :node_id)
        inputs = length(inneighbors(graph, vertex_id))
        new(name, runtime, inputs)
    end
end

function (alg::MockupAlgorithm)(args...)

    function execute!(alg::MockupAlgorithm, inputs, outputs)
        println("Executing $(alg.name)")
        populate_data_object!.(outputs)
        sleep(alg.runtime)
    end

    inputs = args[1:alg.input_length]
    outputs = args[alg.input_length+1:end]
    execute!(alg, inputs, outputs)
end

function schedule_algorithm!(graph::MetaDiGraph, vertex_id::Int)
    incoming_data = get_in_promises(graph, vertex_id)
    outgoing_data = get_out_promises(graph, vertex_id)
    algorithm = MockupAlgorithm(graph, vertex_id)
    Dagger.@spawn algorithm(In.(incoming_data)..., Out.(outgoing_data)...)
end

and then

function schedule_graph(G::MetaDiGraph)
    data_vertices = MetaGraphs.filter_vertices(G, :type, "DataObject")
    sorted_vertices = MetaGraphs.topological_sort(G)

    for data_id in data_vertices
        size = get_prop(G, data_id, :size_average_B)
        set_prop!(G, data_id, :res_data, DataObject(nothing, size))
    end

    Dagger.spawn_datadeps() do
        for vertex_id in setdiff(sorted_vertices, data_vertices)
            res = schedule_algorithm!(G, vertex_id)
            set_prop!(G, vertex_id, :res_data, res)
        end
    end
end

Having the algorithms as immutable callable structs might be useful later when we revise the pipeline scheduling

set_prop!(G, vertex_id, :res_data, res)
end
end
end

function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel, graph_name::String, graph_id::Int)
final_vertices = []
inc_e_src_map = get_ine_map(G)

for vertex_id in MetaGraphs.topological_sort(G)
incoming_data = get_deps_promises(vertex_id, inc_e_src_map, G)
set_prop!(G, vertex_id, :res_data, Dagger.@spawn AVAILABLE_TRANSFORMS[get_prop(G, vertex_id, :type)](graph_name, graph_id, vertex_id, incoming_data...))
end
schedule_graph(G)

out_e_src_map = get_oute_map(G)
for vertex_id in MetaGraphs.vertices(G)
Expand All @@ -125,5 +168,3 @@ function schedule_graph_with_notify(G::MetaDiGraph, notifications::RemoteChannel

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, get_vertices_promises(final_vertices, G)...)
end

AVAILABLE_TRANSFORMS = Dict{String, Function}("Algorithm" => mock_Gaudi_algorithm, "DataObject" => dataobject_algorithm)
Loading