Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Add ThreadPoolExecutor to workflowrunner
Browse files Browse the repository at this point in the history
- ThreadPoolExecutor replacing Thread in workflow runner
- mock worflow.run to throw exception
- create ThreadPoolExecutor in __init__
- replacing underscores and clean up
- add futures to requirements
- raise AssertionError if workflow is running when starting
  • Loading branch information
xjules committed Jan 13, 2020
1 parent 1365e5f commit 33aa63b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 52 deletions.
30 changes: 23 additions & 7 deletions python/res/job_queue/workflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from threading import Thread
from res.job_queue import Workflow
from res.util.substitution_list import SubstitutionList

from concurrent import futures

class WorkflowRunner(object):
def __init__(self, workflow, ert=None, context=None):
Expand All @@ -20,12 +19,22 @@ def __init__(self, workflow, ert=None, context=None):

self.__context = context
self.__workflow_result = None
self._workflow_executor = futures.ThreadPoolExecutor(max_workers=1)
self._workflow_job = None

def __enter__(self):
self.run()
return self

def __exit__(self, type, value, traceback):
self.wait()

def run(self):
workflow_thread = Thread(name="ert_gui_workflow_thread")
workflow_thread.setDaemon(True)
workflow_thread.run = self.__runWorkflow
workflow_thread.start()
if self.isRunning():
raise AssertionError('An instance of workflow is already running!')
else:
self._workflow_job = self._workflow_executor.submit(self.__runWorkflow)


def __runWorkflow(self):
self.__workflow_result = self.__workflow.run(self.__ert, context=self.__context)
Expand All @@ -41,9 +50,16 @@ def isCancelled(self):
def cancel(self):
if self.isRunning():
self.__workflow.cancel()
self.wait()

def exception(self):
if self._workflow_job is not None:
return self._workflow_job._exception
return None

def wait(self):
self.__workflow.wait()
# This returns a tuple (done, pending), since we run only one job we don't need to use it
_, _ = futures.wait([self._workflow_job], timeout=None, return_when=futures.FIRST_EXCEPTION)

def workflowResult(self):
""" @rtype: bool or None """
Expand Down
108 changes: 63 additions & 45 deletions python/tests/res/job_queue/test_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from ecl.util.test import TestAreaContext
from res.util.substitution_list import SubstitutionList
from .workflow_common import WorkflowCommon

import sys
if sys.version_info >= (3, 3):
from unittest.mock import patch
else:
from mock import patch

class WorkflowRunnerTest(ResTest):

Expand All @@ -26,32 +30,31 @@ def test_workflow_thread_cancel_ert_script(self):

self.assertFalse(workflow_runner.isRunning())

workflow_runner.run()

self.assertIsNone(workflow_runner.workflowResult())
with workflow_runner:
self.assertIsNone(workflow_runner.workflowResult())

time.sleep(1) # wait for workflow to start
self.assertTrue(workflow_runner.isRunning())
self.assertFileExists("wait_started_0")
time.sleep(1) # wait for workflow to start
self.assertTrue(workflow_runner.isRunning())
self.assertFileExists("wait_started_0")

time.sleep(1) # wait for first job to finish
time.sleep(1) # wait for first job to finish

workflow_runner.cancel()
time.sleep(1) # wait for cancel to take effect
self.assertFileExists("wait_finished_0")
workflow_runner.cancel()
time.sleep(1) # wait for cancel to take effect
self.assertFileExists("wait_finished_0")


self.assertFileExists("wait_started_1")
self.assertFileExists("wait_cancelled_1")
self.assertFileDoesNotExist("wait_finished_1")
self.assertFileExists("wait_started_1")
self.assertFileExists("wait_cancelled_1")
self.assertFileDoesNotExist("wait_finished_1")

self.assertTrue(workflow_runner.isCancelled())
self.assertTrue(workflow_runner.isCancelled())

workflow_runner.wait() # wait for runner to complete
workflow_runner.wait() # wait for runner to complete

