Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic runner script using new run_events function #29

Merged
merged 9 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ authors = ["SmalRat <[email protected]>",
version = "0.1.0"

[deps]
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
Cairo = "159f3aea-2a34-519c-b102-8c37f9878175"
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ julia --project -e "import Pkg; Pkg.instantiate()"

## Usage

Run an example:
See options for running with an example data flow graph in `data/`:

```
julia --project examples/schedule.jl
julia --project bin/schedule.jl --help
```

or use with REPL:
Expand Down
63 changes: 63 additions & 0 deletions bin/schedule.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env julia

using Distributed
using Dagger
using ArgParse
using FrameworkDemo

function parse_args()
s = ArgParseSettings()

@add_arg_table! s begin
"data-flow"
help = "Input data-flow graph as a GraphML file"
arg_type = String
required = true

"--event-count"
help = "Number of events to be processed"
arg_type = Int
default = 1

"--max-concurrent"
help = "Number of slots for graphs to be scheduled concurrently"
arg_type = Int
default = 3

"--dot-trace"
help = "Output graphviz dot file for execution logs graph"
arg_type = String
end

return ArgParse.parse_args(s)
end

function main()
args = parse_args()

event_count = args["event-count"]
max_concurrent = args["max-concurrent"]

if !isnothing(args["dot-trace"])
@info "Enabled logging"
FrameworkDemo.configure_LocalEventLog()
end

graph = FrameworkDemo.parse_graphml(args["data-flow"])
FrameworkDemo.run_events(graph, event_count, max_concurrent)

if !isnothing(args["dot-trace"])
logs = Dagger.fetch_logs!()
open(args["dot-trace"], "w") do io
FrameworkDemo.ModGraphVizSimple.show_logs(io, logs, :graphviz_simple)
@info "Written logs dot graph to $(args["dot-trace"])"
end
end
end


if abspath(PROGRAM_FILE) == @__FILE__
main()
rmprocs!(Dagger.Sch.eager_context(), workers())
rmprocs(workers())
end
4 changes: 2 additions & 2 deletions src/parsing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ function parse_graphml(filename::String)::MetaDiGraph
end

function show_graph(G)
for (_, v) in enumerate(Graphs.vertices(G))
for v in Graphs.vertices(G)
println("Node: ")
print("Node type: ")
println(get_prop(G, v, :type))
Expand All @@ -29,4 +29,4 @@ function show_graph(G)
println(get_prop(G, v, :node_id))
println()
end
end
end
51 changes: 24 additions & 27 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,10 @@ function (alg::MockupAlgorithm)(args...; coefficients::Vector{Float64})
return alg.name
end

function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...)
println("Graph: $graph_name, entered notify, graph_id: $graph_id !")
println("Graph: $graph_name, all tasks in the graph finished, graph_id: $graph_id !")
function notify_graph_finalization(notifications::RemoteChannel, graph_id::Int, terminating_results...)
println("Graph $graph_id: all tasks in the graph finished!")
put!(notifications, graph_id)
println("Graph: $graph_name, notified, graph_id: $graph_id !")
end

function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_image_path::String)
graphs = []
for (graph_name, graph_path) in graphs_map
parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot"
parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png"
G = parse_graphml(graph_path)

open(parsed_graph_dot, "w") do f
MetaGraphs.savedot(f, G)
end
dot_to_png(parsed_graph_dot, parsed_graph_image)
push!(graphs, (graph_name, G))
end
return graphs
println("Graph $graph_id: notified!")
end

function get_promises(graph::MetaDiGraph, vertices::Vector)
Expand Down Expand Up @@ -87,12 +70,26 @@ function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard)
return terminating_results
end

function schedule_graph_with_notify(graph::MetaDiGraph,
notifications::RemoteChannel,
graph_name::String,
graph_id::Int,
coefficients::Dagger.Shard)
terminating_results = schedule_graph(graph, coefficients)
function run_events(graph::MetaDiGraph,
event_count::Int,
max_concurrent::Int)

graphs_tasks = Dict{Int,Dagger.DTask}()
notifications = RemoteChannel(()->Channel{Int}(32))
coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients()

for idx in 1:event_count
while length(graphs_tasks) >= max_concurrent
finished_graph_id = take!(notifications)
delete!(graphs_tasks, finished_graph_id)
println("Dispatcher: graph finished - graph $finished_graph_id")
end

terminating_results = FrameworkDemo.schedule_graph(graph, coefficients)
graphs_tasks[idx] = Dagger.@spawn notify_graph_finalization(notifications, idx, terminating_results...)

println("Dispatcher: scheduled graph $idx")
end

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...)
values(graphs_tasks) .|> wait
end