From 7f7bda7fc008b1fac7ceb639f8ede52f218afa6c Mon Sep 17 00:00:00 2001 From: Mateusz Jakub Fila Date: Wed, 14 Aug 2024 09:22:52 +0200 Subject: [PATCH] fix formatting --- benchmark/benchmarks.jl | 4 +- benchmark/suite/cpu_crunching.jl | 36 ++--- bin/schedule.jl | 15 +- examples/dummy_tasks.jl | 1 - .../logging/getting_logs/enable_logging.jl | 11 +- .../viz_enable_logging_modernAPI.jl | 13 +- .../viz_enable_logging_oldAPI.jl | 13 +- .../viz_raw_enable_logging_oldAPI.jl | 7 +- .../log_sink/viz_modernAPI_localeventlog.jl | 4 +- .../log_sink/viz_modernAPI_multieventlog.jl | 7 +- .../log_sink/viz_oldAPI_localeventlog.jl | 4 +- .../log_sink/viz_oldAPI_multieventlog.jl | 7 +- .../render_logs/render_graphviz_complex.jl | 37 ++--- .../render_logs/render_graphviz_simple.jl | 12 +- .../render_logs/render_plots_gantt.jl | 31 ++-- .../render_logs/render_plots_gantt_ps.jl | 31 ++-- .../webdash.jl | 8 +- examples/parsing_graphs.jl | 2 +- examples/processors_example.jl | 6 +- examples/schedule.jl | 22 ++- src/ModGraphVizSimple.jl | 138 ++++++++++-------- src/cpu_crunching.jl | 6 +- src/logging.jl | 14 +- src/scheduling.jl | 40 ++--- src/visualization.jl | 2 +- test/demo_workflows.jl | 6 +- test/parsing.jl | 2 +- test/runtests.jl | 16 +- test/scheduling.jl | 13 +- tools/gaudi_alg_exec_dist.jl | 24 +-- 30 files changed, 282 insertions(+), 250 deletions(-) diff --git a/benchmark/benchmarks.jl b/benchmark/benchmarks.jl index 151753d..8938462 100644 --- a/benchmark/benchmarks.jl +++ b/benchmark/benchmarks.jl @@ -9,9 +9,9 @@ include("suite/cpu_crunching.jl") if abspath(PROGRAM_FILE) == @__FILE__ @info "tuning benchmark suite" - tune!(SUITE, verbose=true) + tune!(SUITE, verbose = true) @info "running benchmark suite" - results = run(SUITE, verbose=true) + results = run(SUITE, verbose = true) @info "running benchmark suite" results for processor in result_processors processor(results) diff --git a/benchmark/suite/cpu_crunching.jl b/benchmark/suite/cpu_crunching.jl index 45b40cf..b28e704 100644 --- a/benchmark/suite/cpu_crunching.jl +++ b/benchmark/suite/cpu_crunching.jl @@ -3,15 +3,16 @@ import Printf SUITE["cpu_crunching"] = BenchmarkGroup(["cpu_crunching"]) SUITE["cpu_crunching"]["find_primes"] = BenchmarkGroup(["find_primes"]) -for i in exp10.(range(0, stop=6, length=10)) +for i in exp10.(range(0, stop = 6, length = 10)) n = ceil(Int, i) - SUITE["cpu_crunching"]["find_primes"][n] = @benchmarkable FrameworkDemo.find_primes($n) evals = 1 samples = 1 + SUITE["cpu_crunching"]["find_primes"][n] = @benchmarkable FrameworkDemo.find_primes($n) evals=1 samples=1 end SUITE["cpu_crunching"]["crunch_for_seconds"] = BenchmarkGroup(["crunch_for_seconds"]) coef = FrameworkDemo.calculate_coefficients() -for i in exp10.(range(-6, stop=1.5, length=10)) - SUITE["cpu_crunching"]["crunch_for_seconds"][i] = @benchmarkable FrameworkDemo.crunch_for_seconds($i, $coef) evals = 1 samples = 1 +for i in exp10.(range(-6, stop = 1.5, length = 10)) + SUITE["cpu_crunching"]["crunch_for_seconds"][i] = @benchmarkable FrameworkDemo.crunch_for_seconds($i, + $coef) evals=1 samples=1 end function log_ticks(range) @@ -21,14 +22,14 @@ function log_ticks(range) end function plot_find_primes(results::BenchmarkGroup) - primes_r = sort(collect(results["cpu_crunching"]["find_primes"]), by=first) + primes_r = sort(collect(results["cpu_crunching"]["find_primes"]), by = first) x = first.(primes_r) y = primes_r .|> last .|> minimum .|> time |> x -> x * 1e-9 - p = plot(x, y, xaxis=:log10, yaxis=:log10, xlabel="n", ylabel="time [s]", - title="find_primes(n)", label="find_primes", - marker=(:circle, 5), linewidth=3, - xticks=log_ticks(x), yticks=log_ticks(y), - xguidefonthalign=:right, yguidefontvalign=:top, legend=:topleft) + p = plot(x, y, xaxis = :log10, yaxis = :log10, xlabel = "n", ylabel = "time [s]", + title = "find_primes(n)", label = "find_primes", + marker = (:circle, 5), linewidth = 3, + xticks = log_ticks(x), yticks = log_ticks(y), + xguidefonthalign = :right, yguidefontvalign = :top, legend = :topleft) filename = "bench_find_primes.png" savefig(p, filename) @info "Results of benchmark cpu_crunching/find_primes written to $filename" @@ -37,15 +38,16 @@ end push!(result_processors, plot_find_primes) function plot_crunch_for_seconds(results::BenchmarkGroup) - crunch_r = sort(collect(results["cpu_crunching"]["crunch_for_seconds"]), by=first) + crunch_r = sort(collect(results["cpu_crunching"]["crunch_for_seconds"]), by = first) x = first.(crunch_r) y = crunch_r .|> last .|> minimum .|> time |> x -> x * 1e-9 - p = plot(x, (y - x) ./ x, xaxis=:log10, xlabel="t [s]", ylabel="Time relative error", - yformatter=x -> Printf.@sprintf("%.1f%%", 100 * x), - xticks=log_ticks(x), - title="crunch_for_seconds(t)", label="crunch_for_seconds", - marker=(:circle, 5), linewidth=3, - xguidefonthalign=:right, yguidefontvalign=:top, legend=:bottomright) + p = plot(x, (y - x) ./ x, xaxis = :log10, xlabel = "t [s]", + ylabel = "Time relative error", + yformatter = x -> Printf.@sprintf("%.1f%%", 100*x), + xticks = log_ticks(x), + title = "crunch_for_seconds(t)", label = "crunch_for_seconds", + marker = (:circle, 5), linewidth = 3, + xguidefonthalign = :right, yguidefontvalign = :top, legend = :bottomright) filename = "bench_crunch_for_seconds.png" savefig(p, filename) @info "Results of benchmark cpu_crunching/crunch_for_seconds written to $filename" diff --git a/bin/schedule.jl b/bin/schedule.jl index d5b8d01..1d41f85 100644 --- a/bin/schedule.jl +++ b/bin/schedule.jl @@ -31,7 +31,6 @@ function parse_args() "--fast" help = "Execute algorithms immediately skipping algorithm runtime information and crunching" action = :store_true - end return ArgParse.parse_args(s) @@ -46,15 +45,14 @@ function main() end graph = FrameworkDemo.parse_graphml(args["data-flow"]) - event_count=args["event-count"] - max_concurrent=args["max-concurrent"] - fast=args["fast"] + event_count = args["event-count"] + max_concurrent = args["max-concurrent"] + fast = args["fast"] @time "Pipeline execution" FrameworkDemo.run_events(graph; - event_count=event_count, - max_concurrent=max_concurrent, - fast=fast - ) + event_count = event_count, + max_concurrent = max_concurrent, + fast = fast) if !isnothing(args["dot-trace"]) logs = Dagger.fetch_logs!() @@ -65,7 +63,6 @@ function main() end end - if abspath(PROGRAM_FILE) == @__FILE__ main() rmprocs!(Dagger.Sch.eager_context(), workers()) diff --git a/examples/dummy_tasks.jl b/examples/dummy_tasks.jl index 0ce87a3..0177526 100644 --- a/examples/dummy_tasks.jl +++ b/examples/dummy_tasks.jl @@ -81,4 +81,3 @@ function modernAPI_graph_setup(time_to_sleep) return f end - diff --git a/examples/logging/getting_logs/enable_logging.jl b/examples/logging/getting_logs/enable_logging.jl index 6b02518..b2b507a 100644 --- a/examples/logging/getting_logs/enable_logging.jl +++ b/examples/logging/getting_logs/enable_logging.jl @@ -1,14 +1,13 @@ include("../../dummy_tasks.jl") # here Dagger is imported as well -Dagger.enable_logging!(tasknames=true, # Here we can choose the consumers to use. Check source code for more. -taskdeps=true, -taskargs=true, -taskargmoves=true, -) +Dagger.enable_logging!(tasknames = true, # Here we can choose the consumers to use. Check source code for more. + taskdeps = true, + taskargs = true, + taskargmoves = true) t = modernAPI_graph_setup(1) # Can also be the old API fetch(t) logs = Dagger.fetch_logs!() println("\n\nRaw logs:") -println(logs) # <- That should be a Dictionary of the form: {worker_id -> {consumer_symbol -> [list of the events on that worker after being processed by a consumer corresponding to this symbol]}} \ No newline at end of file +println(logs) # <- That should be a Dictionary of the form: {worker_id -> {consumer_symbol -> [list of the events on that worker after being processed by a consumer corresponding to this symbol]}} diff --git a/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_modernAPI.jl b/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_modernAPI.jl index 07b7e1c..13d1b0b 100644 --- a/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_modernAPI.jl +++ b/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_modernAPI.jl @@ -21,11 +21,10 @@ end function configure_MultiEventLog() Dagger.enable_logging!(timeline = true, # Some example configuration - tasknames=true, - taskdeps=true, - taskargs=true, - taskargmoves=true, - ) + tasknames = true, + taskdeps = true, + taskargs = true, + taskargmoves = true) Dagger.Sch.eager_context().log_sink[:full] = Dagger.TimespanLogging.Events.FullMetrics() end @@ -49,4 +48,6 @@ open(log_file_name, "w") do io # FrameworkDemo.Dagger.show_logs(graph_thunk, logs, :graphviz_simple) # Returns the string representation of the graph end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, 2000) \ No newline at end of file +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, + 2000) diff --git a/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_oldAPI.jl b/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_oldAPI.jl index 7f00567..2f368d3 100644 --- a/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_oldAPI.jl +++ b/examples/logging/processing_and_visualizing_logs/enable_logging/viz_enable_logging_oldAPI.jl @@ -23,11 +23,10 @@ end function configure_MultiEventLog() Dagger.enable_logging!(timeline = true, # Some example configuration - tasknames=true, - taskdeps=true, - taskargs=true, - taskargmoves=true, - ) + tasknames = true, + taskdeps = true, + taskargs = true, + taskargmoves = true) Dagger.Sch.eager_context().log_sink[:full] = Dagger.TimespanLogging.Events.FullMetrics() end @@ -51,4 +50,6 @@ open(log_file_name, "w") do io # FrameworkDemo.Dagger.show_logs(graph_thunk, logs, :graphviz_simple) # Returns the string representation of the graph end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, 2000) +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, + 2000) diff --git a/examples/logging/processing_and_visualizing_logs/enable_logging/viz_raw_enable_logging_oldAPI.jl b/examples/logging/processing_and_visualizing_logs/enable_logging/viz_raw_enable_logging_oldAPI.jl index 3e018af..5c58081 100644 --- a/examples/logging/processing_and_visualizing_logs/enable_logging/viz_raw_enable_logging_oldAPI.jl +++ b/examples/logging/processing_and_visualizing_logs/enable_logging/viz_raw_enable_logging_oldAPI.jl @@ -20,7 +20,10 @@ println(collect(ctx, graph_thunk)) # Wait for the graph execution and fetch the log_file_name = FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".dot" open(log_file_name, "w") do io #FrameworkDemo.ModGraphVizSimple.show_logs(io, graph_thunk, Dagger.fetch_logs!(), :graphviz_simple) - FrameworkDemo.ModGraphVizSimple.show_logs(graph_thunk, Dagger.fetch_logs!(), :graphviz_simple) + FrameworkDemo.ModGraphVizSimple.show_logs(graph_thunk, Dagger.fetch_logs!(), + :graphviz_simple) end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 700, 700) +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 700, + 700) diff --git a/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_localeventlog.jl b/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_localeventlog.jl index 208f5fc..068cfaf 100644 --- a/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_localeventlog.jl +++ b/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_localeventlog.jl @@ -24,4 +24,6 @@ open(log_file_name, "w") do io # or FrameworkDemo.ModGraphVizSimple.show_logs(io, graph_thunk, :graphviz_simple) # Dagger.show_logs(io, graph_thunk, :graphviz_simple) after the bug fix in the package end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, 2000) \ No newline at end of file +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, + 2000) diff --git a/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_multieventlog.jl b/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_multieventlog.jl index aa87c00..190d647 100644 --- a/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_multieventlog.jl +++ b/examples/logging/processing_and_visualizing_logs/log_sink/viz_modernAPI_multieventlog.jl @@ -40,9 +40,12 @@ spans = Dagger.TimespanLogging.build_timespans(vcat(values(events_logs)...)).com timespan_logs = convert(Vector{Dagger.TimespanLogging.Timespan}, spans) log_file_name = FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".dot" open(log_file_name, "w") do io - FrameworkDemo.ModGraphVizSimple.show_logs(io, graph_thunk, timespan_logs, :graphviz_simple) # Dagger.show_logs(io, graph_thunk, timespan_logs, :graphviz_simple) after the bug fix in the package + FrameworkDemo.ModGraphVizSimple.show_logs(io, graph_thunk, timespan_logs, + :graphviz_simple) # Dagger.show_logs(io, graph_thunk, timespan_logs, :graphviz_simple) after the bug fix in the package # or FrameworkDemo.ModGraphVizSimpleExt.show_logs(io, timespan_logs, :graphviz_simple) # Dagger.show_logs(io, timespan_logs, :graphviz_simple) after the bug fix in the package # or FrameworkDemo.ModGraphVizSimpleExt.show_logs(io, graph_thunk, :graphviz_simple) # Dagger.show_logs(io, graph_thunk, :graphviz_simple) after the bug fix in the package end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, 2000) \ No newline at end of file +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, + 2000) diff --git a/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_localeventlog.jl b/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_localeventlog.jl index 3b38e04..b9fdb7f 100644 --- a/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_localeventlog.jl +++ b/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_localeventlog.jl @@ -24,4 +24,6 @@ open(log_file_name, "w") do io # or ModGraphVizSimpleExt.show_logs(io, graph_thunk, :graphviz_simple) # Dagger.show_logs(io, graph_thunk, :graphviz_simple) after the bug fix in the package end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, 2000) \ No newline at end of file +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, + 2000) diff --git a/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_multieventlog.jl b/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_multieventlog.jl index 44b8a03..66f68f6 100644 --- a/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_multieventlog.jl +++ b/examples/logging/processing_and_visualizing_logs/log_sink/viz_oldAPI_multieventlog.jl @@ -40,9 +40,12 @@ spans = Dagger.TimespanLogging.build_timespans(vcat(values(events_logs)...)).com timespan_logs = convert(Vector{Dagger.TimespanLogging.Timespan}, spans) log_file_name = FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".dot" open(log_file_name, "w") do io - FrameworkDemo.ModGraphVizSimple.show_logs(io, graph_thunk, timespan_logs, :graphviz_simple) # Dagger.show_logs(io, graph_thunk, timespan_logs, :graphviz_simple) after the bug fix in the package + FrameworkDemo.ModGraphVizSimple.show_logs(io, graph_thunk, timespan_logs, + :graphviz_simple) # Dagger.show_logs(io, graph_thunk, timespan_logs, :graphviz_simple) after the bug fix in the package # or ModGraphVizSimpleExt.show_logs(io, timespan_logs, :graphviz_simple) # Dagger.show_logs(io, timespan_logs, :graphviz_simple) after the bug fix in the package # or ModGraphVizSimpleExt.show_logs(io, graph_thunk, :graphviz_simple) # Dagger.show_logs(io, graph_thunk, :graphviz_simple) after the bug fix in the package end -FrameworkDemo.dot_to_png(log_file_name, FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, 2000) \ No newline at end of file +FrameworkDemo.dot_to_png(log_file_name, + FrameworkDemo.timestamp_string(FILENAME_TEMPLATE) * ".png", 2000, + 2000) diff --git a/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_complex.jl b/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_complex.jl index 78a13fb..af761d7 100644 --- a/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_complex.jl +++ b/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_complex.jl @@ -26,9 +26,10 @@ MAX_GRAPHS_RUN = 3 function execution(graphs_map) graphs_being_run = Set{Int}() graphs_dict = Dict{Int, String}() - graphs_tasks = Dict{Int,Dagger.DTask}() - graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH) - notifications = RemoteChannel(()->Channel{Int}(32)) + graphs_tasks = Dict{Int, Dagger.DTask}() + graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, + OUTPUT_GRAPH_IMAGE_PATH) + notifications = RemoteChannel(() -> Channel{Int}(32)) # notifications = Channel{Int}(32) for (i, (g_name, g)) in enumerate(graphs) graphs_dict[i] = g_name @@ -38,7 +39,8 @@ function execution(graphs_map) delete!(graphs_tasks, i) println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])") end - graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i) + graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, + i) push!(graphs_being_run, i) println("Dispatcher: scheduled graph $i: $g_name") end @@ -64,29 +66,28 @@ function execution(graphs_map) end function main(graphs_map) - Dagger.enable_logging!(tasknames=true, - taskdeps=true, - taskargs=true, - taskargmoves=true, - ) + Dagger.enable_logging!(tasknames = true, + taskdeps = true, + taskargs = true, + taskargmoves = true) @time execution(graphs_map) - graph = Dagger.render_logs(Dagger.fetch_logs!(), :graphviz, disconnected=true, color_by=:proc) + graph = Dagger.render_logs(Dagger.fetch_logs!(), :graphviz, disconnected = true, + color_by = :proc) surface = Cairo.CairoSVGSurface(IOBuffer(), 7000, 2000) context = Cairo.CairoContext(surface) GraphViz.render(context, graph) - img_name = FrameworkDemo.timestamp_string("$output_dir/render_graphviz_complex") * ".png" + img_name = FrameworkDemo.timestamp_string("$output_dir/render_graphviz_complex") * + ".png" write_to_png(surface, img_name) end -graphs_map = Dict{String, String}( -"graph1" => graph1_path, -"graph2" => graph2_path, -"graph3" => graph1_path, -"graph4" => graph2_path -) +graphs_map = Dict{String, String}("graph1" => graph1_path, + "graph2" => graph2_path, + "graph3" => graph1_path, + "graph4" => graph2_path) main(graphs_map) rmprocs!(Dagger.Sch.eager_context(), workers()) -rmprocs(workers()) \ No newline at end of file +rmprocs(workers()) diff --git a/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_simple.jl b/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_simple.jl index 38d807c..bd3befd 100644 --- a/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_simple.jl +++ b/examples/logging/processing_and_visualizing_logs/render_logs/render_graphviz_simple.jl @@ -10,18 +10,18 @@ include("../../../dummy_tasks.jl") output_dir = "examples/results" mkpath(output_dir) -Dagger.enable_logging!(tasknames=true, -taskdeps=true, -taskargs=true, -taskargmoves=true, -) +Dagger.enable_logging!(tasknames = true, + taskdeps = true, + taskargs = true, + taskargmoves = true) a = modernAPI_graph_setup(0.1) ctx = Dagger.Sch.eager_context() println(fetch(a)) -graph = Dagger.render_logs(Dagger.fetch_logs!(), :graphviz, disconnected=true, color_by=:proc) +graph = Dagger.render_logs(Dagger.fetch_logs!(), :graphviz, disconnected = true, + color_by = :proc) surface = Cairo.CairoSVGSurface(IOBuffer(), 7000, 2000) context = Cairo.CairoContext(surface) diff --git a/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt.jl b/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt.jl index da2fdaa..60aa285 100644 --- a/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt.jl +++ b/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt.jl @@ -28,9 +28,10 @@ MAX_GRAPHS_RUN = 3 function execution(graphs_map) graphs_being_run = Set{Int}() graphs_dict = Dict{Int, String}() - graphs_tasks = Dict{Int,Dagger.DTask}() - graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH) - notifications = RemoteChannel(()->Channel{Int}(32)) + graphs_tasks = Dict{Int, Dagger.DTask}() + graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, + OUTPUT_GRAPH_IMAGE_PATH) + notifications = RemoteChannel(() -> Channel{Int}(32)) # notifications = Channel{Int}(32) for (i, (g_name, g)) in enumerate(graphs) graphs_dict[i] = g_name @@ -40,7 +41,8 @@ function execution(graphs_map) delete!(graphs_tasks, i) println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])") end - graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i) + graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, + i) push!(graphs_being_run, i) println("Dispatcher: scheduled graph $i: $g_name") end @@ -66,11 +68,10 @@ function execution(graphs_map) end function main(graphs_map) - Dagger.enable_logging!(tasknames=true, - taskdeps=true, - taskargs=true, - taskargmoves=true, - ) + Dagger.enable_logging!(tasknames = true, + taskdeps = true, + taskargs = true, + taskargmoves = true) @time execution(graphs_map) @@ -78,13 +79,11 @@ function main(graphs_map) display(plot) end -graphs_map = Dict{String, String}( -"graph1" => graph1_path, -"graph2" => graph2_path, -"graph3" => graph1_path, -"graph4" => graph2_path -) +graphs_map = Dict{String, String}("graph1" => graph1_path, + "graph2" => graph2_path, + "graph3" => graph1_path, + "graph4" => graph2_path) main(graphs_map) rmprocs!(Dagger.Sch.eager_context(), workers()) -rmprocs(workers()) \ No newline at end of file +rmprocs(workers()) diff --git a/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt_ps.jl b/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt_ps.jl index e84d2b1..ccc6f63 100644 --- a/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt_ps.jl +++ b/examples/logging/processing_and_visualizing_logs/render_logs/render_plots_gantt_ps.jl @@ -28,9 +28,10 @@ MAX_GRAPHS_RUN = 3 function execution(graphs_map) graphs_being_run = Set{Int}() graphs_dict = Dict{Int, String}() - graphs_tasks = Dict{Int,Dagger.DTask}() - graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH) - notifications = RemoteChannel(()->Channel{Int}(32)) + graphs_tasks = Dict{Int, Dagger.DTask}() + graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, + OUTPUT_GRAPH_IMAGE_PATH) + notifications = RemoteChannel(() -> Channel{Int}(32)) # notifications = Channel{Int}(32) for (i, (g_name, g)) in enumerate(graphs) graphs_dict[i] = g_name @@ -40,7 +41,8 @@ function execution(graphs_map) delete!(graphs_tasks, i) println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])") end - graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i) + graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, + i) push!(graphs_being_run, i) println("Dispatcher: scheduled graph $i: $g_name") end @@ -66,11 +68,10 @@ function execution(graphs_map) end function main(graphs_map) - Dagger.enable_logging!(tasknames=true, - taskdeps=true, - taskargs=true, - taskargmoves=true, - ) + Dagger.enable_logging!(tasknames = true, + taskdeps = true, + taskargs = true, + taskargmoves = true) @time execution(graphs_map) @@ -78,13 +79,11 @@ function main(graphs_map) display(plot) end -graphs_map = Dict{String, String}( -"graph1" => graph1_path, -"graph2" => graph2_path, -"graph3" => graph1_path, -"graph4" => graph2_path -) +graphs_map = Dict{String, String}("graph1" => graph1_path, + "graph2" => graph2_path, + "graph3" => graph1_path, + "graph4" => graph2_path) main(graphs_map) rmprocs!(Dagger.Sch.eager_context(), workers()) -rmprocs(workers()) \ No newline at end of file +rmprocs(workers()) diff --git a/examples/logging/processing_and_visualizing_logs/webdash.jl b/examples/logging/processing_and_visualizing_logs/webdash.jl index 7793a18..be38044 100644 --- a/examples/logging/processing_and_visualizing_logs/webdash.jl +++ b/examples/logging/processing_and_visualizing_logs/webdash.jl @@ -15,7 +15,7 @@ ml[:profile] = DaggerWebDash.ProfileMetrics() ctx.profile = true # Create a LogWindow; necessary for real-time event updates -lw = TimespanLogging.Events.LogWindow(20*10^9, :core) +lw = TimespanLogging.Events.LogWindow(20 * 10^9, :core) ml.aggregators[:logwindow] = lw # Create the D3Renderer server on port 8080 @@ -24,11 +24,13 @@ d3r = DaggerWebDash.D3Renderer(8080) ## Add some plots! Rendered top-down in order # Show an overview of all generated events as a Gantt chart -push!(d3r, DaggerWebDash.GanttPlot(:core, :id, :esat, :psat; title="Overview")) +push!(d3r, DaggerWebDash.GanttPlot(:core, :id, :esat, :psat; title = "Overview")) # Show various numerical events as line plots over time push!(d3r, DaggerWebDash.LinePlot(:core, :wsat, "Worker Saturation", "Running Tasks")) -push!(d3r, DaggerWebDash.LinePlot(:core, :loadavg, "CPU Load Average", "Average Running Threads")) +push!(d3r, + DaggerWebDash.LinePlot(:core, :loadavg, "CPU Load Average", + "Average Running Threads")) push!(d3r, DaggerWebDash.LinePlot(:core, :bytes, "Allocated Bytes", "Bytes")) push!(d3r, DaggerWebDash.LinePlot(:core, :mem, "Available Memory", "% Free")) diff --git a/examples/parsing_graphs.jl b/examples/parsing_graphs.jl index 7efce40..3459ede 100644 --- a/examples/parsing_graphs.jl +++ b/examples/parsing_graphs.jl @@ -10,4 +10,4 @@ end if abspath(PROGRAM_FILE) == @__FILE__ main() -end \ No newline at end of file +end diff --git a/examples/processors_example.jl b/examples/processors_example.jl index 4d33320..ea29576 100644 --- a/examples/processors_example.jl +++ b/examples/processors_example.jl @@ -2,7 +2,7 @@ using Distributed -ps1 = addprocs(2, exeflags="--project") +ps1 = addprocs(2, exeflags = "--project") @everywhere using Distributed, Dagger # Dummy task to wait for 0.5 seconds and then return the id of the worker @@ -13,7 +13,7 @@ ctx = Context() job = @async collect(ctx, ts) # Lets fire up some new workers -ps2 = addprocs(2, exeflags="--project") +ps2 = addprocs(2, exeflags = "--project") @everywhere ps2 using Distributed, Dagger # New workers are not available until we do this addprocs!(ctx, ps2) @@ -22,4 +22,4 @@ addprocs!(ctx, ps2) @show fetch(job) |> unique # and cleanup after ourselves... -workers() |> rmprocs \ No newline at end of file +workers() |> rmprocs diff --git a/examples/schedule.jl b/examples/schedule.jl index 6fc1f53..3387af6 100644 --- a/examples/schedule.jl +++ b/examples/schedule.jl @@ -10,7 +10,6 @@ using MetaGraphs using FrameworkDemo using FrameworkDemo.ModGraphVizSimple # This is a workaround to make visualization work until the bugs are fixed in the package. - # Defining constants output_dir = "results" graph1_path = "./data/demo/sequencer/df.graphml" @@ -27,9 +26,10 @@ MAX_GRAPHS_RUN = 3 function execution(graphs_map) graphs_being_run = Set{Int}() graphs_dict = Dict{Int, String}() - graphs_tasks = Dict{Int,Dagger.DTask}() - graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH) - notifications = RemoteChannel(()->Channel{Int}(32)) + graphs_tasks = Dict{Int, Dagger.DTask}() + graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, + OUTPUT_GRAPH_IMAGE_PATH) + notifications = RemoteChannel(() -> Channel{Int}(32)) # notifications = Channel{Int}(32) coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients() @@ -41,7 +41,8 @@ function execution(graphs_map) delete!(graphs_tasks, i) println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])") end - graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i, coefficients) + graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, + i, coefficients) push!(graphs_being_run, i) println("Dispatcher: scheduled graph $i: $g_name") end @@ -76,15 +77,12 @@ function main(graphs_map) FrameworkDemo.ModGraphVizSimple.show_logs(io, logs, :graphviz_simple) end FrameworkDemo.dot_to_png(LOGS_FILE, GRAPH_IMAGE_PATH, 7000, 8000) # adjust picture size, if needed (optional param) - end -graphs_map = Dict{String, String}( -"graph1" => graph1_path, -"graph2" => graph2_path, -"graph3" => graph1_path, -"graph4" => graph2_path -) +graphs_map = Dict{String, String}("graph1" => graph1_path, + "graph2" => graph2_path, + "graph3" => graph1_path, + "graph4" => graph2_path) if abspath(PROGRAM_FILE) == @__FILE__ mkpath(output_dir) diff --git a/src/ModGraphVizSimple.jl b/src/ModGraphVizSimple.jl index 2a495cb..d6ebd04 100644 --- a/src/ModGraphVizSimple.jl +++ b/src/ModGraphVizSimple.jl @@ -20,9 +20,9 @@ import Dagger.TimespanLogging: Timespan global _part_labels = Dict() -function write_node(io, t::Chunk, c, ctx=nothing) - _part_labels[t]="part_$c" - c+1 +function write_node(io, t::Chunk, c, ctx = nothing) + _part_labels[t] = "part_$c" + c + 1 end function node_id(t::Thunk) @@ -55,7 +55,7 @@ end # Modified version of the function from Dagger compute.jl function custom_dependents(node::Thunk) - deps = Dict{Union{Thunk,Chunk}, Set{Thunk}}() + deps = Dict{Union{Thunk, Chunk}, Set{Thunk}}() visited = Set{Thunk}() to_visit = Set{Thunk}() push!(to_visit, node) @@ -72,7 +72,7 @@ function custom_dependents(node::Thunk) end inp = unwrapped if istask(inp) || (inp isa Chunk) - s = get!(()->Set{Thunk}(), deps, inp) + s = get!(() -> Set{Thunk}(), deps, inp) push!(s, next) if istask(inp) && !(inp in visited) push!(to_visit, inp) @@ -86,7 +86,7 @@ end # Writing DAG using DTask involves unwrapping WeakRefs, which might return `nothing` if garbage collected, # so the part of the DAG is not displayed. This is an unstable behavior, so disabled by default. -function write_dag(io, e::DTask, stable::Bool=true) +function write_dag(io, e::DTask, stable::Bool = true) if (stable) throw(ArgumentError("Writing DAG for DTask is not supported by default. Use the logs instead.")) else @@ -100,7 +100,7 @@ function write_dag(io, t::Thunk) # Chunk/Thunk nodes deps = custom_dependents(t) - c=1 + c = 1 for k in keys(deps) c = write_node(io, k, c) end @@ -113,11 +113,11 @@ function write_dag(io, t::Thunk) end # Argument nodes (not Chunks/Thunks) - argmap = Dict{Int,Vector}() + argmap = Dict{Int, Vector}() getargs!(argmap, t) - argids = IdDict{Any,String}() + argids = IdDict{Any, String}() for id in keys(argmap) - for (argidx,arg) in argmap[id] + for (argidx, arg) in argmap[id] name = "arg_$(argidx)_to_$(id)" if !isimmutable(arg) if arg in keys(argids) @@ -137,9 +137,9 @@ end ### Timespan-based graphing -pretty_time(ts::Timespan) = pretty_time(ts.finish-ts.start) +pretty_time(ts::Timespan) = pretty_time(ts.finish - ts.start) function pretty_time(t) - r(t) = round(t; digits=3) + r(t) = round(t; digits = 3) if t > 1000^3 "$(r(t/(1000^3))) s" elseif t > 1000^2 @@ -165,66 +165,74 @@ function pretty_size(sz) end node_label(x) = repr(x) -node_label(x::T) where T<:AbstractArray = +function node_label(x::T) where {T <: AbstractArray} "$T\nShape: $(size(x))\nSize: $(pretty_size(sizeof(x)))" +end node_label(x::Chunk) = "Chunk on $(x.processor)" node_proc(x) = nothing node_proc(x::Chunk) = x.processor -_proc_color(ctx, proc::Processor) = get!(ctx.proc_to_color, proc) do - _color = ctx.proc_colors[ctx.proc_color_idx[]] - ctx.proc_color_idx[] = clamp(ctx.proc_color_idx[]+1, 0, 128) - "#$(Colors.hex(_color))" +function _proc_color(ctx, proc::Processor) + get!(ctx.proc_to_color, proc) do + _color = ctx.proc_colors[ctx.proc_color_idx[]] + ctx.proc_color_idx[] = clamp(ctx.proc_color_idx[] + 1, 0, 128) + "#$(Colors.hex(_color))" + end end _proc_color(ctx, id::Int) = _proc_color(ctx, ctx.id_to_proc[id]) _proc_color(ctx, ::Nothing) = "black" -_proc_shape(ctx, proc::Processor) = get!(ctx.proc_to_shape, typeof(proc)) do - _shape = ctx.proc_shapes[ctx.proc_shape_idx[]] - ctx.proc_shape_idx[] = clamp(ctx.proc_shape_idx[]+1, 0, length(ctx.proc_shapes)) - _shape +function _proc_shape(ctx, proc::Processor) + get!(ctx.proc_to_shape, typeof(proc)) do + _shape = ctx.proc_shapes[ctx.proc_shape_idx[]] + ctx.proc_shape_idx[] = clamp(ctx.proc_shape_idx[] + 1, 0, length(ctx.proc_shapes)) + _shape + end end _proc_shape(ctx, ::Nothing) = "ellipse" -function write_node(io, t::Thunk, c, ctx=nothing) +function write_node(io, t::Thunk, c, ctx = nothing) f = isa(t.f, Function) ? "$(t.f)" : "fn" println(io, "$(node_name(t)) [label=\"$f - $(t.id)\"];") c end dec(x) = Base.dec(x, 0, false) -function write_node(io, t, c, ctx, id=dec(hash(t))) - l = replace(node_label(t), "\""=>"") +function write_node(io, t, c, ctx, id = dec(hash(t))) + l = replace(node_label(t), "\"" => "") proc = node_proc(t) color = _proc_color(ctx, proc) shape = _proc_shape(ctx, proc) - println(io, "$(node_name(id)) [label=\"$l\",color=\"$color\",shape=\"$shape\",penwidth=5];") + println(io, + "$(node_name(id)) [label=\"$l\",color=\"$color\",shape=\"$shape\",penwidth=5];") c end function write_node(io, t, c, name::String) - l = replace(node_label(t), "\""=>"") + l = replace(node_label(t), "\"" => "") println(io, "$(node_name(name)) [label=\"$l\"];") c end function write_node(io, ts::Timespan, c, ctx) - (;thunk_id, processor) = ts.id - (;f) = ts.timeline + (; thunk_id, processor) = ts.id + (; f) = ts.timeline f = isa(f, Function) ? "$f" : "fn" t_comp = pretty_time(ts) color = _proc_color(ctx, processor) shape = _proc_shape(ctx, processor) # TODO: t_log = log(ts.finish - ts.start) / 5 ctx.id_to_proc[thunk_id] = processor - println(io, "$(node_name(thunk_id)) [label=\"$f\n$t_comp\",color=\"$color\",shape=\"$shape\",penwidth=5];") + println(io, + "$(node_name(thunk_id)) [label=\"$f\n$t_comp\",color=\"$color\",shape=\"$shape\",penwidth=5];") # TODO: "\n Thunk $(ts.id)\nResult Type: $res_type\nResult Size: $sz_comp\", c end -function write_edge(io, ts_move::Timespan, logs, ctx, inputname=nothing, inputarg=nothing) - (;thunk_id, id) = ts_move.id - (;f,) = ts_move.timeline +function write_edge(io, ts_move::Timespan, logs, ctx, inputname = nothing, + inputarg = nothing) + (; thunk_id, id) = ts_move.id + (; f,) = ts_move.timeline t_move = pretty_time(ts_move) if id > 0 print(io, "$(node_name(id)) -> $(node_name(thunk_id)) [label=\"Move: $t_move") @@ -232,7 +240,8 @@ function write_edge(io, ts_move::Timespan, logs, ctx, inputname=nothing, inputar else @assert inputname !== nothing @assert inputarg !== nothing - print(io, "$(node_name(inputname)) -> $(node_name(thunk_id)) [label=\"Move: $t_move") + print(io, + "$(node_name(inputname)) -> $(node_name(thunk_id)) [label=\"Move: $t_move") proc = node_proc(inputarg) color_src = _proc_color(ctx, proc) end @@ -241,8 +250,12 @@ function write_edge(io, ts_move::Timespan, logs, ctx, inputname=nothing, inputar println(io, "\",color=\"$color_src;0.5:$color_dst\",penwidth=2];") end -write_edge(io, from::String, to::String, ctx=nothing) = println(io, "$(node_name(from)) -> $(node_name(to));") -write_edge(io, from::String, to::Int, ctx=nothing) = println(io, "$(node_name(from)) -> $(node_name(to));") +function write_edge(io, from::String, to::String, ctx = nothing) + println(io, "$(node_name(from)) -> $(node_name(to));") +end +function write_edge(io, from::String, to::Int, ctx = nothing) + println(io, "$(node_name(from)) -> $(node_name(to));") +end convert_to_thunk(t::Thunk) = t convert_to_thunk(t::DTask) = Dagger.Sch._find_thunk(t) @@ -251,8 +264,8 @@ getargs!(d, t) = nothing function getargs!(d, t::Thunk) raw_inputs = map(last, t.inputs) - d[t.id] = [filter(x->!istask(x[2]), collect(enumerate(raw_inputs)))...,] - foreach(i->getargs!(d, i), raw_inputs) + d[t.id] = [filter(x -> !istask(x[2]), collect(enumerate(raw_inputs)))...] + foreach(i -> getargs!(d, i), raw_inputs) end function getargs!(d, e::DTask) @@ -260,30 +273,30 @@ function getargs!(d, e::DTask) end # DTask is not used in the current implementation, as it would be unstable, and the logs provide all the necessary information -function write_dag(io, logs::Vector, t::Union{Thunk, DTask, Nothing}=nothing) - ctx = (proc_to_color = Dict{Processor,String}(), +function write_dag(io, logs::Vector, t::Union{Thunk, DTask, Nothing} = nothing) + ctx = (proc_to_color = Dict{Processor, String}(), proc_colors = Colors.distinguishable_colors(128), proc_color_idx = Ref{Int}(1), - proc_to_shape = Dict{Type,String}(), - proc_shapes = ("ellipse","box","triangle"), + proc_to_shape = Dict{Type, String}(), + proc_shapes = ("ellipse", "box", "triangle"), proc_shape_idx = Ref{Int}(1), - id_to_proc = Dict{Int,Processor}()) + id_to_proc = Dict{Int, Processor}()) c = 1 # Compute nodes - for ts in filter(x->x.category==:compute, logs) + for ts in filter(x -> x.category == :compute, logs) c = write_node(io, ts, c, ctx) end # Argument nodes & edges - argmap = Dict{Int,Vector}() - argids = IdDict{Any,String}() + argmap = Dict{Int, Vector}() + argids = IdDict{Any, String}() if (isa(t, Thunk)) # Then can get info from the Thunk getargs!(argmap, t) - argnodemap = Dict{Int,Vector{String}}() + argnodemap = Dict{Int, Vector{String}}() for id in keys(argmap) nodes = String[] arg_c = 1 - for (argidx,arg) in argmap[id] + for (argidx, arg) in argmap[id] name = "arg_$(argidx)_to_$(id)" if !isimmutable(arg) if arg in keys(argids) @@ -298,9 +311,9 @@ function write_dag(io, logs::Vector, t::Union{Thunk, DTask, Nothing}=nothing) push!(nodes, name) end # Arg-to-compute edges - for ts in filter(x->x.category==:move && - x.id.thunk_id==id && - x.id.id==-argidx, logs) + for ts in filter(x -> x.category == :move && + x.id.thunk_id == id && + x.id.id == -argidx, logs) write_edge(io, ts, logs, ctx, name, arg) end arg_c += 1 @@ -308,8 +321,8 @@ function write_dag(io, logs::Vector, t::Union{Thunk, DTask, Nothing}=nothing) argnodemap[id] = nodes end else # Rely on the logs only - for ts in filter(x->x.category==:move && x.id.id < 0, logs) - (;thunk_id, id) = ts.id + for ts in filter(x -> x.category == :move && x.id.id < 0, logs) + (; thunk_id, id) = ts.id arg = ts.timeline[2] name = "arg_$(-id)_to_$(thunk_id)" if !isimmutable(arg) @@ -322,14 +335,14 @@ function write_dag(io, logs::Vector, t::Union{Thunk, DTask, Nothing}=nothing) else c = write_node(io, arg, c, ctx, name) end - + # Arg-to-compute edges write_edge(io, ts, logs, ctx, name, arg) end end - + # Move edges - for ts in filter(x->x.category==:move && x.id.id>0, logs) + for ts in filter(x -> x.category == :move && x.id.id > 0, logs) write_edge(io, ts, logs, ctx) end #= FIXME: Legend (currently it's laid out horizontally) @@ -357,20 +370,25 @@ function _show_plan(io::IO, t) write_dag(io, t) println(io, "}") end -function _show_plan(io::IO, t::Union{Thunk,DTask}, logs::Vector{Timespan}) +function _show_plan(io::IO, t::Union{Thunk, DTask}, logs::Vector{Timespan}) println(io, """strict digraph { graph [layout=dot,rankdir=LR];""") write_dag(io, logs, t) println(io, "}") end -show_logs(io::IO, logs, vizmode::Symbol; options...) = +function show_logs(io::IO, logs, vizmode::Symbol; options...) show_logs(io, logs, Val{vizmode}(); options...) -show_logs(io::IO, t, logs, vizmode::Symbol; options...) = +end +function show_logs(io::IO, t, logs, vizmode::Symbol; options...) show_logs(io, t, logs, Val{vizmode}(); options...) +end -show_logs(io::IO, t::Union{Thunk,DTask}, ::Val{:graphviz_simple}) = _show_plan(io, t) +show_logs(io::IO, t::Union{Thunk, DTask}, ::Val{:graphviz_simple}) = _show_plan(io, t) show_logs(io::IO, logs::Vector{Timespan}, ::Val{:graphviz_simple}) = _show_plan(io, logs) -show_logs(io::IO, t::Union{Thunk,DTask}, logs::Vector{Timespan}, ::Val{:graphviz_simple}) = _show_plan(io, t, logs) +function show_logs(io::IO, t::Union{Thunk, DTask}, logs::Vector{Timespan}, + ::Val{:graphviz_simple}) + _show_plan(io, t, logs) +end end diff --git a/src/cpu_crunching.jl b/src/cpu_crunching.jl index fd5c663..33898eb 100644 --- a/src/cpu_crunching.jl +++ b/src/cpu_crunching.jl @@ -6,7 +6,7 @@ function find_primes(n_max::Int) for n in 3:n_max isPrime = true - for y in 2:n÷2 + for y in 2:(n ÷ 2) if n % y == 0 isPrime = false break @@ -30,14 +30,14 @@ function benchmark_prime(n::Int) end function calculate_coefficients()::Vector{Float64} - n_max = [1000,200_000] + n_max = [1000, 200_000] t_average = benchmark_prime.(n_max) return inv([n_max[i]^j for i in 1:2, j in 1:2]) * t_average end function crunch_for_seconds(t::Float64, coefficients::Vector{Float64}) - (b,a) = coefficients + (b, a) = coefficients n = ceil(Int, (-b + sqrt(abs(b^2 + 4a * t))) / 2a) find_primes(n) end diff --git a/src/logging.jl b/src/logging.jl index e55589d..bb59a55 100644 --- a/src/logging.jl +++ b/src/logging.jl @@ -20,7 +20,7 @@ function configure_webdash_multievent() ctx.profile = true # Create a LogWindow; necessary for real-time event updates - lw = TimespanLogging.Events.LogWindow(20*10^9, :core) + lw = TimespanLogging.Events.LogWindow(20 * 10^9, :core) ml.aggregators[:logwindow] = lw # Create the D3Renderer server on port 8080 @@ -29,11 +29,13 @@ function configure_webdash_multievent() ## Add some plots! Rendered top-down in order # Show an overview of all generated events as a Gantt chart - push!(d3r, DaggerWebDash.GanttPlot(:core, :id, :esat, :psat; title="Overview")) + push!(d3r, DaggerWebDash.GanttPlot(:core, :id, :esat, :psat; title = "Overview")) # Show various numerical events as line plots over time push!(d3r, DaggerWebDash.LinePlot(:core, :wsat, "Worker Saturation", "Running Tasks")) - push!(d3r, DaggerWebDash.LinePlot(:core, :loadavg, "CPU Load Average", "Average Running Threads")) + push!(d3r, + DaggerWebDash.LinePlot(:core, :loadavg, "CPU Load Average", + "Average Running Threads")) push!(d3r, DaggerWebDash.LinePlot(:core, :bytes, "Allocated Bytes", "Bytes")) push!(d3r, DaggerWebDash.LinePlot(:core, :mem, "Available Memory", "% Free")) @@ -70,10 +72,10 @@ function fetch_LocalEventLog() ctx = Dagger.Sch.eager_context() logs = Dagger.TimespanLogging.get_logs!(ctx.log_sink) # str = Dagger.show_plan() - doesn't work (exist) - return logs + return logs end -function my_show_plan(io::IO, logs::Vector{Dagger.TimespanLogging.Timespan}, t=nothing) +function my_show_plan(io::IO, logs::Vector{Dagger.TimespanLogging.Timespan}, t = nothing) println(io, """strict digraph { graph [layout=dot,rankdir=LR];""") ModGraphVizSimple.write_dag(io, t, logs) @@ -96,4 +98,4 @@ function save_logs(log_file, logs) open(log_file, "w") do io write(io, logs) end -end \ No newline at end of file +end diff --git a/src/scheduling.jl b/src/scheduling.jl index 5cce408..5213b86 100644 --- a/src/scheduling.jl +++ b/src/scheduling.jl @@ -7,10 +7,10 @@ struct MockupAlgorithm name::String runtime::Float64 input_length::UInt - MockupAlgorithm(graph::MetaDiGraph, vertex_id::Int) = begin + function MockupAlgorithm(graph::MetaDiGraph, vertex_id::Int) name = get_prop(graph, vertex_id, :node_id) if has_prop(graph, vertex_id, :runtime_average_s) - runtime = get_prop(graph, vertex_id, :runtime_average_s) + runtime = get_prop(graph, vertex_id, :runtime_average_s) else runtime = alg_default_runtime_s @warn "Runtime not provided for $name algorithm. Using default value $runtime" @@ -22,7 +22,7 @@ end alg_default_runtime_s::Float64 = 0 -function (alg::MockupAlgorithm)(args...; coefficients::Union{Vector{Float64},Missing}) +function (alg::MockupAlgorithm)(args...; coefficients::Union{Vector{Float64}, Missing}) println("Executing $(alg.name)") if coefficients isa Vector{Float64} crunch_for_seconds(alg.runtime, coefficients) @@ -31,7 +31,8 @@ function (alg::MockupAlgorithm)(args...; coefficients::Union{Vector{Float64},Mis return alg.name end -function notify_graph_finalization(notifications::RemoteChannel, graph_id::Int, terminating_results...) +function notify_graph_finalization(notifications::RemoteChannel, graph_id::Int, + terminating_results...) println("Graph $graph_id: all tasks in the graph finished!") put!(notifications, graph_id) println("Graph $graph_id: notified!") @@ -47,18 +48,19 @@ function is_terminating_alg(graph::AbstractGraph, vertex_id::Int) all(is_terminating, successor_dataobjects) end -function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, coefficients::Union{Dagger.Shard,Nothing}) +function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, + coefficients::Union{Dagger.Shard, Nothing}) incoming_data = get_promises(graph, inneighbors(graph, vertex_id)) algorithm = MockupAlgorithm(graph, vertex_id) if isnothing(coefficients) - alg_helper(data...) = algorithm(data...; coefficients=missing) + alg_helper(data...) = algorithm(data...; coefficients = missing) return Dagger.@spawn alg_helper(incoming_data...) else - return Dagger.@spawn algorithm(incoming_data...; coefficients=coefficients) + return Dagger.@spawn algorithm(incoming_data...; coefficients = coefficients) end end -function schedule_graph(graph::MetaDiGraph, coefficients::Union{Dagger.Shard,Nothing}) +function schedule_graph(graph::MetaDiGraph, coefficients::Union{Dagger.Shard, Nothing}) alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm") sorted_vertices = MetaGraphs.topological_sort(graph) @@ -77,30 +79,28 @@ function schedule_graph(graph::MetaDiGraph, coefficients::Union{Dagger.Shard,Not return terminating_results end - - -function calibrate_crunch(; fast::Bool=false)::Union{Dagger.Shard,Nothing} +function calibrate_crunch(; fast::Bool = false)::Union{Dagger.Shard, Nothing} return fast ? nothing : Dagger.@shard calculate_coefficients() end function run_events(graph::MetaDiGraph; - event_count::Int, - max_concurrent::Int, - fast::Bool=false) - - graphs_tasks = Dict{Int,Dagger.DTask}() - notifications = RemoteChannel(()->Channel{Int}(32)) - coefficients = FrameworkDemo.calibrate_crunch(;fast=fast) + event_count::Int, + max_concurrent::Int, + fast::Bool = false) + graphs_tasks = Dict{Int, Dagger.DTask}() + notifications = RemoteChannel(() -> Channel{Int}(32)) + coefficients = FrameworkDemo.calibrate_crunch(; fast = fast) for idx in 1:event_count - while length(graphs_tasks) >= max_concurrent + while length(graphs_tasks) >= max_concurrent finished_graph_id = take!(notifications) delete!(graphs_tasks, finished_graph_id) println("Dispatcher: graph finished - graph $finished_graph_id") end terminating_results = FrameworkDemo.schedule_graph(graph, coefficients) - graphs_tasks[idx] = Dagger.@spawn notify_graph_finalization(notifications, idx, terminating_results...) + graphs_tasks[idx] = Dagger.@spawn notify_graph_finalization(notifications, idx, + terminating_results...) println("Dispatcher: scheduled graph $idx") end diff --git a/src/visualization.jl b/src/visualization.jl index 194806b..489f5e7 100644 --- a/src/visualization.jl +++ b/src/visualization.jl @@ -2,7 +2,7 @@ using GraphViz using FileIO using Cairo -function dot_to_png(in, out, width=7000, height=2000) +function dot_to_png(in, out, width = 7000, height = 2000) dot_code = read(in, String) graph = GraphViz.load(IOBuffer(dot_code)) GraphViz.layout!(graph) diff --git a/test/demo_workflows.jl b/test/demo_workflows.jl index a011349..73110bd 100644 --- a/test/demo_workflows.jl +++ b/test/demo_workflows.jl @@ -1,7 +1,7 @@ using FrameworkDemo using Dagger -function run_demo(name::String, coefficients::Union{Dagger.Shard,Nothing}) +function run_demo(name::String, coefficients::Union{Dagger.Shard, Nothing}) @testset "$name" begin println("Running $(name) workflow demo") path = joinpath(pkgdir(FrameworkDemo), "data/demo/$(name)/df.graphml") @@ -10,10 +10,10 @@ function run_demo(name::String, coefficients::Union{Dagger.Shard,Nothing}) end end -@testset verbose = true "Demo workflows" begin +@testset verbose=true "Demo workflows" begin Dagger.disable_logging!() is_fast = "no-fast" ∉ ARGS - coefficients = FrameworkDemo.calibrate_crunch(; fast=is_fast) + coefficients = FrameworkDemo.calibrate_crunch(; fast = is_fast) run(name) = run_demo(name, coefficients) run("sequential") run("sequential_terminated") diff --git a/test/parsing.jl b/test/parsing.jl index 6fcf0a8..2b6dcd9 100644 --- a/test/parsing.jl +++ b/test/parsing.jl @@ -40,4 +40,4 @@ using MetaGraphs @test has_edge(graph, graph["9", :original_id], graph["3", :original_id]) @test has_edge(graph, graph["10", :original_id], graph["4", :original_id]) @test has_edge(graph, graph["11", :original_id], graph["5", :original_id]) -end \ No newline at end of file +end diff --git a/test/runtests.jl b/test/runtests.jl index f0121fd..8daea21 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,16 +1,14 @@ using Distributed using Test -@info( - "Execution environment details", - julia_version = VERSION, - n_workers = Distributed.nworkers(), - n_procs = Distributed.nprocs(), - n_threads = Threads.nthreads(), - test_args = repr(ARGS) -) +@info("Execution environment details", + julia_version=VERSION, + n_workers=Distributed.nworkers(), + n_procs=Distributed.nprocs(), + n_threads=Threads.nthreads(), + test_args=repr(ARGS)) -@testset verbose = true "FrameworkDemo.jl" begin +@testset verbose=true "FrameworkDemo.jl" begin include("parsing.jl") include("scheduling.jl") include("demo_workflows.jl") diff --git a/test/scheduling.jl b/test/scheduling.jl index 223b134..f11c052 100644 --- a/test/scheduling.jl +++ b/test/scheduling.jl @@ -4,21 +4,21 @@ using Graphs using MetaGraphs function get_alg_timeline(logs::Dict) - timeline = Dict{Int,Any}() + timeline = Dict{Int, Any}() Dagger.logs_event_pairs(logs) do w, start_idx, finish_idx category = logs[w][:core][start_idx].category if category == :compute tid = logs[w][:id][start_idx].thunk_id t_start = logs[w][:core][start_idx].timestamp t_stop = logs[w][:core][finish_idx].timestamp - timeline[tid] = (start=t_start, stop=t_stop) + timeline[tid] = (start = t_start, stop = t_stop) end end return timeline end function get_alg_deps(logs::Dict) - task_deps = Dict{Int,Set{Int}}() + task_deps = Dict{Int, Set{Int}}() for w in keys(logs) for idx in 1:length(logs[w][:core]) category = logs[w][:core][idx].category @@ -34,17 +34,16 @@ function get_alg_deps(logs::Dict) return task_deps end - -@testset verbose = true "Scheduling" begin +@testset verbose=true "Scheduling" begin path = joinpath(pkgdir(FrameworkDemo), "data/demo/datadeps/df.graphml") graph = FrameworkDemo.parse_graphml(path) ilength(x) = sum(_ -> 1, x) # no standard length for MetaGraphs.filter_vertices iterator algorithms_count = ilength(MetaGraphs.filter_vertices(graph, :type, "Algorithm")) set_indexing_prop!(graph, :node_id) is_fast = "no-fast" ∉ ARGS - coefficients = FrameworkDemo.calibrate_crunch(;fast=is_fast) + coefficients = FrameworkDemo.calibrate_crunch(; fast = is_fast) - Dagger.enable_logging!(tasknames=true, taskdeps=true) + Dagger.enable_logging!(tasknames = true, taskdeps = true) _ = Dagger.fetch_logs!() # flush logs tasks = FrameworkDemo.schedule_graph(graph, coefficients) diff --git a/tools/gaudi_alg_exec_dist.jl b/tools/gaudi_alg_exec_dist.jl index 8484b65..bbc9146 100644 --- a/tools/gaudi_alg_exec_dist.jl +++ b/tools/gaudi_alg_exec_dist.jl @@ -12,11 +12,11 @@ using MetaGraphs include(joinpath(@__DIR__, "../deps/GraphMLReader.jl/src/GraphMLReader.jl")) function parse_args(args) - s = ArgParseSettings(description= - """ - Calculate distributions of Gaudi algorithm execution duration time - from a timeline extracted with Gaudi TimelineSvc or data-flow graph - """) + s = ArgParseSettings(description = + """ + Calculate distributions of Gaudi algorithm execution duration time + from a timeline extracted with Gaudi TimelineSvc or data-flow graph + """) @add_arg_table! s begin "input" @@ -42,7 +42,8 @@ end function durations_from_graphml(filename) graph = GraphMLReader.loadgraphml(filename, "G") algorithm_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm") - return [get_prop(graph, vertex, :runtime_average_s) for vertex in algorithm_vertices if has_prop(graph, vertex, :runtime_average_s)] + return [get_prop(graph, vertex, :runtime_average_s) + for vertex in algorithm_vertices if has_prop(graph, vertex, :runtime_average_s)] end function main(args) @@ -82,11 +83,14 @@ function main(args) end num_bins = sqrt(n) |> ceil |> Int - bin_edges = exp10.(range(log10(min_duration), stop=log10(max_duration), length=num_bins + 3)) + bin_edges = exp10.(range(log10(min_duration), stop = log10(max_duration), + length = num_bins + 3)) - histogram(durations; label="", bin=bin_edges, xscale=:log10, xlim=extrema(bin_edges), - title="Algorithm execution duration", xlabel="Duration (s)", ylabel="Counts", - xguidefonthalign=:right, yguidefontvalign=:top) + histogram(durations; label = "", bin = bin_edges, xscale = :log10, + xlim = extrema(bin_edges), + title = "Algorithm execution duration", xlabel = "Duration (s)", + ylabel = "Counts", + xguidefonthalign = :right, yguidefontvalign = :top) savefig(output_file) @info "Histogram saved to $output_file" end