Skip to content

Commit

Permalink
Fix small issue in generate when c_param is set to default. Improve d…
Browse files Browse the repository at this point in the history
…ataflow generation computation time for acyclic graph. Rename set_multi_graph into set_multi_arc in parameters for logical reason
  • Loading branch information
YouenL committed Feb 25, 2016
1 parent 86fa919 commit e78fb33
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Turbine/generation/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def generate(dataflow_name="generated_graph", c_param=None):
graphName : the name of the dataflow (default : generated_graph).
c_param : parameters of the generation.
"""
logging.basicConfig(level=c_param.get_logging_level())
if c_param is None:
c_param = Parameters()
logging.basicConfig(level=c_param.get_logging_level())

start = time()
logging.info("Generating graph")
Expand Down
63 changes: 36 additions & 27 deletions Turbine/generation/graph_computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ def generate_dataflow(dataflow_name, c_param):
if c_param.get_dataflow_type() == "PCG":
dataflow = PCG(dataflow_name)
if c_param.is_acyclic():
task_rank, task_degree = __generate_connex_dag(dataflow, c_param) # Generate a connected acyclic graph
__generate_arcs_dag(dataflow, c_param, task_rank, task_degree) # Add arcs such as the graph stay acyclic
# Generate a connected acyclic graph
task_rank, task_degree, task_to_rm = __generate_connex_dag(dataflow, c_param)
# Add arcs such as the graph stay acyclic
__generate_arcs_dag(dataflow, c_param, task_rank, task_degree, task_to_rm)
else:
task_degree = __generate_connex_graph(dataflow, c_param) # Generate simple connected graph
__generate_arcs(dataflow, c_param, task_degree) # Add arcs
Expand Down Expand Up @@ -144,14 +146,21 @@ def __generate_connex_dag(dataflow, c_param):

task_degree_non_null = set([])
task_rank = {}
task_to_rm = {}

# Create a path
task = dataflow.add_task()
task_to_rm[task] = [task]

task_degree_non_null.add(task)
__try_add_emergency_task(task, potential_emergency_task, emergency_task)
task_rank[task] = 0
for i in xrange(1, path_nodes_nb):
next_task = dataflow.add_task()
task_to_rm[next_task] = [next_task]

if dataflow.get_task_count() > 1000 and dataflow.get_task_count() % 1000 == 0:
logging.info(str(dataflow.get_task_count()) + "/" + str(c_param.get_nb_task()) + " tasks generate.")
task_degree_non_null.add(next_task)

__try_add_emergency_task(next_task, potential_emergency_task, emergency_task)
Expand All @@ -171,23 +180,35 @@ def __generate_connex_dag(dataflow, c_param):
for i in xrange(c_param.get_nb_task() - path_nodes_nb):
path_rank = randint(0, path_nodes_nb - 1)
task = dataflow.add_task()
task_to_rm[task] = [task]

if dataflow.get_task_count() > 1000 and dataflow.get_task_count() % 1000 == 0:
logging.info(str(dataflow.get_task_count()) + "/" + str(c_param.get_nb_task()) + " tasks generate.")
task_rank[task] = path_rank
arc_added, random_task = __add_random_dag_arc(dataflow, task_degree_non_null, task, task_rank)
if not arc_added:
arc_added, random_task = __add_random_dag_arc(dataflow, emergency_task, task, task_rank)
if not arc_added:
raise Exception("No emergency task, report it to the dev !")
emergency_task.remove(random_task)

if task_rank[task] == task_rank[random_task]:
task_to_rm[task].append(random_task)
task_to_rm[random_task].append(task)
elif not c_param.is_multi_arc():
task_to_rm[task].append(random_task)
task_to_rm[random_task].append(task)

task_degree_non_null.add(task)
__try_add_emergency_task(task, potential_emergency_task, emergency_task)
for t in [task, random_task]:
task_degree[t] -= 1
if task_degree[t] <= 0:
task_degree_non_null.discard(t)
return task_rank, task_degree
return task_rank, task_degree, task_to_rm


def __generate_arcs_dag(dataflow, c_param, task_rank, task_degree):
def __generate_arcs_dag(dataflow, c_param, task_rank, task_degree, task_to_rm):
logging.info("Generate more arcs")
nb_tot_arcs = dataflow.get_task_count() - 1
arc_count = dataflow.get_task_count() - 1
Expand All @@ -201,42 +222,30 @@ def __generate_arcs_dag(dataflow, c_param, task_rank, task_degree):
# Make the set: task with non null degree
for task in task_degree_non_null.copy(): # Start iterate on task to add arc
if task_degree[task] > 0:
task_degree_non_null.remove(task)
task_to_rm = [] # Temporary ignored task because of non reentrant or non multi-arc param
if not c_param.is_multi_arc():
for taskp in task_degree_non_null:
if task_rank[task] > task_rank[taskp]:
if dataflow.get_arc_list(source=taskp, target=task):
task_to_rm.append(taskp)
elif task_rank[task] < task_rank[taskp]:
if dataflow.get_arc_list(source=task, target=taskp):
task_to_rm.append(taskp)
else:
if task > taskp and dataflow.get_arc_list(source=task, target=taskp):
task_to_rm.append(taskp)
if task < taskp and dataflow.get_arc_list(source=taskp, target=task):
task_to_rm.append(taskp)

for rm_task in task_to_rm:
task_degree_non_null.remove(rm_task)
for rm_task in task_to_rm[task]:
task_degree_non_null.discard(rm_task)

# Add arc on potential task
for i in xrange(task_degree[task]):
if task_degree_non_null:
random_task = __add_random_dag_arc(dataflow, task_degree_non_null, task, task_rank)[1]
if task_rank[task] == task_rank[random_task]:
task_to_rm[task].append(random_task)
task_to_rm[random_task].append(task)
elif not c_param.is_multi_arc():
task_to_rm[task].append(random_task)
task_to_rm[random_task].append(task)

task_degree[random_task] -= 1
if task_degree[random_task] <= 0:
task_degree_non_null.remove(random_task)
elif not c_param.is_multi_arc():
task_degree_non_null.discard(random_task)
task_to_rm.append(random_task)

arc_count += 1
if dataflow.get_task_count() >= 1000 and arc_count % 1000 == 0:
logging.info(str(arc_count) + "/" + str(nb_tot_arcs) + " arcs added.")
logging.info(str(arc_count) + "/" + str(nb_tot_arcs) + " ac arcs added.")

# Put back task temporally removed
for task_rm in task_to_rm:
for task_rm in task_to_rm[task]:
task_degree_non_null.add(task_rm)
# Remove the task handle this iteration
task_degree[task] = 0
Expand Down
2 changes: 1 addition & 1 deletion Turbine/param/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def set_reentrant(self, value):
"""
self.__REENTRANT = bool(value)

def set_multi_graph(self, value):
def set_multi_arc(self, value):
"""
:type value: bool
Expand Down

0 comments on commit e78fb33

Please sign in to comment.