Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle setting subscriber nameserver to False #159

Merged
merged 12 commits into from
Jan 31, 2023
Merged
59 changes: 3 additions & 56 deletions bin/satpy_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,67 +22,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
"""The satpy launcher."""

import argparse
import logging
import sys

from trollflow2.logging import logging_on
from trollflow2.launcher import Runner
from multiprocessing import Manager


def parse_args():
"""Parse commandline arguments."""
parser = argparse.ArgumentParser(
description='Launch trollflow2 processing with Satpy listening on the specified Posttroll topic(s)')
parser.add_argument("topic", nargs='*',
help="Topic to listen to",
type=str)
parser.add_argument("product_list",
help="The yaml file with the product list",
type=str)
parser.add_argument("-m", "--test_message",
help="File path with the message used for testing offline. This implies threaded running.",
type=str, required=False)
parser.add_argument("-t", "--threaded",
help="Run the product generation in threads instead of processes.",
action='store_true')

parser.add_argument("-c", "--log-config",
help="Log config file (yaml) to use",
type=str, required=False)
parser.add_argument('-n', "--nameserver", required=False, type=str,
help="Nameserver to connect to", default='localhost')
parser.add_argument('-a', "--addresses", required=False, type=str,
help=("Add direct TCP port connection. Can be used several times: "
"'-a tcp://127.0.0.1:12345 -a tcp://123.456.789.0:9013'"),
action="append")

return parser.parse_args()
from trollflow2.launcher import launch


def main():
"""Launch trollflow2."""
args = vars(parse_args())

log_config = args.pop("log_config", None)
if log_config is not None:
with open(log_config) as fd:
import yaml
log_config = yaml.safe_load(fd.read())

logger = logging.getLogger("satpy_launcher")

log_queue = Manager().Queue()

with logging_on(log_queue, log_config):
logger.warning("Launching Satpy-based runner.")
product_list = args.pop("product_list")
test_message = args.pop("test_message")
threaded = args.pop("threaded")
connection_parameters = args

runner = Runner(product_list, log_queue, connection_parameters, test_message, threaded)
runner.run()
launch(sys.argv[1:])


if __name__ == "__main__":
Expand Down
7 changes: 6 additions & 1 deletion doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Welcome to Trollflow2's documentation!

Trollflow2 is an operational generation chain runner for Satpy.

See the example playlist (``pl.yaml``) for inspiration.
See the bottom of this page and the example playlist (``pl.yaml``) for inspiration.

The launcher
------------
Expand All @@ -23,6 +23,11 @@ The launcher
:undoc-members:
:show-inheritance:

It is possible to disable Posttroll Nameserver usage for the incoming
messages by starting ``satpy_launcher.py`` with command-line arguments
``-n False -a tcp://<host>:<port>`` where the host and port point to a
message publisher. Multiple publisher addresses can be given by supplying
them with additional ``-a`` switches.
Comment on lines +26 to +30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question here: Is this the same interface we use in other pytroll modules?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally, yes. All the packages using create_subscriber_from_dict_config() or create_publisher_from_dict_config() are using the same settings, explained here: https://github.com/pytroll/posttroll/blob/main/posttroll/subscriber.py#L437-L438

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok! but I was actually wondering if the external/command line interface was the same?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump @pnuu

Copy link
Member Author

@pnuu pnuu Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the incoming messages (subscribers) I found these:

  • geographic_gatherer.py uses -n false and -i tcp://..., where -i is "inbound connection" and means the same as -a does here
  • segment_gatherer.py reads these settings from the config file (using nameservers and addresses option names)

Internally the both use posttroll.subscriber.create_subscriber_from_dict_config(), so given the same combination the nameserver handling will be the same. The names used by Posttroll are addresses and nameserver.

The -a and -n options were present in satpy_launcher.py already before this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, we'll keep them for now then


