Skip to content

Commit

Permalink
[Execution] Implement and use the Operation-based executor (#110)
Browse files Browse the repository at this point in the history
Part of #99.
  • Loading branch information
geoffxy authored Nov 17, 2024
1 parent 9b3af85 commit 99b6982
Show file tree
Hide file tree
Showing 11 changed files with 606 additions and 82 deletions.
4 changes: 4 additions & 0 deletions errors/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@
name: MaestroInstallError
message: "An error occurred when installing Maestro in a remote environment: {error_message}"

3006:
name: CombineOutputFileConflict
message: "The output file or directory '{output_file}' already exists and cannot be overwritten by the combine() task."


# Archive and restore errors (error code 4xxx)
4001:
Expand Down
11 changes: 6 additions & 5 deletions src/conductor/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
AtLeastCommitNotAncestor,
)
from conductor.task_identifier import TaskIdentifier
from conductor.execution.executor import Executor
from conductor.execution.plan import ExecutionPlan
from conductor.execution.executor2 import Executor2
from conductor.execution.planning.planner import ExecutionPlanner
from conductor.utils.user_code import cli_command
from conductor.utils.colored_output import print_bold

Expand Down Expand Up @@ -151,8 +151,9 @@ def main(args):
if not commit_is_ancestor:
raise AtLeastCommitNotAncestor()

plan = ExecutionPlan.for_task(
task_identifier, ctx, run_again=args.again, at_least_commit=commit
ep = ExecutionPlanner(ctx)
plan = ep.create_plan_for(
task_identifier, run_again=args.again, at_least_commit=commit
)
executor = Executor(execution_slots=num_jobs)
executor = Executor2(execution_slots=num_jobs)
executor.run_plan(plan, ctx, stop_on_first_error=args.stop_early)
14 changes: 14 additions & 0 deletions src/conductor/errors/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,19 @@ def _message(self):
)


class CombineOutputFileConflict(ConductorError):
error_code = 3006

def __init__(self, **kwargs):
super().__init__()
self.output_file = kwargs["output_file"]

def _message(self):
return "The output file or directory '{output_file}' already exists and cannot be overwritten by the combine() task.".format(
output_file=self.output_file,
)


class OutputFileExists(ConductorError):
error_code = 4001

Expand Down Expand Up @@ -650,6 +663,7 @@ def _message(self):
"OutputDirTaken",
"ConductorAbort",
"MaestroInstallError",
"CombineOutputFileConflict",
"OutputFileExists",
"OutputPathDoesNotExist",
"NoTaskOutputsToArchive",
Expand Down
Loading

0 comments on commit 99b6982

Please sign in to comment.