Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Command Line Interface #184

Merged
merged 13 commits into from
Oct 22, 2024
7 changes: 0 additions & 7 deletions .stickler.yml

This file was deleted.

4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,7 @@
tests_require=['pytest', 'mock', 'rasterio'],
python_requires='>=3.9',
test_suite='trollflow2.tests.suite',
use_scm_version=True
use_scm_version=True,
entry_points={
'console_scripts': ['satpy_cli = trollflow2.cli:cli',]}
)
77 changes: 77 additions & 0 deletions trollflow2/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Trollflow2 command line interface."""

import argparse
import json
import logging
from datetime import datetime
from queue import Queue

import yaml

from trollflow2.launcher import logging_on, process_files

logger = logging.getLogger(__name__)


def parse_args(args=None):
    """Parse the command line arguments.

    Args:
        args: Sequence of argument strings to parse. With ``None``, argparse
            reads the arguments from ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` holding ``files`` (list of str, possibly
        empty), ``product_list`` (required), ``metadata`` (a json string,
        ``"{}"`` by default) and ``log_config`` (``None`` by default).
    """
    parser = argparse.ArgumentParser(
        description='Launch trollflow2 processing with Satpy on the provided files then quit.')
    parser.add_argument("files", nargs='*',
                        help="Data files to run on",
                        type=str)
    parser.add_argument("-p", "--product-list",
                        help="The yaml file with the product list",
                        type=str,
                        required=True)
    parser.add_argument("-m", "--metadata",
                        help="Metadata (json) to pass on",
                        type=str, required=False, default="{}")
    parser.add_argument("-c", "--log-config",
                        help="Log config file (yaml) to use",
                        type=str, required=False, default=None)
    return parser.parse_args(args)


def cli(args=None):
    """Run the trollflow2 processing once on the files given as arguments.

    The command line *args* are parsed, the optional log configuration is
    read, and the files are processed with logging switched on. The json
    metadata is decoded with ``datetime_decoder`` as object hook.
    """
    parsed = parse_args(args)
    logging_setup = _read_log_config(parsed)

    with logging_on(logging_setup):
        logger.info("Starting Satpy.")
        produced_files = Queue()
        input_mda = json.loads(parsed.metadata, object_hook=datetime_decoder)
        process_files(parsed.files, input_mda, parsed.product_list, produced_files)


def _read_log_config(args):
"""Read the config."""
log_config = args.log_config
if log_config is not None:
with open(log_config) as fd:
log_config = yaml.safe_load(fd.read())
return log_config


def datetime_decoder(dct):
    """Decode ISO formatted datetime strings to python datetime objects.

    Dicts and lists are walked recursively and rebuilt: every string that
    ``datetime.fromisoformat`` accepts is replaced by the resulting
    ``datetime``, every other value is kept as it is. Dict keys are never
    converted.

    Args:
        dct: The object to decode, typically the dict handed over by
            ``json.loads`` when this function is used as ``object_hook``.

    Returns:
        A new dict or list with the datetimes decoded. Any other input is
        returned decoded (for a string) or unchanged, instead of failing on
        an unbound local variable.
    """
    if isinstance(dct, str):
        try:
            return datetime.fromisoformat(dct)
        except ValueError:
            # Not a datetime, keep the plain string.
            return dct
    if isinstance(dct, dict):
        return {key: datetime_decoder(val) for key, val in dct.items()}
    if isinstance(dct, list):
        return [datetime_decoder(val) for val in dct]
    return dct
84 changes: 50 additions & 34 deletions trollflow2/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,24 @@

def message_to_jobs(msg, product_list):
    """Build the jobs for the posttroll message *msg* given a *product_list*.

    The input filenames are extracted from the message, and the message
    ``data`` is passed on as input metadata.
    """
    return file_list_to_jobs(_extract_filenames(msg), product_list, msg.data)


def file_list_to_jobs(input_filenames, product_list, input_mda):
"""Convert a file list to jobs."""
formats = product_list['product_list'].get('formats', None)
for _product, pconfig in plist_iter(product_list['product_list'], level='product'):
if 'formats' not in pconfig and formats is not None:
pconfig['formats'] = copy.deepcopy(formats)
jobs = OrderedDict()
priorities = get_area_priorities(product_list)
# TODO: check the uri is accessible from the current host.
input_filenames = _extract_filenames(msg)
for prio, areas in priorities.items():
jobs[prio] = OrderedDict()
jobs[prio]['input_filenames'] = input_filenames.copy()
jobs[prio]['input_mda'] = msg.data.copy()
jobs[prio]['input_mda'] = input_mda.copy()
jobs[prio]['product_list'] = {}
for section in product_list:
if section == 'product_list':
Expand Down Expand Up @@ -322,7 +328,7 @@
return yml


def get_dask_client(config):
def get_dask_distributed_client(config):
"""Create Dask client if configured."""
client = None

Expand Down Expand Up @@ -374,38 +380,20 @@

def process(msg, prod_list, produced_files):
"""Process a message."""
config = read_config(prod_list, Loader=UnsafeLoader)
"""Convert a posttroll message *msg* to a list of jobs given a *product_list*."""
input_filenames = _extract_filenames(msg)
input_mda = msg.data
process_files(input_filenames, input_mda, prod_list, produced_files)

# Get distributed client
client = get_dask_client(config)

def process_files(input_filenames, input_mda, prod_list, produced_files):
"""Process files."""
config = read_config(prod_list, Loader=UnsafeLoader)
client = get_dask_distributed_client(config)
try:
config = expand(config)
jobs = message_to_jobs(msg, config)
for prio in sorted(jobs.keys()):
job = jobs[prio]
job['processing_priority'] = prio
job['produced_files'] = produced_files
try:
for wrk in config['workers']:
cwrk = wrk.copy()
if "timeout" in cwrk:

def _timeout_handler(signum, frame, wrk=wrk):
raise TimeoutError(
f"Timeout for {wrk['fun']!s} expired "
f"after {wrk['timeout']:.1f} seconds, "
"giving up")
signal.signal(signal.SIGALRM, _timeout_handler)
# using setitimer because it accepts floats,
# unlike signal.alarm
signal.setitimer(signal.ITIMER_REAL,
cwrk.pop("timeout"))
cwrk.pop('fun')(job, **cwrk)
if "timeout" in cwrk:
signal.alarm(0) # cancel the alarm
except AbortProcessing as err:
logger.warning(str(err))
jobs = file_list_to_jobs(input_filenames, config, input_mda)
process_jobs(config["workers"], jobs, produced_files)
except Exception:
logger.exception("Process crashed")
if "crash_handlers" in config:
Expand All @@ -423,13 +411,39 @@
except AttributeError:
continue
del config
try:
with suppress(AttributeError):
client.close()
except AttributeError:
pass
gc.collect()


def process_jobs(workers, jobs, produced_files):
    """Run all the workers on every job, in ascending priority order.

    Args:
        workers: Iterable of worker configurations. Each one is a dict with
            a ``fun`` callable, an optional ``timeout`` in seconds, and any
            further items, which are passed to ``fun`` as keyword arguments.
        jobs: Mapping of priority to job dict. Each job is updated in place
            with ``processing_priority`` and ``produced_files`` before the
            workers are run on it.
        produced_files: Object stored in every job under ``produced_files``.

    An ``AbortProcessing`` raised by a worker is logged as a warning and
    stops the current job only; the remaining jobs are still processed.
    """
    for prio in sorted(jobs.keys()):
        job = jobs[prio]
        job['processing_priority'] = prio
        job['produced_files'] = produced_files
        try:
            for wrk in workers:
                _run_worker(wrk, job)
        except AbortProcessing as err:
            logger.warning(str(err))


def _run_worker(wrk, job):
    """Call the worker *wrk* on *job*, enforcing its optional timeout."""
    cwrk = wrk.copy()
    fun = cwrk.pop('fun')
    timeout = cwrk.pop("timeout", None)
    if timeout is None:
        fun(job, **cwrk)
        return

    def _timeout_handler(signum, frame):
        raise TimeoutError(
            f"Timeout for {wrk['fun']!s} expired "
            f"after {wrk['timeout']:.1f} seconds, "
            "giving up")

    signal.signal(signal.SIGALRM, _timeout_handler)
    # using setitimer because it accepts floats,
    # unlike signal.alarm
    signal.setitimer(signal.ITIMER_REAL, timeout)
    try:
        fun(job, **cwrk)
    finally:
        # Always disarm the timer, also when the worker fails: the timeout
        # key has been popped from cwrk, so it cannot be used to decide
        # whether to cancel, and a timer left running would fire during a
        # later worker.
        signal.setitimer(signal.ITIMER_REAL, 0)


def read_config(fname=None, raw_string=None, Loader=SafeLoader):
"""Read the configuration file."""
try:
Expand All @@ -439,6 +453,8 @@
raw_config = fid.read()
elif raw_string:
raw_config = raw_string
if not raw_config:
raise IOError
config = yaml.load(_remove_null_keys(raw_config), Loader=Loader)
except (IOError, yaml.YAMLError):
# Either open() or yaml.load() failed
Expand Down
Loading