Plugins
-------
Expand Down
74 changes: 67 additions & 7 deletions trollflow2/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
memory buildup.
"""


import argparse
import ast
import copy
import gc
Expand All @@ -37,23 +37,25 @@
from collections import OrderedDict
from contextlib import suppress
from datetime import datetime
from logging import getLogger
import logging
from queue import Empty
from multiprocessing import Manager

import yaml
from trollflow2.dict_tools import gen_dict_extract, plist_iter
from trollflow2.logging import setup_queued_logging
from trollflow2.plugins import AbortProcessing
from yaml import UnsafeLoader, SafeLoader, BaseLoader

try:
from posttroll.listener import ListenerContainer
except ImportError:
ListenerContainer = None

from yaml import UnsafeLoader, SafeLoader, BaseLoader
from trollflow2.dict_tools import gen_dict_extract, plist_iter
from trollflow2.logging import logging_on
from trollflow2.logging import setup_queued_logging
from trollflow2.plugins import AbortProcessing


LOG = getLogger(__name__)
LOG = logging.getLogger(__name__)
pnuu marked this conversation as resolved.
Show resolved Hide resolved
DEFAULT_PRIORITY = 999


Expand Down Expand Up @@ -438,3 +440,61 @@ def sendmail(config, trace):
pid = Popen([sendmail, "-t", "-oi"], stdin=PIPE)
pid.communicate(msg.as_bytes())
pid.terminate()


def launch(args_in):
pnuu marked this conversation as resolved.
Show resolved Hide resolved
"""Launch the processing."""
args = parse_args(args_in)
pnuu marked this conversation as resolved.
Show resolved Hide resolved

log_config = args.pop("log_config", None)
if log_config is not None:
with open(log_config) as fd:
log_config = yaml.safe_load(fd.read())
pnuu marked this conversation as resolved.
Show resolved Hide resolved

logger = logging.getLogger("satpy_launcher")

log_queue = Manager().Queue()

with logging_on(log_queue, log_config):
logger.warning("Launching Satpy-based runner.")
pnuu marked this conversation as resolved.
Show resolved Hide resolved
product_list = args.pop("product_list")
test_message = args.pop("test_message")
threaded = args.pop("threaded")
connection_parameters = args

runner = Runner(product_list, log_queue, connection_parameters, test_message, threaded)
runner.run()


def parse_args(args_in):
pnuu marked this conversation as resolved.
Show resolved Hide resolved
"""Parse commandline arguments."""
parser = argparse.ArgumentParser(
description='Launch trollflow2 processing with Satpy listening on the specified Posttroll topic(s)')
parser.add_argument("topic", nargs='*',
help="Topic to listen to",
type=str)
parser.add_argument("product_list",
help="The yaml file with the product list",
type=str)
parser.add_argument("-m", "--test_message",
help="File path with the message used for testing offline. This implies threaded running.",
type=str, required=False)
parser.add_argument("-t", "--threaded",
help="Run the product generation in threads instead of processes.",
action='store_true')

parser.add_argument("-c", "--log-config",
help="Log config file (yaml) to use",
type=str, required=False)
parser.add_argument('-n', "--nameserver", required=False, type=str,
help="Nameserver to connect to", default='localhost')
pnuu marked this conversation as resolved.
Show resolved Hide resolved
parser.add_argument('-a', "--addresses", required=False, type=str,
help=("Add direct TCP port connection. Can be used several times: "
"'-a tcp://127.0.0.1:12345 -a tcp://123.456.789.0:9013'"),
action="append")

args = vars(parser.parse_args(args_in))
if args['nameserver'].lower() in ('false', 'off', '0'):
args['nameserver'] = False

return args
8 changes: 8 additions & 0 deletions trollflow2/tests/test_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,5 +774,13 @@ def qsize(self):
assert "Process killed with signal 1" in caplog.text


def test_argparse_nameserver_is_none():
"""Test that '-n false' is sets nameserver as False."""
from trollflow2.launcher import parse_args

res = parse_args(['-n', 'false', 'product_list.yaml'])
assert res['nameserver'] is False


if __name__ == '__main__':
unittest.main()