diff --git a/README.rst b/README.rst index 9b1d9435f..a89d962cd 100644 --- a/README.rst +++ b/README.rst @@ -118,7 +118,7 @@ If you use Oríon for published work, please cite our work using the following b .. code-block:: bibtex - @software{xavier_bouthillier_2022_0_2_2, + @software{xavier_bouthillier_2022_0_2_3, author = {Xavier Bouthillier and Christos Tsirigotis and François Corneau-Tremblay and @@ -143,10 +143,10 @@ If you use Oríon for published work, please cite our work using the following b Pascal Lamblin and Christopher Beckham}, title = {{Epistimio/orion: Asynchronous Distributed Hyperparameter Optimization}}, - month = feb, + month = mar, year = 2022, publisher = {Zenodo}, - version = {v0.2.2}, + version = {v0.2.3}, doi = {10.5281/zenodo.3478592}, url = {https://doi.org/10.5281/zenodo.3478592} } diff --git a/ROADMAP.md b/ROADMAP.md index 9fd7ae38a..2f5a7ef87 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,14 +1,20 @@ # Roadmap -Last update Feb 11th, 2022 +Last update March 7th, 2022 ## Next releases - Short-Term -### v0.2.3 +### v0.2.4 - [DEBH](https://arxiv.org/abs/2105.09821) - [HEBO](https://github.com/huawei-noah/HEBO/tree/master/HEBO/archived_submissions/hebo) - [BOHB](https://ml.informatik.uni-freiburg.de/papers/18-ICML-BOHB.pdf) +- [Nevergrad](https://github.com/facebookresearch/nevergrad) +- [Ax](https://ax.dev/) +- [MOFA](https://github.com/Epistimio/orion.algo.mofa) +- [PB2](https://github.com/Epistimio/orion.algo.pb2) - Integration with Hydra +- Integration with [sample-space](https://github.com/Epistimio/sample-space) and + [ConfigSpace](https://automl.github.io/ConfigSpace/master/) ## Next releases - Mid-Term diff --git a/conda/ci_build.sh b/conda/ci_build.sh index dee180d0e..e63f40c2b 100755 --- a/conda/ci_build.sh +++ b/conda/ci_build.sh @@ -6,6 +6,7 @@ export PATH="$HOME/miniconda/bin:$PATH" hash -r conda config --set always_yes yes --set changeps1 no conda config --add channels conda-forge +conda config --add channels mila-iqia conda config --set channel_priority strict pip uninstall -y setuptools diff --git a/conda/meta.yaml b/conda/meta.yaml index c5b0bbc90..b33ec5d49 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -33,6 +33,7 @@ requirements: - requests - pandas - falcon + - falcon-cors - gunicorn - scikit-learn - psutil diff --git a/setup.py b/setup.py index 4cb156dd8..7231e81bc 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ repo_root = os.path.dirname(os.path.abspath(__file__)) -tests_require = ["pytest>=3.0.0", "scikit-learn"] +tests_require = ["pytest>=3.0.0", "scikit-learn", "ptera>=1.1.0"] packages = [ # Packages must be sorted alphabetically to ease maintenance and merges. @@ -88,6 +88,7 @@ "pandas", "gunicorn", "falcon", + "falcon-cors", "scikit-learn", "psutil", "joblib", diff --git a/src/orion/client/runner.py b/src/orion/client/runner.py index 7974b4a15..5eb1747b3 100644 --- a/src/orion/client/runner.py +++ b/src/orion/client/runner.py @@ -40,12 +40,24 @@ def __init__(self): self.handlers = dict() self.start = 0 self.delayed = 0 + self.signal_installed = False def __enter__(self): """Override the signal handlers with our delayed handler""" self.signal_received = False - self.handlers[signal.SIGINT] = signal.signal(signal.SIGINT, self.handler) - self.handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, self.handler) + + try: + self.handlers[signal.SIGINT] = signal.signal(signal.SIGINT, self.handler) + self.handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, self.handler) + self.signal_installed = True + + except ValueError: # ValueError: signal only works in main thread + log.warning( + "SIGINT/SIGTERM protection hooks could not be installed because " + "Runner is executing inside a thread/subprocess, results could get lost " + "on interruptions" + ) + return self def handler(self, sig, frame): @@ -65,6 +77,9 @@ def handler(self, sig, frame): def restore_handlers(self): """Restore old signal handlers""" + if not self.signal_installed: + return + signal.signal(signal.SIGINT, self.handlers[signal.SIGINT]) signal.signal(signal.SIGTERM, self.handlers[signal.SIGTERM]) @@ -268,7 +283,7 @@ def run(self): def should_sample(self): """Check if more trials could be generated""" - if self.is_broken or self.is_done: + if self.free_worker <= 0 or (self.is_broken or self.is_done): return 0 pending = len(self.pending_trials) + self.trials @@ -317,8 +332,9 @@ def gather(self): to_be_raised = None log.debug(f"Gathered new results {len(results)}") - # register the results + # NOTE: For Ptera instrumentation + trials = 0 # pylint:disable=unused-variable for result in results: trial = self.pending_trials.pop(result.future) @@ -327,6 +343,8 @@ def gather(self): # NB: observe release the trial already self.client.observe(trial, result.value) self.trials += 1 + # NOTE: For Ptera instrumentation + trials = self.trials # pylint:disable=unused-variable except InvalidResult as exception: # stop the optimization process if we received `InvalidResult` # as all the trials are assumed to be returning those @@ -398,15 +416,19 @@ def _suggest_trials(self, count): # non critical errors except WaitingForTrials: + log.debug("Runner cannot sample because WaitingForTrials") break except ReservationRaceCondition: + log.debug("Runner cannot sample because ReservationRaceCondition") break except LockAcquisitionTimeout: + log.debug("Runner cannot sample because LockAcquisitionTimeout") break except CompletedExperiment: + log.debug("Runner cannot sample because CompletedExperiment") break return trials diff --git a/src/orion/core/__init__.py b/src/orion/core/__init__.py index 0770fb0a9..69431fe3e 100644 --- a/src/orion/core/__init__.py +++ b/src/orion/core/__init__.py @@ -32,10 +32,10 @@ __descr__ = "Asynchronous [black-box] Optimization" __version__ = VERSIONS["version"] __license__ = "BSD-3-Clause" -__author__ = u"Epistímio" -__author_short__ = u"Epistímio" +__author__ = "Epistímio" +__author_short__ = "Epistímio" __author_email__ = "xavier.bouthillier@umontreal.ca" -__copyright__ = u"2017-2022, Epistímio" +__copyright__ = "2017-2022, Epistímio" __url__ = "https://github.com/epistimio/orion" DIRS = AppDirs(__name__, __author_short__) @@ -55,6 +55,7 @@ def define_config(): define_experiment_config(config) define_worker_config(config) define_evc_config(config) + define_frontends_uri_config(config) config.add_option( "user_script_config", @@ -73,6 +74,31 @@ def define_config(): return config +def define_frontends_uri_config(config): + """Create and define the field of frontends URI configuration.""" + + def parse_frontends_uri(data): + # Expect either a list of strings (URLs), + # or a string as comma-separated list of URLs + if isinstance(data, list): + return data + elif isinstance(data, str): + return [piece.strip() for piece in data.split(",")] + else: + raise RuntimeError( + f"frontends_uri: expected either a list of strings (URLs), " + f"or a string as comma-separated list of URLs, got {data}" + ) + + config.add_option( + "frontends_uri", + option_type=parse_frontends_uri, + default=[], + env_var="ORION_WEBAPI_FRONTENDS_URI", + help="List of frontends addresses allowed to send requests to Orion server.", + ) + + def define_storage_config(config): """Create and define the fields of the storage configuration.""" storage_config = Configuration() diff --git a/src/orion/core/io/resolve_config.py b/src/orion/core/io/resolve_config.py index a0272df73..f9b1e19c3 100644 --- a/src/orion/core/io/resolve_config.py +++ b/src/orion/core/io/resolve_config.py @@ -213,8 +213,9 @@ def fetch_config(args): local_config = unflatten(local_config) + backward_keys = ["storage", "experiment", "worker", "evc"] # For backward compatibility - for key in ["storage", "experiment", "worker", "evc"]: + for key in backward_keys: subkeys = list(global_config[key].keys()) # Arguments that are only supported locally @@ -241,6 +242,11 @@ def fetch_config(args): local_config.setdefault(key, {}) local_config[key][subkey] = value + # Keep other keys parsed from config file + for key in tmp_config.keys(): + if key not in backward_keys: + local_config[key] = tmp_config[key] + return local_config diff --git a/src/orion/executor/dask_backend.py b/src/orion/executor/dask_backend.py index a236622b1..4b093fa7a 100644 --- a/src/orion/executor/dask_backend.py +++ b/src/orion/executor/dask_backend.py @@ -52,6 +52,16 @@ def successful(self): class Dask(BaseExecutor): + """Wrapper around the dask client. + + .. warning:: + + The Dask executor can be pickled and used inside a subprocess, + the pickled client will use the main client that was spawned in the main process, + but you cannot spawn clients inside a subprocess. + + """ + def __init__(self, n_workers=-1, client=None, **config): super(Dask, self).__init__(n_workers=n_workers) diff --git a/src/orion/executor/multiprocess_backend.py b/src/orion/executor/multiprocess_backend.py index 911d41620..39fa4c973 100644 --- a/src/orion/executor/multiprocess_backend.py +++ b/src/orion/executor/multiprocess_backend.py @@ -156,6 +156,10 @@ class PoolExecutor(BaseExecutor): backend: str Pool backend to use; thread or multiprocess, defaults to multiprocess + .. warning:: + + Pickling of the executor is not supported, see Dask for a backend that supports it + """ BACKENDS = dict( @@ -173,6 +177,12 @@ def __init__(self, n_workers=-1, backend="multiprocess", **kwargs): self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers) + def __setstate__(self, state): + self.pool = state["pool"] + + def __getstate__(self): + return dict(pool=self.pool) + def __enter__(self): return self @@ -188,13 +198,6 @@ def close(self): if hasattr(self, "pool"): self.pool.shutdown() - def __getstate__(self): - state = super(PoolExecutor, self).__getstate__() - return state - - def __setstate__(self, state): - super(PoolExecutor, self).__setstate__(state) - def submit(self, function, *args, **kwargs): try: return self._submit_cloudpickle(function, *args, **kwargs) diff --git a/src/orion/serving/webapi.py b/src/orion/serving/webapi.py index f317a8a1c..ae8d4f275 100644 --- a/src/orion/serving/webapi.py +++ b/src/orion/serving/webapi.py @@ -8,7 +8,10 @@ """ +import logging + import falcon +from falcon_cors import CORS, CORSMiddleware from orion.serving.experiments_resource import ExperimentsResource from orion.serving.plots_resources import PlotsResource @@ -16,6 +19,63 @@ from orion.serving.trials_resource import TrialsResource from orion.storage.base import setup_storage +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class MyCORSMiddleware(CORSMiddleware): + """Subclass of falcon-cors CORSMiddleware class. + + Generate a HTTP 403 Forbidden response if request sender is not allowed + to access requested content. + + Default middleware just prints a message in server side + (e.g. "Aborting response due to origin not allowed"), but still + sends content, so, a client ignoring headers might still access + data even if not allowed. + + CORS middleware role is to add necessary "access-control-" headers to + response to mark it as allowed. So, a response lacking expected headers + after call to parent method `process_ressource()` can be considered + to not be delivered to request sender. + + More info about CORS: + - https://developer.mozilla.org/fr/docs/Web/HTTP/CORS + - https://fr.wikipedia.org/wiki/Cross-origin_resource_sharing + """ + + def process_resource(self, req, resp, resource, *args): + """Generate a 403 Forbidden response if response is not allowed.""" + + cors_resp_headers_before = [ + header + for header in resp.headers + if header.lower().startswith("access-control-") + ] + assert not cors_resp_headers_before, cors_resp_headers_before + + super().process_resource(req, resp, resource, *args) + + # We then verify if some access control headers were added to response. + # If not, reponse is not allowed. + # Special case: if request did not have an origin, it was certainly sent from + # a browser (ie. not another server), so CORS is not relevant. + cors_resp_headers_after = [ + header + for header in resp.headers + if header.lower().startswith("access-control-") + ] + if not cors_resp_headers_after and req.get_header("origin"): + raise falcon.HTTPForbidden() + + +class MyCORS(CORS): + """Subclass of falcon-cors CORS class to return a custom middleware.""" + + @property + def middleware(self): + return MyCORSMiddleware(self) + class WebApi(falcon.API): """ @@ -24,7 +84,27 @@ class WebApi(falcon.API): """ def __init__(self, config=None): - super(WebApi, self).__init__() + # By default, server will reject requests coming from a server + # with different origin. E.g., if server is hosted at + # http://myorionserver.com, it won't accept an API call + # coming from a server not hosted at same address + # (e.g. a local installation at http://localhost) + # This is a Cross-Origin Resource Sharing (CORS) security: + # https://developer.mozilla.org/fr/docs/Web/HTTP/CORS + # To make server accept CORS requests, we need to use + # falcon-cors package: https://github.com/lwcolton/falcon-cors + frontends_uri = ( + config["frontends_uri"] + if "frontends_uri" in config + else ["http://localhost:3000"] + ) + logger.info( + "allowed frontends: {}".format( + ", ".join(frontends_uri) if frontends_uri else "(none)" + ) + ) + cors = MyCORS(allow_origins_list=frontends_uri) + super(WebApi, self).__init__(middleware=[cors.middleware]) self.config = config setup_storage(config.get("storage")) diff --git a/tests/functional/backward_compatibility/versions.txt b/tests/functional/backward_compatibility/versions.txt index 7e6e8e11b..6a5b8770a 100644 --- a/tests/functional/backward_compatibility/versions.txt +++ b/tests/functional/backward_compatibility/versions.txt @@ -11,3 +11,4 @@ 0.1.16 0.1.17 0.2.1 +0.2.2 diff --git a/tests/functional/serving/conftest.py b/tests/functional/serving/conftest.py index 22b409bd7..0bf7bcf89 100644 --- a/tests/functional/serving/conftest.py +++ b/tests/functional/serving/conftest.py @@ -14,3 +14,18 @@ def client(): storage = {"type": "legacy", "database": {"type": "EphemeralDB"}} with OrionState(storage=storage): yield testing.TestClient(WebApi({"storage": storage})) + + +@pytest.fixture() +def client_with_frontends_uri(): + """Mock the falcon.API instance for testing with custom frontend_uri""" + storage = {"type": "legacy", "database": {"type": "EphemeralDB"}} + with OrionState(storage=storage): + yield testing.TestClient( + WebApi( + { + "storage": storage, + "frontends_uri": ["http://123.456", "http://example.com"], + } + ) + ) diff --git a/tests/functional/serving/test_root.py b/tests/functional/serving/test_root.py index 9f7d45ed1..e816b7df5 100644 --- a/tests/functional/serving/test_root.py +++ b/tests/functional/serving/test_root.py @@ -10,3 +10,58 @@ def test_runtime_summary(client): assert response.json["server"] == "gunicorn" assert response.json["database"] == "EphemeralDB" assert response.status == "200 OK" + + +def test_cors_with_default_frontends_uri(client): + """Tests CORS with default frontends_uri""" + + # No origin (e.g. from a browser), should pass + response = client.simulate_get("/") + assert response.status == "200 OK" + + # Default origin, should pass + response = client.simulate_get("/", headers={"Origin": "http://localhost:3000"}) + assert response.status == "200 OK" + + # Any other origin should fail (even with same host but different port) + + response = client.simulate_get("/", headers={"Origin": "http://localhost:4000"}) + assert response.status == "403 Forbidden" + + response = client.simulate_get("/", headers={"Origin": "http://localhost"}) + assert response.status == "403 Forbidden" + + response = client.simulate_get("/", headers={"Origin": "http://google.com"}) + assert response.status == "403 Forbidden" + + +def test_cors_with_custom_frontends_uri(client_with_frontends_uri): + """Tests CORS with custom frontends_uri""" + + # No origin (e.g. from a browser), should pass + response = client_with_frontends_uri.simulate_get("/") + assert response.status == "200 OK" + + # Allowed address, should pass + response = client_with_frontends_uri.simulate_get( + "/", headers={"Origin": "http://123.456"} + ) + assert response.status == "200 OK" + + # Another allowed address, should pass + response = client_with_frontends_uri.simulate_get( + "/", headers={"Origin": "http://example.com"} + ) + assert response.status == "200 OK" + + # Any other origin should fail + + response = client_with_frontends_uri.simulate_get( + "/", headers={"Origin": "http://localhost"} + ) + assert response.status == "403 Forbidden" + + response = client_with_frontends_uri.simulate_get( + "/", headers={"Origin": "http://google.com"} + ) + assert response.status == "403 Forbidden" diff --git a/tests/functional/storage/test_io.py b/tests/functional/storage/test_io.py new file mode 100644 index 000000000..f027d5322 --- /dev/null +++ b/tests/functional/storage/test_io.py @@ -0,0 +1,467 @@ +""" +Test suite to measure I/O level during hyperparameter optimization. + +If executed as a script, a graph is plotted using matplotlib and mean+-std I/O is logged in +terminal. + +If executed with ``pytest``, ``test_io`` will verify if the level of I/O is close enough to nominal +levels. +""" +import argparse +import contextlib +import functools +import multiprocessing +import os +import subprocess +import sys +import time +from collections import defaultdict, namedtuple + +import bson +import numpy +from ptera import probing + +from orion.client import build_experiment +from orion.client.runner import Runner +from orion.core.io.database.mongodb import MongoDB +from orion.core.io.database.pickleddb import PickledDB +from orion.core.worker.experiment import Experiment +from orion.testing import OrionState + + +def foo(x, sleep_time): + """Dummy function for the tests""" + time.sleep(sleep_time) + return [{"type": "objective", "name": "objective", "value": x}] + + +keys = ["net_in", "net_out"] +MongoStat = namedtuple("MongoStat", keys) + +order_values = dict(b=1 / 1000.0, k=1, m=1000, g=1000 ** 2) + + +def _convert_str_size(size): + """Convert format 0b/0k/0m/0g to KB in float""" + value = float(size[:-1]) + order = size[-1] + return value * order_values[order] + + +class MongoStatMonitoring(multiprocessing.Process): + def __init__(self, sleep_time=1): + super().__init__() + self.sleep_time = sleep_time + self.q = multiprocessing.Queue() + self.stop = multiprocessing.Event() + + def run(self): + while not self.stop.is_set(): + row = mongostat() + self.q.put(MongoStat(*(_convert_str_size(row[key]) for key in keys))) + time.sleep(self.sleep_time) + + +def mongostat(): + """Return stat row of mongostat in a dict.""" + out = subprocess.run( + "mongostat --rowcount=1".split(" "), stdout=subprocess.PIPE, check=True + ) + header, row = out.stdout.decode("utf-8").split("\n")[:2] + return dict(zip(header.split(), row.split())) + + +def max_trials(elements): + """Max number of trials in a buffer""" + if not elements: + return 0 + return max(e["trials"] for e in elements) + + +@contextlib.contextmanager +def monitor_with_mongostat(interval=1, baseline_sleep=5): + """Compute stats with MongoDB. + + This contextmanager is implemented to serve as a reference to validate the proper measure of I/O + using ptera solely (`monitor_with_ptera`). + """ + process = MongoStatMonitoring(sleep_time=interval) + process.start() + time.sleep(baseline_sleep) + process.stop.set() + process.join() + + baseline = defaultdict(int) + while not process.q.empty(): + row = process.q.get(timeout=0.01) + for i, key in enumerate(keys): + baseline[key] += row[i] + + for key in keys: + baseline[key] = numpy.array(baseline[key]).mean() + + data = ([], [], []) + + process = MongoStatMonitoring(sleep_time=interval) + process.start() + + with probing("Runner.gather(trials) > #value") as prb: + num_completed_trials = ( + prb.buffer_with_time(interval + 1).map(max_trials).accum() + ) + + yield data + + process.stop.set() + process.join() + + baseline = defaultdict(int) + while not process.q.empty(): + row = process.q.get(timeout=0.01) + for i, key in enumerate(keys): + data[i].append(row[i] - baseline[key]) + data[-1].append(num_completed_trials.pop(0)) + + +def measure_size(element): + """Measure size (KB) of an element encoded as BSON. + + List and tuples and converted elemented wize and the size is the sum of the elements sizes. + """ + if not element: + return 0 + + if isinstance(element, (list, tuple)): + return sum(measure_size(e) for e in element) + + # in KB + return len(bson.BSON.encode(element)) / 1000.0 + + +def measure_size_write(element): + """Measure net_in/net_out of probing event on DB.write""" + return { + "net_in": sum( + measure_size(element.get(key, None)) for key in ["data", "query"] + ), + "net_out": 0.001, + } + + +def measure_size_read(element): + """Measure net_in/net_out of probing event on DB.read""" + return { + "net_in": measure_size_count(element)["net_in"], + "net_out": sum(measure_size(e) for e in element["#value"]), + } + + +def measure_size_read_and_write(element): + """Measure net_in/net_out of probing event on DB.read_and_write""" + return { + "net_in": measure_size_write(element)["net_in"], + "net_out": measure_size(element["#value"]), + } + + +def measure_size_count(element): + """Measure net_in/net_out of probing event on DB.count""" + return { + "net_in": measure_size(element["query"]), + "net_out": 0.001, + } + + +def sum_stats(elements): + """Sum statisticts in buffered probing events.""" + stats = defaultdict(float) + for element in elements: + for key in element: + stats[key] += element[key] + + return stats + + +def get_online_desc(): + """Build compact inline stack of Orion call""" + i = 1 + frame = sys._getframe(i) + while ( + "orion/core/worker/experiment.py" not in frame.f_code.co_filename + or frame.f_code.co_name.startswith("_") + ): + i += 1 + frame = sys._getframe(i) + + if frame.f_code.co_name == "acquire_algorithm_lock": + caller_frame = sys._getframe(i + 2) + caller = f"{caller_frame.f_code.co_filename}:{caller_frame.f_lineno}:{caller_frame.f_code.co_name}({frame.f_code.co_name})" + else: + caller = f"{frame.f_code.co_filename}:{frame.f_lineno}:{frame.f_code.co_name}" + return caller + + +def get_full_stack_desc(): + """Build full stack of Orion call""" + i = 1 + frame = sys._getframe(i) + while ( + "orion/core/worker/experiment.py" not in frame.f_code.co_filename + or frame.f_code.co_name.startswith("_") + ): + i += 1 + frame = sys._getframe(i) + + stack = [] + while not frame.f_code.co_filename.endswith(__file__): + stack.append( + f"{frame.f_code.co_filename}:{frame.f_lineno}:{frame.f_code.co_name}" + ) + i += 1 + frame = sys._getframe(i) + + return "\n".join(stack) + + +def save_caller(element): + """Save call stack in probing event""" + + element["stack"] = get_full_stack_desc() + + return element + + +@contextlib.contextmanager +def monitor_with_ptera(interval=1, db_backend="PickledDB"): + """Monitor DB I/O and number of trials during optimization.""" + + with contextlib.ExitStack() as stack: + selectors = dict( + write=f"{db_backend}.write(query) > data", + read=f"{db_backend}.read(query) > #value", + read_and_write=f"{db_backend}.read_and_write(data, query) > #value", + count=f"{db_backend}.count(query) > #value", + n_trials="Runner.gather(trials) > #value", + ) + probes = dict() + profiling = dict() + for key, selector in selectors.items(): + probes[key] = stack.enter_context(probing(selector)) + + if key == "n_trials": + probes[key] = ( + probes["n_trials"] + .buffer_with_time(interval) + .map(max_trials) + .accum() + ) + else: + measure_stream = probes[key].map(globals()[f"measure_size_{key}"]) + profiling[key] = measure_stream.map(save_caller).accum() + probes[key] = ( + measure_stream.buffer_with_time(interval).map(sum_stats).accum() + ) + + data = ([], [], []) + yield data + + n_trials = probes.pop("n_trials") + for interval_trials, row in zip(n_trials, zip(*probes.values())): + if interval_trials == 0 and len(data[0]) > 0: + interval_trials = data[2][-1] + net_in = 0 + net_out = 0 + for probe in row: + net_in += probe["net_in"] + net_out += probe["net_out"] + + data[0].append(net_in) + data[1].append(net_out) + data[2].append(interval_trials) + + net_in_profiling = defaultdict(float) + net_out_profiling = defaultdict(float) + calls_profiling = defaultdict(int) + for key, profiler in profiling.items(): + for element in profiler: + net_in_profiling[element["stack"]] += element["net_in"] + net_out_profiling[element["stack"]] += element["net_out"] + calls_profiling[element["stack"]] += 1 + + # TODO: Make this print optional + return + net_in_profiling = sorted( + net_in_profiling.items(), key=lambda item: item[1], reverse=True + ) + total = numpy.zeros(2) + for item in net_in_profiling: + print("net_in", calls_profiling[item[0]], item[1]) + print(item[0]) + total += numpy.array([calls_profiling[item[0]], item[1]]) + print("total", total) + + net_out_profiling = sorted( + net_out_profiling.items(), key=lambda item: item[1], reverse=True + ) + total = numpy.zeros(2) + for item in net_out_profiling: + print("net_out", calls_profiling[item[0]], item[1]) + print(item[0]) + total += numpy.array([calls_profiling[item[0]], item[1]]) + print("total", total) + + +monitoring_methods = dict(mongostat=monitor_with_mongostat, ptera=monitor_with_ptera) + + +colors = dict(joblib="#1f77b4", singleexecutor="#ff7f0e") + + +def main(argv=None): + import matplotlib.pyplot as plt + + parser = argparse.ArgumentParser() + parser.add_argument( + "--backends", + default=["joblib", "singleexecutor"], + nargs="+", + type=str, + help="Executor backends to use during the tests.", + ) + parser.add_argument( + "--n-base-trials", + default=498, + type=int, + help=( + "Number of trials to produce before starting the test. " + "This measures the effect on I/O of last history in DB" + ), + ) + parser.add_argument( + "--n-trials", + default=2, + type=int, + help="Number of trials to execute during the test.", + ) + parser.add_argument( + "--trial-duration", + default=30, + type=float, + help="Duration of trial execution (in seconds).", + ) + parser.add_argument( + "--output", + default="test-io.png", + type=str, + help="File name to save figure.", + ) + + options = parser.parse_args(argv) + + for backend in options.backends: + with OrionState(): + net_in, net_out, n_trials = compute_stats( + monitoring_method="ptera", + executor=backend, + max_trials=( + options.n_base_trials, + options.n_base_trials + options.n_trials, + ), + sleep_time=options.trial_duration, + ) + + net_in = numpy.array(net_in) + net_out = numpy.array(net_out) + + print(f"Backend: {backend}") + print( + f"Input: min={net_in.min()}, max={net_in.max()}, " + f"mean={net_in.mean()}, std={net_in.std()}" + ) + print( + f"Output: min={net_out.min()}, max={net_out.max()}, " + f"mean={net_out.mean()}, std={net_out.std()}" + ) + + max_points = min(map(len, [net_in, net_out, n_trials])) + plt.plot( + list(range(max_points)), + net_in[:max_points], + label=f"{backend}-in", + color=colors[backend], + linestyle="dashed", + ) + plt.plot( + list(range(max_points)), + net_out[:max_points], + label=f"{backend}-out", + color=colors[backend], + ) + + plt.xlabel("Time (s)") + plt.ylabel("I/O (KB/s)") + + plt.legend() + plt.savefig(options.output) + + +def compute_stats( + monitoring_method="ptera", + executor="joblib", + max_trials=(498, 500), + sleep_time=30, +): + experiment = build_experiment( + f"test-io-{executor}-{monitoring_method}", + space=dict(x="uniform(0, 1, precision=100)"), + max_trials=max_trials[1], + ) + + with experiment.tmp_executor(executor, n_workers=1): + experiment.workon( + foo, + max_trials=max_trials[1], + max_trials_per_worker=max_trials[0], + sleep_time=0.0001, + ) + + with monitoring_methods[monitoring_method]() as data: + experiment.workon( + foo, + max_trials=max_trials[1], + max_trials_per_worker=max_trials[1] - max_trials[0], + sleep_time=sleep_time, + ) + + return data + + +def test_io(): + """Verify that I/O levels during optimization are close enough to nominal levels""" + + with OrionState(): + net_in, net_out, n_trials = compute_stats( + monitoring_method="ptera", + executor="joblib", + max_trials=(498, 500), + sleep_time=30.0, + ) + net_in = numpy.array(net_in) + net_out = numpy.array(net_out) + + NOMINAL_IN_MEAN = 17.0 # KB/s + NOMINAL_IN_STD = 79.0 # KB/s + + NOMINAL_OUT_MEAN = 59.0 # KB/s + NOMINAL_OUT_STD = 282.0 # KB/s + + assert net_in.mean() < NOMINAL_IN_MEAN + NOMINAL_IN_STD / numpy.sqrt( + net_in.shape[0] + ) + assert net_out.mean() < NOMINAL_OUT_MEAN + NOMINAL_OUT_STD / numpy.sqrt( + net_out.shape[0] + ) + + +if __name__ == "__main__": + main() diff --git a/tests/requirements.txt b/tests/requirements.txt index 601af04d4..7f37f2e37 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -6,3 +6,4 @@ pytest-lazy-fixture git+https://github.com/Delaunay/track dask[complete] pytest-custom_exit_code +ptera >= 1.1.0 diff --git a/tests/unittests/client/runner_subprocess.py b/tests/unittests/client/runner_subprocess.py new file mode 100644 index 000000000..e344913b9 --- /dev/null +++ b/tests/unittests/client/runner_subprocess.py @@ -0,0 +1,91 @@ +"""Used to test instantiating a runner inside a subprocess""" +from argparse import ArgumentParser + +from orion.client.runner import Runner +from orion.core.utils.exceptions import WaitingForTrials +from orion.core.worker.trial import Trial +from orion.executor.base import executor_factory + +idle_timeout = 20 +count = 10 +n_workers = 2 + + +parser = ArgumentParser() +parser.add_argument("--backend", type=str, default="joblib") +args = parser.parse_args() + + +def new_trial(value, sleep=0.01): + """Generate a dummy new trial""" + return Trial( + params=[ + dict(name="lhs", type="real", value=value), + dict(name="sleep", type="real", value=sleep), + ] + ) + + +class FakeClient: + """Orion mock client for Runner.""" + + def __init__(self, n_workers): + self.is_done = False + self.executor = executor_factory.create(args.backend, n_workers) + self.suggest_error = WaitingForTrials + self.trials = [] + self.status = [] + self.working_dir = "" + + def suggest(self, pool_size=None): + """Fake suggest.""" + if self.trials: + return self.trials.pop() + + raise self.suggest_error + + def release(self, trial, status=None): + """Fake release.""" + self.status.append(status) + + def observe(self, trial, value): + """Fake observe""" + self.status.append("completed") + + def close(self): + self._free_executor() + + def __del__(self): + self._free_executor() + + def _free_executor(self): + if self.executor is not None: + self.executor.__exit__(None, None, None) + self.executor = None + self.executor_owner = False + + +def function(lhs, sleep): + return lhs + sleep + + +client = FakeClient(n_workers) + +runner = Runner( + client=client, + fct=function, + pool_size=10, + idle_timeout=idle_timeout, + max_broken=2, + max_trials_per_worker=2, + trial_arg=[], + on_error=None, +) + +client = runner.client + +client.trials.extend([new_trial(i) for i in range(count)]) + +runner.run() +runner.client.close() +print("done") diff --git a/tests/unittests/client/test_runner.py b/tests/unittests/client/test_runner.py index b9ed87241..8bac16f9f 100644 --- a/tests/unittests/client/test_runner.py +++ b/tests/unittests/client/test_runner.py @@ -4,10 +4,13 @@ import copy import os import signal +import sys import time +import traceback from contextlib import contextmanager -from multiprocessing import Process +from multiprocessing import Process, Queue from threading import Thread +from wsgiref.simple_server import sys_version import pytest @@ -21,8 +24,12 @@ ) from orion.core.worker.trial import Trial from orion.executor.base import executor_factory +from orion.executor.dask_backend import HAS_DASK, Dask from orion.storage.base import LockAcquisitionTimeout -from orion.testing import create_experiment + + +def compatible(version): + return sys.version_info.major == version[0] and sys.version_info.minor >= version[1] def new_trial(value, sleep=0.01): @@ -47,9 +54,14 @@ def change_signal_handler(sig, handler): class FakeClient: """Orion mock client for Runner.""" - def __init__(self, n_workers): + def __init__(self, n_workers, backend="joblib", executor=None): self.is_done = False - self.executor = executor_factory.create("joblib", n_workers) + + if executor is None: + self.executor = executor_factory.create(backend, n_workers) + else: + self.executor = executor + self.suggest_error = WaitingForTrials self.trials = [] self.status = [] @@ -100,10 +112,10 @@ def function(lhs, sleep): return lhs + sleep -def new_runner(idle_timeout, n_workers=2, client=None): +def new_runner(idle_timeout, n_workers=2, client=None, executor=None, backend="joblib"): """Create a new runner with a mock client.""" if client is None: - client = FakeClient(n_workers) + client = FakeClient(n_workers, backend=backend, executor=executor) runner = Runner( client=client, @@ -535,3 +547,119 @@ def make_runner(n_workers, max_trials_per_worker, pool_size=None): runner.trials = 5 assert runner.should_sample() == 0, "The max number of trials was reached" runner.client.close() + + +def run_runner(reraise=False, executor=None): + try: + count = 10 + max_trials = 10 + workers = 2 + + runner = new_runner(0.1, n_workers=workers, executor=executor) + runner.max_trials_per_worker = max_trials + client = runner.client + + client.trials.extend([new_trial(i, sleep=0) for i in range(count)]) + + if executor is None: + executor = client.executor + + def set_is_done(): + time.sleep(0.05) + runner.pending_trials = dict() + runner.client.is_done = True + + start = time.time() + thread = Thread(target=set_is_done) + thread.start() + + with executor: + runner.run() + + print("done") + return 0 + except: + if reraise: + raise + + traceback.print_exc() + return 1 + + +def test_runner_inside_process(): + """Runner can execute inside a process""" + + queue = Queue() + + def get_result(results): + results.put(run_runner()) + + p = Process(target=get_result, args=(queue,)) + p.start() + p.join() + + assert queue.get() == 0 + assert p.exitcode == 0 + + +def test_runner_inside_childprocess(): + """Runner can execute inside a child process""" + pid = os.fork() + + # execute runner in the child process + if pid == 0: + run_runner() + os._exit(0) + else: + # parent process wait for child process to end + wpid, exit_status = os.wait() + assert wpid == pid + assert exit_status == 0 + + +def test_runner_inside_subprocess(): + """Runner can execute inside a subprocess""" + + import subprocess + + dir = os.path.dirname(__file__) + + result = subprocess.run( + ["python", f"{dir}/runner_subprocess.py", "--backend", "joblib"], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + assert result.stderr.decode("utf-8") == "" + assert result.stdout.decode("utf-8") == "done\n" + assert result.returncode == 0 + + +def test_runner_inside_thread(): + """Runner can execute inside a thread""" + + class GetResult: + def __init__(self) -> None: + self.r = None + + def run(self): + self.r = run_runner() + + result = GetResult() + thread = Thread(target=result.run) + thread.start() + thread.join() + + assert result.r == 0 + + +@pytest.mark.skipif(not HAS_DASK, reason="Running without dask") +def test_runner_inside_dask(): + """Runner can not execute inside a dask worker""" + + executor = Dask() + + future = executor.submit(run_runner, executor=executor, reraise=True) + + assert future.get() == 0 diff --git a/tests/unittests/core/io/test_resolve_config.py b/tests/unittests/core/io/test_resolve_config.py index ea029e490..5f8f665de 100644 --- a/tests/unittests/core/io/test_resolve_config.py +++ b/tests/unittests/core/io/test_resolve_config.py @@ -319,6 +319,10 @@ def mocked_config(file_object): assert evc_config == {} + # Test remaining config + assert config.pop("debug") is False + assert config.pop("frontends_uri") == [] + # Confirm that all fields were tested. assert config == {} diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index a2d87cc59..16a2963eb 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -192,6 +192,39 @@ def test_multisubprocess(backend): r.value +def nested(executor): + futures = [] + + for i in range(5): + futures.append(executor.submit(function, 1, 2, 3)) + + return sum([f.get() for f in futures]) + + +@pytest.mark.parametrize("backend", [Dask, SingleExecutor]) +def test_nested_submit(backend): + with backend(5) as executor: + futures = [executor.submit(nested, executor) for i in range(5)] + + results = executor.async_get(futures, timeout=2) + + for r in results: + assert r.value == 35 + + +@pytest.mark.parametrize("backend", [multiprocess, thread]) +def test_nested_submit_failure(backend): + with backend(5) as executor: + + if backend == multiprocess: + exception = NotImplementedError + elif backend == thread: + exception = TypeError + + with pytest.raises(exception): + [executor.submit(nested, executor) for i in range(5)] + + @pytest.mark.parametrize("executor", executors) def test_executors_have_default_args(executor): diff --git a/tox.ini b/tox.ini index f582b4f1d..d64034318 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,7 @@ deps = coverage commands = pip install -U {toxinidir}/tests/functional/gradient_descent_algo - coverage run --source=src --parallel-mode -m pytest --durations=50 --durations-min 1 -vv --ignore tests/functional/backward_compatibility --ignore tests/stress --timeout=180 {posargs} + coverage run --source=src --parallel-mode -m pytest --durations=50 --durations-min 1 -vv --ignore tests/functional/backward_compatibility --ignore tests/stress --timeout=360 {posargs} coverage combine coverage report -m coverage xml