Skip to content

Commit

Permalink
add postgres executor
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-v committed Sep 8, 2023
1 parent cd2df16 commit 056484d
Show file tree
Hide file tree
Showing 6 changed files with 772 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ repos:
- id: mypy
args: ['--show-error-codes', '--warn-unused-ignores']
exclude: 'bin/|docs/|examples/|experimental/|redun_server/|setup.py'
additional_dependencies: [types-freezegun, types-python-dateutil, types-requests]
additional_dependencies: [types-freezegun, types-python-dateutil, types-requests, types-psycopg2]
5 changes: 5 additions & 0 deletions redun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
except (ImportError, ModuleNotFoundError):
# Skip gcp_batch executor if google-cloud-batch is not installed.
pass
try:
from redun.executors.postgres import PgExecutor
except (ImportError, ModuleNotFoundError):
# Skip pg executor if psycopg2 is not installed.
pass
from redun.executors.local import LocalExecutor
from redun.file import Dir, File, ShardedS3Dataset
from redun.handle import Handle
Expand Down
19 changes: 19 additions & 0 deletions redun/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,18 @@ def get_command_parser(self) -> argparse.ArgumentParser:
)
server_parser.set_defaults(func=self.server_command)

# PgExecutor worker spawner
pg_executor_parser = subparsers.add_parser(
"pg-executor-worker",
help="Start worker process for the given executor of type 'pg'.",
)
pg_executor_parser.add_argument(
"name",
help="Name or alias of the excutor",
default="default",
)
pg_executor_parser.set_defaults(func=self.pg_executor_command)

return parser

def help_command(self, args: Namespace, extra_args: List[str], argv: List[str]) -> None:
Expand Down Expand Up @@ -3096,3 +3108,10 @@ def server_command(self, args: Namespace, extra_args: List[str], argv: List[str]
env={k: str(v) for (k, v) in compose_env.items() if v is not None},
shell=True,
)

def pg_executor_command(self, args: Namespace, extra_args: List[str], argv: List[str]) -> None:
from redun import PgExecutor

key: str = "executor." + args.name
config: Config = setup_config(args.config, repo=args.pull_repo)
PgExecutor.run_worker(config[key])
Loading

0 comments on commit 056484d

Please sign in to comment.