Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profile CLI run through dask #193

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion trollflow2/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Trollflow2 command line interface."""

import argparse
import contextlib
import json
import logging
from datetime import datetime
from queue import Queue

import dask.diagnostics
import yaml

from trollflow2.launcher import logging_on, process_files
Expand All @@ -30,6 +32,14 @@ def parse_args(args=None):
parser.add_argument("-c", "--log-config",
help="Log config file (yaml) to use",
type=str, required=False, default=None)
parser.add_argument("--dask-profiler",
help="Run dask profiler and visualize as bokeh plot, "
"write to file.",
type=str, required=False, default=None)
parser.add_argument("--dask-resource-profiler",
help="Run dask resource profiler with indicated timestep in seconds. "
"Requires --dask-profiler.",
type=float, required=False, default=None)
return parser.parse_args(args)


Expand All @@ -39,11 +49,20 @@ def cli(args=None):

log_config = _read_log_config(args)

with logging_on(log_config):
with contextlib.ExitStack() as stack:
stack.enter_context(logging_on(log_config))
logger.info("Starting Satpy.")
produced_files = Queue()
profs = []
if args.dask_profiler:
profs.append(stack.enter_context(dask.diagnostics.Profiler()))
if args.dask_resource_profiler:
profs.append(stack.enter_context(dask.diagnostics.ResourceProfiler(dt=args.dask_resource_profiler)))
process_files(args.files, json.loads(args.metadata, object_hook=datetime_decoder),
args.product_list, produced_files)
if args.dask_profiler:
dask.diagnostics.visualize(
profs, show=False, save=True, filename=args.dask_profiler)


def _read_log_config(args):
Expand Down
30 changes: 24 additions & 6 deletions trollflow2/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
workers: []
"""


yaml_test_load_save = """
product_list:
output_dir: &output_dir
Expand All @@ -63,6 +62,16 @@
"""


@pytest.fixture
def product_list_filename(tmp_path):
"""Filename for a test product list, with contents."""
product_list = "my_product_list.yaml"
filename = os.fspath(tmp_path / product_list)
with open(filename, "w") as fd:
fd.write(yaml_test_noop)
return filename


def test_arg_parsing():
"""Test parsing args."""
product_list = "my_product_list.yaml"
Expand Down Expand Up @@ -117,13 +126,9 @@ def test_cli_raises_an_error_when_product_list_is_empty(tmp_path, caplog, empty_
assert "check YAML file" in caplog.text


def test_cli_starts_processing_when_files_are_provided(tmp_path):
def test_cli_starts_processing_when_files_are_provided(tmp_path, product_list_filename):
"""Test that the cli start processing when files are provided."""
product_list = "my_product_list.yaml"
files = ["file1", "file2"]
product_list_filename = os.fspath(tmp_path / product_list)
with open(product_list_filename, "w") as fd:
fd.write(yaml_test_noop)
from trollflow2.launcher import process_files
new_process = mock.Mock(wraps=process_files)
mda = {"dish": "pizza"}
Expand All @@ -133,6 +138,19 @@ def test_cli_starts_processing_when_files_are_provided(tmp_path):
new_process.assert_called_once_with(files, mda, product_list_filename, q_mock.return_value)


def test_cli_dask_profiler(product_list_filename, tmp_path):
"""Test that dask profiles are written."""
from trollflow2.launcher import process_files
new_process = mock.Mock(wraps=process_files)
proffile = tmp_path / "dask-prof.html"
with (mock.patch("trollflow2.cli.process_files", new=new_process),
mock.patch("trollflow2.cli.Queue")):
cli(["-p", os.fspath(product_list_filename), "--dask-profiler",
os.fspath(proffile), "--dask-resource-profiler", "0.1",
"-m", json.dumps({"food": "soy"}), "aquafaba", "tempeh"])
assert proffile.exists()


def test_full_chain_cli_is_creating_output_file(tmp_path):
"""Test that the full chain cli is creating an output file."""
start_time = datetime(2022, 2, 2, 11, 22)
Expand Down