diff --git a/python/res/job_queue/workflow_runner.py b/python/res/job_queue/workflow_runner.py index bb1231d86a..efdd41eb6a 100644 --- a/python/res/job_queue/workflow_runner.py +++ b/python/res/job_queue/workflow_runner.py @@ -1,7 +1,6 @@ -from threading import Thread from res.job_queue import Workflow from res.util.substitution_list import SubstitutionList - +from concurrent import futures class WorkflowRunner(object): def __init__(self, workflow, ert=None, context=None): @@ -20,12 +19,22 @@ def __init__(self, workflow, ert=None, context=None): self.__context = context self.__workflow_result = None + self._workflow_executor = futures.ThreadPoolExecutor(max_workers=1) + self._workflow_job = None + + def __enter__(self): + self.run() + return self + + def __exit__(self, type, value, traceback): + self.wait() def run(self): - workflow_thread = Thread(name="ert_gui_workflow_thread") - workflow_thread.setDaemon(True) - workflow_thread.run = self.__runWorkflow - workflow_thread.start() + if self.isRunning(): + raise AssertionError('An instance of workflow is already running!') + else: + self._workflow_job = self._workflow_executor.submit(self.__runWorkflow) + def __runWorkflow(self): self.__workflow_result = self.__workflow.run(self.__ert, context=self.__context) @@ -41,9 +50,16 @@ def isCancelled(self): def cancel(self): if self.isRunning(): self.__workflow.cancel() + self.wait() + + def exception(self): + if self._workflow_job is not None: + return self._workflow_job._exception + return None def wait(self): - self.__workflow.wait() + # This returns a tuple (done, pending), since we run only one job we don't need to use it + _, _ = futures.wait([self._workflow_job], timeout=None, return_when=futures.FIRST_EXCEPTION) def workflowResult(self): """ @rtype: bool or None """ diff --git a/python/tests/res/job_queue/test_workflow_runner.py b/python/tests/res/job_queue/test_workflow_runner.py index 7afc12b622..ea023493d5 100644 --- a/python/tests/res/job_queue/test_workflow_runner.py +++ b/python/tests/res/job_queue/test_workflow_runner.py @@ -5,7 +5,11 @@ from ecl.util.test import TestAreaContext from res.util.substitution_list import SubstitutionList from .workflow_common import WorkflowCommon - +import sys +if sys.version_info >= (3, 3): + from unittest.mock import patch +else: + from mock import patch class WorkflowRunnerTest(ResTest): @@ -26,32 +30,31 @@ def test_workflow_thread_cancel_ert_script(self): self.assertFalse(workflow_runner.isRunning()) - workflow_runner.run() - - self.assertIsNone(workflow_runner.workflowResult()) + with workflow_runner: + self.assertIsNone(workflow_runner.workflowResult()) - time.sleep(1) # wait for workflow to start - self.assertTrue(workflow_runner.isRunning()) - self.assertFileExists("wait_started_0") + time.sleep(1) # wait for workflow to start + self.assertTrue(workflow_runner.isRunning()) + self.assertFileExists("wait_started_0") - time.sleep(1) # wait for first job to finish + time.sleep(1) # wait for first job to finish - workflow_runner.cancel() - time.sleep(1) # wait for cancel to take effect - self.assertFileExists("wait_finished_0") + workflow_runner.cancel() + time.sleep(1) # wait for cancel to take effect + self.assertFileExists("wait_finished_0") - self.assertFileExists("wait_started_1") - self.assertFileExists("wait_cancelled_1") - self.assertFileDoesNotExist("wait_finished_1") + self.assertFileExists("wait_started_1") + self.assertFileExists("wait_cancelled_1") + self.assertFileDoesNotExist("wait_finished_1") - self.assertTrue(workflow_runner.isCancelled()) + self.assertTrue(workflow_runner.isCancelled()) - workflow_runner.wait() # wait for runner to complete + workflow_runner.wait() # wait for runner to complete - self.assertFileDoesNotExist("wait_started_2") - self.assertFileDoesNotExist("wait_cancelled_2") - self.assertFileDoesNotExist("wait_finished_2") + self.assertFileDoesNotExist("wait_started_2") + self.assertFileDoesNotExist("wait_cancelled_2") + self.assertFileDoesNotExist("wait_finished_2") @@ -73,29 +76,45 @@ def test_workflow_thread_cancel_external(self): self.assertFalse(workflow_runner.isRunning()) - workflow_runner.run() + with workflow_runner: + time.sleep(1) # wait for workflow to start + self.assertTrue(workflow_runner.isRunning()) + self.assertFileExists("wait_started_0") + + time.sleep(1) # wait for first job to finish + + workflow_runner.cancel() + time.sleep(1) # wait for cancel to take effect + self.assertFileExists("wait_finished_0") + + self.assertFileExists("wait_started_1") + self.assertFileDoesNotExist("wait_finished_1") - time.sleep(1) # wait for workflow to start - self.assertTrue(workflow_runner.isRunning()) - self.assertFileExists("wait_started_0") + self.assertTrue(workflow_runner.isCancelled()) - time.sleep(1) # wait for first job to finish + workflow_runner.wait() # wait for runner to complete - workflow_runner.cancel() - time.sleep(1) # wait for cancel to take effect - self.assertFileExists("wait_finished_0") + self.assertFileDoesNotExist("wait_started_2") + self.assertFileDoesNotExist("wait_cancelled_2") + self.assertFileDoesNotExist("wait_finished_2") - self.assertFileExists("wait_started_1") - self.assertFileDoesNotExist("wait_finished_1") - self.assertTrue(workflow_runner.isCancelled()) + def test_workflow_failed_job(self): + with TestAreaContext("python/job_queue/workflow_runner_fails") as work_area: + WorkflowCommon.createExternalDumpJob() - workflow_runner.wait() # wait for runner to complete + joblist = WorkflowJoblist() + self.assertTrue(joblist.addJobFromFile("DUMP", "dump_failing_job")) + workflow = Workflow("dump_workflow", joblist) + self.assertEqual(len(workflow), 2) - self.assertFileDoesNotExist("wait_started_2") - self.assertFileDoesNotExist("wait_cancelled_2") - self.assertFileDoesNotExist("wait_finished_2") + workflow_runner = WorkflowRunner(workflow, ert=None, context=SubstitutionList()) + self.assertFalse(workflow_runner.isRunning()) + with patch.object(Workflow, 'run', side_effect=Exception('mocked workflow error')), \ + workflow_runner: + workflow_runner.wait() + self.assertNotEqual(workflow_runner.exception(), None) def test_workflow_success(self): with TestAreaContext("python/job_queue/workflow_runner_fast") as work_area: @@ -113,17 +132,16 @@ def test_workflow_success(self): workflow_runner = WorkflowRunner(workflow, ert=None, context=SubstitutionList()) self.assertFalse(workflow_runner.isRunning()) + with workflow_runner: + time.sleep(1) # wait for workflow to start + workflow_runner.wait() - workflow_runner.run() - time.sleep(1) # wait for workflow to start - workflow_runner.wait() - - self.assertFileExists("wait_started_0") - self.assertFileDoesNotExist("wait_cancelled_0") - self.assertFileExists("wait_finished_0") + self.assertFileExists("wait_started_0") + self.assertFileDoesNotExist("wait_cancelled_0") + self.assertFileExists("wait_finished_0") - self.assertFileExists("wait_started_1") - self.assertFileDoesNotExist("wait_cancelled_1") - self.assertFileExists("wait_finished_1") + self.assertFileExists("wait_started_1") + self.assertFileDoesNotExist("wait_cancelled_1") + self.assertFileExists("wait_finished_1") - self.assertTrue(workflow_runner.workflowResult()) + self.assertTrue(workflow_runner.workflowResult()) diff --git a/requirements.txt b/requirements.txt index dd6d9e8fbc..041277a20b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ requests functools32;python_version=='2.7' psutil configsuite +futures