Skip to content

Commit

Permalink
Add a helper function to switch between nfs and http interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlu committed May 9, 2024
1 parent 7a35ab2 commit 20b716c
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions dags/igneous_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@ def create_igneous_ops(param, dag):
if nfs_conn:
extra_args = nfs_conn.extra_dejson
sql_url = f"mysql://root:igneous@{extra_args['hostname']}"
nfs_kwargs = {"frag_path": f"file:///share/{run_name}", "sql_url": sql_url}
queue = "nfs"
else:
nfs_kwargs = {"frag_path": None, "sql_url": None}
extra_args = None
sql_url = None
queue = "manager"


def nfs_kwargs(io_mode):
if not nfs_conn:
return {"frag_path": None, "sql_url": None}
if io_mode == "write":
return {"frag_path": f"file:///share/{db_name(param['NAME'], 'segmentation')}", "sql_url": sql_url}
elif io_mode == "read":
return {"frag_path": f"http://{extra_args['hostname']}/share/{db_name(param['NAME'], 'segmentation')}", "sql_url": sql_url}
else:
return {"frag_path": None, "sql_url": None}


if not param.get("SKIP_DOWNSAMPLE", False):
if not param.get("SKIP_MESHING", False):
current_op = PythonOperator(
Expand All @@ -44,7 +55,7 @@ def create_igneous_ops(param, dag):
task_id="mesh",
python_callable=mesh,
op_args=[run_name, seg_cloudpath, param.get("MESH_QUALITY", "NORMAL"), param.get("SHARDED_MESH", True), ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="write"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand Down Expand Up @@ -72,7 +83,7 @@ def create_igneous_ops(param, dag):
task_id="merge_mesh_fragments",
python_callable=merge_mesh_fragments,
op_args=[run_name, seg_cloudpath, param.get("SHARDED_MESH_WORKER_CONCURRENCY", None)],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="read"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand All @@ -83,7 +94,7 @@ def create_igneous_ops(param, dag):
task_id="mesh_manifest",
python_callable=mesh_manifest,
op_args=[run_name, seg_cloudpath, param["BBOX"], param["CHUNK_SIZE"], ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="read"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand Down Expand Up @@ -116,7 +127,7 @@ def create_igneous_ops(param, dag):
task_id="skeleton_fragment",
python_callable=create_skeleton_fragments,
op_args=[run_name, seg_cloudpath, param.get("TEASAR_PARAMS", {'scale': 10, 'const': 10}), ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="write"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand All @@ -143,7 +154,7 @@ def create_igneous_ops(param, dag):
task_id="merge_skeleton",
python_callable=merge_skeleton_fragments,
op_args=[run_name, seg_cloudpath, ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="read"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand Down

0 comments on commit 20b716c

Please sign in to comment.