Skip to content

Commit

Permalink
Basic runner script using new run_events function (#29)
Browse files Browse the repository at this point in the history
* Removed `parse_graphs` from scheduling code

* Removed unnecessary `enumerate`

* Added function for running events from a graph

* Added basic runner script

* Moved runner script to `bin`

* Updated README

New information for runner script

* Move main call into if statement

Co-authored-by: Mateusz Jakub Fila <[email protected]>

* Make arguments non-positional

Co-authored-by: Mateusz Jakub Fila <[email protected]>

* add logging with LocalEventLog and :graphviz_simple

---------

Co-authored-by: Mateusz Jakub Fila <[email protected]>
Co-authored-by: Mateusz Jakub Fila <[email protected]>
  • Loading branch information
3 people authored Aug 7, 2024
1 parent 2f9bb77 commit f72b4c7
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 31 deletions.
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ authors = ["SmalRat <[email protected]>",
version = "0.1.0"

[deps]
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
Cairo = "159f3aea-2a34-519c-b102-8c37f9878175"
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ julia --project -e "import Pkg; Pkg.instantiate()"

## Usage

Run an example:
See options for running with an example data flow graph in `data/`:

```
julia --project examples/schedule.jl
julia --project bin/schedule.jl --help
```

or use with REPL:
Expand Down
63 changes: 63 additions & 0 deletions bin/schedule.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env julia

using Distributed
using Dagger
using ArgParse
using FrameworkDemo

function parse_args()
s = ArgParseSettings()

@add_arg_table! s begin
"data-flow"
help = "Input data-flow graph as a GraphML file"
arg_type = String
required = true

"--event-count"
help = "Number of events to be processed"
arg_type = Int
default = 1

"--max-concurrent"
help = "Number of slots for graphs to be scheduled concurrently"
arg_type = Int
default = 3

"--dot-trace"
help = "Output graphviz dot file for execution logs graph"
arg_type = String
end

return ArgParse.parse_args(s)
end

function main()
args = parse_args()

event_count = args["event-count"]
max_concurrent = args["max-concurrent"]

if !isnothing(args["dot-trace"])
@info "Enabled logging"
FrameworkDemo.configure_LocalEventLog()
end

graph = FrameworkDemo.parse_graphml(args["data-flow"])
FrameworkDemo.run_events(graph, event_count, max_concurrent)

if !isnothing(args["dot-trace"])
logs = Dagger.fetch_logs!()
open(args["dot-trace"], "w") do io
FrameworkDemo.ModGraphVizSimple.show_logs(io, logs, :graphviz_simple)
@info "Written logs dot graph to $(args["dot-trace"])"
end
end
end


if abspath(PROGRAM_FILE) == @__FILE__
main()
rmprocs!(Dagger.Sch.eager_context(), workers())
rmprocs(workers())
end
4 changes: 2 additions & 2 deletions src/parsing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ function parse_graphml(filename::String)::MetaDiGraph
end

function show_graph(G)
for (_, v) in enumerate(Graphs.vertices(G))
for v in Graphs.vertices(G)
println("Node: ")
print("Node type: ")
println(get_prop(G, v, :type))
Expand All @@ -29,4 +29,4 @@ function show_graph(G)
println(get_prop(G, v, :node_id))
println()
end
end
end
51 changes: 24 additions & 27 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,10 @@ function (alg::MockupAlgorithm)(args...; coefficients::Vector{Float64})
return alg.name
end

function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...)
println("Graph: $graph_name, entered notify, graph_id: $graph_id !")
println("Graph: $graph_name, all tasks in the graph finished, graph_id: $graph_id !")
function notify_graph_finalization(notifications::RemoteChannel, graph_id::Int, terminating_results...)
println("Graph $graph_id: all tasks in the graph finished!")
put!(notifications, graph_id)
println("Graph: $graph_name, notified, graph_id: $graph_id !")
end

function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_image_path::String)
graphs = []
for (graph_name, graph_path) in graphs_map
parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot"
parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png"
G = parse_graphml(graph_path)

open(parsed_graph_dot, "w") do f
MetaGraphs.savedot(f, G)
end
dot_to_png(parsed_graph_dot, parsed_graph_image)
push!(graphs, (graph_name, G))
end
return graphs
println("Graph $graph_id: notified!")
end

function get_promises(graph::MetaDiGraph, vertices::Vector)
Expand Down Expand Up @@ -87,12 +70,26 @@ function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard)
return terminating_results
end

function schedule_graph_with_notify(graph::MetaDiGraph,
notifications::RemoteChannel,
graph_name::String,
graph_id::Int,
coefficients::Dagger.Shard)
terminating_results = schedule_graph(graph, coefficients)
function run_events(graph::MetaDiGraph,
event_count::Int,
max_concurrent::Int)

graphs_tasks = Dict{Int,Dagger.DTask}()
notifications = RemoteChannel(()->Channel{Int}(32))
coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients()

for idx in 1:event_count
while length(graphs_tasks) >= max_concurrent
finished_graph_id = take!(notifications)
delete!(graphs_tasks, finished_graph_id)
println("Dispatcher: graph finished - graph $finished_graph_id")
end

terminating_results = FrameworkDemo.schedule_graph(graph, coefficients)
graphs_tasks[idx] = Dagger.@spawn notify_graph_finalization(notifications, idx, terminating_results...)

println("Dispatcher: scheduled graph $idx")
end

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...)
values(graphs_tasks) .|> wait
end

0 comments on commit f72b4c7

Please sign in to comment.