From f72b4c72524e5d92229b0dd8c9857d8bc9c1b865 Mon Sep 17 00:00:00 2001 From: Josh Ott Date: Wed, 7 Aug 2024 14:43:03 +0200 Subject: [PATCH] Basic runner script using new `run_events` function (#29) * Removed `parse_graphs` from scheduling code * Removed unnecessary `enumerate` * Added function for running events from a graph * Added basic runner script * Moved runner script to `bin` * Updated README New information for runner script * Move main call into if statement Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com> * Make arguments non-positional Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com> * add logging with LocalEventLog and :graphviz_simple --------- Co-authored-by: Mateusz Jakub Fila <37295697+m-fila@users.noreply.github.com> Co-authored-by: Mateusz Jakub Fila --- Project.toml | 1 + README.md | 4 +-- bin/schedule.jl | 63 +++++++++++++++++++++++++++++++++++++++++++++++ src/parsing.jl | 4 +-- src/scheduling.jl | 51 ++++++++++++++++++-------------------- 5 files changed, 92 insertions(+), 31 deletions(-) create mode 100644 bin/schedule.jl diff --git a/Project.toml b/Project.toml index 6ac1830..dff4817 100644 --- a/Project.toml +++ b/Project.toml @@ -7,6 +7,7 @@ authors = ["SmalRat <30555080+SmalRat@users.noreply.github.com>", version = "0.1.0" [deps] +ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63" BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" Cairo = "159f3aea-2a34-519c-b102-8c37f9878175" Colors = "5ae59095-9a9b-59fe-a467-6f913c188581" diff --git a/README.md b/README.md index 78cc429..eae022f 100644 --- a/README.md +++ b/README.md @@ -18,10 +18,10 @@ julia --project -e "import Pkg; Pkg.instantiate()" ## Usage -Run an example: +See options for running with an example data flow graph in `data/`: ``` -julia --project examples/schedule.jl +julia --project bin/schedule.jl --help ``` or use with REPL: diff --git a/bin/schedule.jl b/bin/schedule.jl new file mode 100644 index 0000000..20a3795 --- /dev/null +++ b/bin/schedule.jl @@ -0,0 +1,63 @@ +#!/usr/bin/env julia + +using Distributed +using Dagger +using ArgParse +using FrameworkDemo + +function parse_args() + s = ArgParseSettings() + + @add_arg_table! s begin + "data-flow" + help = "Input data-flow graph as a GraphML file" + arg_type = String + required = true + + "--event-count" + help = "Number of events to be processed" + arg_type = Int + default = 1 + + "--max-concurrent" + help = "Number of slots for graphs to be scheduled concurrently" + arg_type = Int + default = 3 + + "--dot-trace" + help = "Output graphviz dot file for execution logs graph" + arg_type = String + end + + return ArgParse.parse_args(s) +end + +function main() + args = parse_args() + + event_count = args["event-count"] + max_concurrent = args["max-concurrent"] + + if !isnothing(args["dot-trace"]) + @info "Enabled logging" + FrameworkDemo.configure_LocalEventLog() + end + + graph = FrameworkDemo.parse_graphml(args["data-flow"]) + FrameworkDemo.run_events(graph, event_count, max_concurrent) + + if !isnothing(args["dot-trace"]) + logs = Dagger.fetch_logs!() + open(args["dot-trace"], "w") do io + FrameworkDemo.ModGraphVizSimple.show_logs(io, logs, :graphviz_simple) + @info "Written logs dot graph to $(args["dot-trace"])" + end + end +end + + +if abspath(PROGRAM_FILE) == @__FILE__ + main() + rmprocs!(Dagger.Sch.eager_context(), workers()) + rmprocs(workers()) +end diff --git a/src/parsing.jl b/src/parsing.jl index 8ebfbb3..5ad89de 100644 --- a/src/parsing.jl +++ b/src/parsing.jl @@ -7,7 +7,7 @@ function parse_graphml(filename::String)::MetaDiGraph end function show_graph(G) - for (_, v) in enumerate(Graphs.vertices(G)) + for v in Graphs.vertices(G) println("Node: ") print("Node type: ") println(get_prop(G, v, :type)) @@ -29,4 +29,4 @@ function show_graph(G) println(get_prop(G, v, :node_id)) println() end -end \ No newline at end of file +end diff --git a/src/scheduling.jl b/src/scheduling.jl index ad24a41..72bfdd2 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -29,27 +29,10 @@ function (alg::MockupAlgorithm)(args...; coefficients::Vector{Float64}) return alg.name end -function notify_graph_finalization(notifications::RemoteChannel, graph_name::String, graph_id::Int, final_vertices_promises...) - println("Graph: $graph_name, entered notify, graph_id: $graph_id !") - println("Graph: $graph_name, all tasks in the graph finished, graph_id: $graph_id !") +function notify_graph_finalization(notifications::RemoteChannel, graph_id::Int, terminating_results...) + println("Graph $graph_id: all tasks in the graph finished!") put!(notifications, graph_id) - println("Graph: $graph_name, notified, graph_id: $graph_id !") -end - -function parse_graphs(graphs_map::Dict, output_graph_path::String, output_graph_image_path::String) - graphs = [] - for (graph_name, graph_path) in graphs_map - parsed_graph_dot = timestamp_string("$output_graph_path$graph_name") * ".dot" - parsed_graph_image = timestamp_string("$output_graph_image_path$graph_name") * ".png" - G = parse_graphml(graph_path) - - open(parsed_graph_dot, "w") do f - MetaGraphs.savedot(f, G) - end - dot_to_png(parsed_graph_dot, parsed_graph_image) - push!(graphs, (graph_name, G)) - end - return graphs + println("Graph $graph_id: notified!") end function get_promises(graph::MetaDiGraph, vertices::Vector) @@ -87,12 +70,26 @@ function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard) return terminating_results end -function schedule_graph_with_notify(graph::MetaDiGraph, - notifications::RemoteChannel, - graph_name::String, - graph_id::Int, - coefficients::Dagger.Shard) - terminating_results = schedule_graph(graph, coefficients) +function run_events(graph::MetaDiGraph, + event_count::Int, + max_concurrent::Int) + + graphs_tasks = Dict{Int,Dagger.DTask}() + notifications = RemoteChannel(()->Channel{Int}(32)) + coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients() + + for idx in 1:event_count + while length(graphs_tasks) >= max_concurrent + finished_graph_id = take!(notifications) + delete!(graphs_tasks, finished_graph_id) + println("Dispatcher: graph finished - graph $finished_graph_id") + end + + terminating_results = FrameworkDemo.schedule_graph(graph, coefficients) + graphs_tasks[idx] = Dagger.@spawn notify_graph_finalization(notifications, idx, terminating_results...) + + println("Dispatcher: scheduled graph $idx") + end - Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...) + values(graphs_tasks) .|> wait end