Skip to content

Commit

Permalink
Merge pull request #94 from seung-lab/nginx
Browse files Browse the repository at this point in the history
Serve igneous frag files through http interface

NFS has huge overhead when reading small pieces of frag files due to caching/prefetching; switch to an HTTP interface for reads to alleviate the problem. Uploading still goes through NFS.
  • Loading branch information
ranlu authored May 13, 2024
2 parents dece2e9 + 6b8acc5 commit d27d10d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
36 changes: 35 additions & 1 deletion cloud/google/nfs_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,35 @@ def GenerateNFSServerStartupScript(context, hostname_manager):

# Shell command for the memory-monitor ("OOM canary") process: the docker
# command prefix followed by utils/memory_monitor.py, which is passed the
# Celery broker URL environment variable and "bot-message-queue". All of its
# output is discarded (>& /dev/null).
# NOTE(review): GenerateDockerCommand is defined elsewhere; presumably it
# returns a `docker run ...` prefix for docker_image -- confirm.
oom_canary_cmd = GenerateDockerCommand(docker_image, docker_env) + ' ' + "python utils/memory_monitor.py ${AIRFLOW__CELERY__BROKER_URL} bot-message-queue >& /dev/null"
# Celery worker command bound to the "nfs" queue with a concurrency of 1.
# '-p 8793:8793' is appended to the docker options for this worker only.
# NOTE(review): 8793 is presumably the port the worker serves logs on -- confirm.
worker_cmd = GenerateCeleryWorkerCommand(docker_image, docker_env+['-p 8793:8793'], queue="nfs", concurrency=1)
# nginx configuration that the startup script writes to /etc/nginx/nginx.conf.
# It listens on port 80, serves the /share/ directory (the same directory that
# is exported over NFS) under the /share/ URL prefix, and exposes stub_status
# at /nginx_status. Access logging is sent to /dev/null.
#
# NOTE: this text is interpolated into an unquoted bash heredoc
# (cat << EOF ... EOF), so every nginx "$uri" has to reach bash as "\$uri" to
# keep the shell from expanding it. The backslash is spelled "\\" here because
# "\$" is not a valid Python escape sequence (it triggers a SyntaxWarning on
# Python 3.12+); the resulting string is byte-for-byte the same as before.
nginx_conf = '''worker_processes auto;
worker_rlimit_nofile 2048;
events {
worker_connections 1024;
use epoll;
}
http {
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
access_log /dev/null;
server {
listen 80;
root /var/www/html;
location / {
try_files \\$uri \\$uri/ =404;
}
location /share/ {
alias /share/;
}
location /nginx_status {
stub_status;
}
}
}
'''

startup_script = f'''
#!/bin/bash
Expand All @@ -30,21 +59,26 @@ def GenerateNFSServerStartupScript(context, hostname_manager):
mount /dev/sdb /share
chmod 777 /share
mkdir -p /share/mariadb
apt-get install nfs-kernel-server -y
apt-get install nfs-kernel-server nginx -y
echo "/share 172.31.0.0/16(insecure,rw,async,no_subtree_check)" >> /etc/exports
echo "ALL: 172.31.0.0/16" >> /etc/hosts.allow
systemctl start nfs-kernel-server.service
cat << EOF > /etc/nfs.conf.d/local.conf
[nfsd]
threads = 64
EOF
cat << EOF > /etc/nginx/nginx.conf
{nginx_conf}
EOF
systemctl restart nfs-kernel-server.service
touch /etc/bootstrap_done
sleep 300
shutdown -h now
fi
sysctl -w net.netfilter.nf_conntrack_max=$(awk '/MemAvailable/ {{print int($2/16)}}' /proc/meminfo)
echo $(awk '/MemAvailable/ {{print int($2/64)}}' /proc/meminfo) > /sys/module/nf_conntrack/parameters/hashsize
mount /dev/sdb /share
chmod 777 /share
systemctl restart nfs-kernel-server.service
Expand Down
25 changes: 18 additions & 7 deletions dags/igneous_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@ def create_igneous_ops(param, dag):
if nfs_conn:
extra_args = nfs_conn.extra_dejson
sql_url = f"mysql://root:igneous@{extra_args['hostname']}"
nfs_kwargs = {"frag_path": f"file:///share/{run_name}", "sql_url": sql_url}
queue = "nfs"
else:
nfs_kwargs = {"frag_path": None, "sql_url": None}
extra_args = None
sql_url = None
queue = "manager"


def nfs_kwargs(io_mode):
    """Build the ``frag_path`` / ``sql_url`` keyword arguments for an igneous task.

    Tasks that write fragments use the ``file:///share/...`` path, while tasks
    that read fragments use an ``http://<hostname>/share/...`` URL on the host
    named in the NFS connection. When no NFS connection is configured, or
    ``io_mode`` is neither ``"write"`` nor ``"read"``, both values are ``None``.
    """
    if not nfs_conn or io_mode not in ("write", "read"):
        return {"frag_path": None, "sql_url": None}

    # Only the URL prefix differs between the two modes; the per-run directory
    # name is the same for both.
    if io_mode == "write":
        share_root = "file:///share/"
    else:
        share_root = f"http://{extra_args['hostname']}/share/"

    frag_location = f"{share_root}{db_name(param['NAME'], 'segmentation')}"
    return {"frag_path": frag_location, "sql_url": sql_url}


if not param.get("SKIP_DOWNSAMPLE", False):
if not param.get("SKIP_MESHING", False):
current_op = PythonOperator(
Expand All @@ -44,7 +55,7 @@ def create_igneous_ops(param, dag):
task_id="mesh",
python_callable=mesh,
op_args=[run_name, seg_cloudpath, param.get("MESH_QUALITY", "NORMAL"), param.get("SHARDED_MESH", True), ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="write"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand Down Expand Up @@ -72,7 +83,7 @@ def create_igneous_ops(param, dag):
task_id="merge_mesh_fragments",
python_callable=merge_mesh_fragments,
op_args=[run_name, seg_cloudpath, param.get("SHARDED_MESH_WORKER_CONCURRENCY", None)],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="read"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand All @@ -83,7 +94,7 @@ def create_igneous_ops(param, dag):
task_id="mesh_manifest",
python_callable=mesh_manifest,
op_args=[run_name, seg_cloudpath, param["BBOX"], param["CHUNK_SIZE"], ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="read"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand Down Expand Up @@ -116,7 +127,7 @@ def create_igneous_ops(param, dag):
task_id="skeleton_fragment",
python_callable=create_skeleton_fragments,
op_args=[run_name, seg_cloudpath, param.get("TEASAR_PARAMS", {'scale': 10, 'const': 10}), ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="write"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand All @@ -143,7 +154,7 @@ def create_igneous_ops(param, dag):
task_id="merge_skeleton",
python_callable=merge_skeleton_fragments,
op_args=[run_name, seg_cloudpath, ],
op_kwargs=nfs_kwargs,
op_kwargs=nfs_kwargs(io_mode="read"),
on_retry_callback=task_retry_alert,
weight_rule=WeightRule.ABSOLUTE,
queue=queue,
Expand Down

0 comments on commit d27d10d

Please sign in to comment.