self.assertFileDoesNotExist("wait_started_2")
self.assertFileDoesNotExist("wait_cancelled_2")
self.assertFileDoesNotExist("wait_finished_2")
self.assertFileDoesNotExist("wait_started_2")
self.assertFileDoesNotExist("wait_cancelled_2")
self.assertFileDoesNotExist("wait_finished_2")



Expand All @@ -73,29 +76,45 @@ def test_workflow_thread_cancel_external(self):

self.assertFalse(workflow_runner.isRunning())

workflow_runner.run()
with workflow_runner:
time.sleep(1) # wait for workflow to start
self.assertTrue(workflow_runner.isRunning())
self.assertFileExists("wait_started_0")

time.sleep(1) # wait for first job to finish

workflow_runner.cancel()
time.sleep(1) # wait for cancel to take effect
self.assertFileExists("wait_finished_0")

self.assertFileExists("wait_started_1")
self.assertFileDoesNotExist("wait_finished_1")

time.sleep(1) # wait for workflow to start
self.assertTrue(workflow_runner.isRunning())
self.assertFileExists("wait_started_0")
self.assertTrue(workflow_runner.isCancelled())

time.sleep(1) # wait for first job to finish
workflow_runner.wait() # wait for runner to complete

workflow_runner.cancel()
time.sleep(1) # wait for cancel to take effect
self.assertFileExists("wait_finished_0")
self.assertFileDoesNotExist("wait_started_2")
self.assertFileDoesNotExist("wait_cancelled_2")
self.assertFileDoesNotExist("wait_finished_2")

self.assertFileExists("wait_started_1")
self.assertFileDoesNotExist("wait_finished_1")

self.assertTrue(workflow_runner.isCancelled())
def test_workflow_failed_job(self):
with TestAreaContext("python/job_queue/workflow_runner_fails") as work_area:
WorkflowCommon.createExternalDumpJob()

workflow_runner.wait() # wait for runner to complete
joblist = WorkflowJoblist()
self.assertTrue(joblist.addJobFromFile("DUMP", "dump_failing_job"))
workflow = Workflow("dump_workflow", joblist)
self.assertEqual(len(workflow), 2)

self.assertFileDoesNotExist("wait_started_2")
self.assertFileDoesNotExist("wait_cancelled_2")
self.assertFileDoesNotExist("wait_finished_2")
workflow_runner = WorkflowRunner(workflow, ert=None, context=SubstitutionList())

self.assertFalse(workflow_runner.isRunning())
with patch.object(Workflow, 'run', side_effect=Exception('mocked workflow error')), \
workflow_runner:
workflow_runner.wait()
self.assertNotEqual(workflow_runner.exception(), None)

def test_workflow_success(self):
with TestAreaContext("python/job_queue/workflow_runner_fast") as work_area:
Expand All @@ -113,17 +132,16 @@ def test_workflow_success(self):
workflow_runner = WorkflowRunner(workflow, ert=None, context=SubstitutionList())

self.assertFalse(workflow_runner.isRunning())
with workflow_runner:
time.sleep(1) # wait for workflow to start
workflow_runner.wait()

workflow_runner.run()
time.sleep(1) # wait for workflow to start
workflow_runner.wait()

self.assertFileExists("wait_started_0")
self.assertFileDoesNotExist("wait_cancelled_0")
self.assertFileExists("wait_finished_0")
self.assertFileExists("wait_started_0")
self.assertFileDoesNotExist("wait_cancelled_0")
self.assertFileExists("wait_finished_0")

self.assertFileExists("wait_started_1")
self.assertFileDoesNotExist("wait_cancelled_1")
self.assertFileExists("wait_finished_1")
self.assertFileExists("wait_started_1")
self.assertFileDoesNotExist("wait_cancelled_1")
self.assertFileExists("wait_finished_1")

self.assertTrue(workflow_runner.workflowResult())
self.assertTrue(workflow_runner.workflowResult())
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ requests
functools32;python_version=='2.7'
psutil
configsuite
futures

0 comments on commit 33aa63b

Please sign in to comment.