Skip to content

Commit

Permalink
CPU crunching for more realistic algorithm (#24)
Browse files Browse the repository at this point in the history
* Prime finding function

Function to find the nth prime number with helpers for worker
calibration

* Added algorithm calibration and crunching

Calibration coefficient is stored on each worker and used to crunch for
average runtime using prime number finder

* Updated test with calibration

* Made crunching runtime more predictable

It uses an more naive approach that is not dependent on the density of
primes

* Improved crunching calibration

Calibration now involves two parameters instead of one

* Renamed `algorithm.jl`
  • Loading branch information
joott authored Jul 17, 2024
1 parent 7934198 commit afcc5b3
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
4 changes: 3 additions & 1 deletion examples/schedule.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ function execution(graphs_map)
graphs = FrameworkDemo.parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH)
notifications = RemoteChannel(()->Channel{Int}(32))
# notifications = Channel{Int}(32)
coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients()

for (i, (g_name, g)) in enumerate(graphs)
graphs_dict[i] = g_name
while !(length(graphs_being_run) < MAX_GRAPHS_RUN)
Expand All @@ -39,7 +41,7 @@ function execution(graphs_map)
delete!(graphs_tasks, i)
println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])")
end
graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i)
graphs_tasks[i] = FrameworkDemo.schedule_graph_with_notify(g, notifications, g_name, i, coefficients)
push!(graphs_being_run, i)
println("Dispatcher: scheduled graph $i: $g_name")
end
Expand Down
3 changes: 2 additions & 1 deletion src/FrameworkDemo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ include("logging.jl")
include("parsing.jl")
include("scheduling.jl")
include("visualization.jl")
include("cpu_crunching.jl")

# to be removed
include("ModGraphVizSimple.jl")

end # FrameworkDemo
end # FrameworkDemo
43 changes: 43 additions & 0 deletions src/cpu_crunching.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# meant to be consistently inefficient for crunching purposes.
# just returns the largest prime less than `n_max`
function find_primes(n_max::Int)
primes = [2]

for n in 3:n_max
isPrime = true

for y in 2:n÷2
if n % y == 0
isPrime = false
break
end
end

if isPrime
push!(primes, n)
end
end

return primes[end]
end

function benchmark_prime(n::Int)
t0 = time()
find_primes(n)
Δt = time() - t0

return Δt
end

function calculate_coefficients()
n_max = [1000,200_000]
t_average = benchmark_prime.(n_max)

return inv([n_max[i]^j for i in 1:2, j in 1:2]) * t_average
end

function crunch_for_seconds(t::Float64, coefficients::Vector{Float64})
(b,a) = coefficients
n = ceil(Int, (-b + sqrt(abs(b^2 + 4a * t))) / 2a)
find_primes(n)
end
20 changes: 11 additions & 9 deletions src/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ end

alg_default_runtime_s::Float64 = 0

function (alg::MockupAlgorithm)(args...)
function (alg::MockupAlgorithm)(args...; coefficients::Vector{Float64})
println("Executing $(alg.name)")
crunch_for_seconds(alg.runtime, coefficients)

return alg.name
end
Expand Down Expand Up @@ -61,20 +62,20 @@ function is_terminating_alg(graph::AbstractGraph, vertex_id::Int)
all(is_terminating, successor_dataobjects)
end

function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int)
function schedule_algorithm(graph::MetaDiGraph, vertex_id::Int, coefficients::Dagger.Shard)
incoming_data = get_promises(graph, inneighbors(graph, vertex_id))
algorithm = MockupAlgorithm(graph, vertex_id)
Dagger.@spawn algorithm(incoming_data...)
Dagger.@spawn algorithm(incoming_data...; coefficients)
end

function schedule_graph(graph::MetaDiGraph)
function schedule_graph(graph::MetaDiGraph, coefficients::Dagger.Shard)
alg_vertices = MetaGraphs.filter_vertices(graph, :type, "Algorithm")
sorted_vertices = MetaGraphs.topological_sort(graph)

terminating_results = []

for vertex_id in intersect(sorted_vertices, alg_vertices)
res = schedule_algorithm(graph, vertex_id)
res = schedule_algorithm(graph, vertex_id, coefficients)
set_prop!(graph, vertex_id, :res_data, res)
for v in outneighbors(graph, vertex_id)
set_prop!(graph, v, :res_data, res)
Expand All @@ -87,10 +88,11 @@ function schedule_graph(graph::MetaDiGraph)
end

function schedule_graph_with_notify(graph::MetaDiGraph,
notifications::RemoteChannel,
graph_name::String,
graph_id::Int)
terminating_results = schedule_graph(graph)
notifications::RemoteChannel,
graph_name::String,
graph_id::Int,
coefficients::Dagger.Shard)
terminating_results = schedule_graph(graph, coefficients)

Dagger.@spawn notify_graph_finalization(notifications, graph_name, graph_id, terminating_results...)
end
3 changes: 2 additions & 1 deletion test/scheduling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ end
ilength(x) = sum(_ -> 1, x) # no standard length for MetaGraphs.filter_vertices iterator
algorithms_count = ilength(MetaGraphs.filter_vertices(graph, :type, "Algorithm"))
set_indexing_prop!(graph, :node_id)
coefficients = Dagger.@shard FrameworkDemo.calculate_coefficients()

Dagger.enable_logging!(tasknames=true, taskdeps=true)
_ = Dagger.fetch_logs!() # flush logs

tasks = FrameworkDemo.schedule_graph(graph)
tasks = FrameworkDemo.schedule_graph(graph, coefficients)
wait.(tasks)

logs = Dagger.fetch_logs!()
Expand Down

0 comments on commit afcc5b3

Please sign in to comment.