Skip to content

Commit

Permalink
[Maestro] Add ability to install, start, and shutdown the Maestro dae…
Browse files Browse the repository at this point in the history
…mon (#112)
  • Loading branch information
geoffxy authored Nov 24, 2024
1 parent edcf453 commit eaa4ad0
Show file tree
Hide file tree
Showing 26 changed files with 949 additions and 45 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ build
dist

cond-out

src/conductor/envs/resources/*.whl
28 changes: 28 additions & 0 deletions proto/maestro.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";

package conductor;

// Conductor's remote daemon service (Maestro). The main Conductor process
// sends requests to the daemon through the RPCs declared here.
service Maestro {
// A temporary RPC for testing purposes. Takes a text message and returns a
// text message from the daemon.
rpc Ping(PingRequest) returns (PingResponse) {}

// Tell the daemon to shut down. The response carries a text status message.
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse) {}
}

// Request payload for the `Ping` RPC.
message PingRequest {
// Free-form text sent to the daemon.
string message = 1;
}

// Response payload for the `Ping` RPC.
message PingResponse {
// Text returned by the daemon in reply to the ping.
string message = 1;
}

// Request payload for the `Shutdown` RPC.
message ShutdownRequest {
// A key that accompanies the shutdown request.
// NOTE(review): how the daemon is expected to interpret or validate this
// key is not specified in this file - confirm.
string key = 1;
}

// Response payload for the `Shutdown` RPC.
message ShutdownResponse {
// Status text returned by the daemon.
string message = 1;
}
1 change: 1 addition & 0 deletions pylintrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[MESSAGES CONTROL]
disable=R,C
enable=useless-suppression
ignore-paths=src/conductor/envs/proto_gen/
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,19 @@ exclude = '''
)/
| src/conductor/__init__.py
| src/conductor/errors/generated.py
| src/conductor/envs/proto_gen/
)
'''

[tool.mypy]
exclude = [
'src/conductor/envs/proto_gen/*',
]

[[tool.mypy.overrides]]
module = "fabric.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "grpc.*"
ignore_missing_imports = true
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@

DEV_REQUIRES = [
"black",
"grpcio-tools",
"mypy",
"pep517",
"pylint",
"pytest",
"pytest-timeout",
"pyyaml",
"setuptools",
"types-protobuf",
]

ENVS_REQUIRES = [
"fabric",
"grpcio",
"protobuf",
]

KEYWORDS = [
Expand Down
3 changes: 3 additions & 0 deletions src/conductor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,8 @@
###

# Name of the special root directory under which all Maestro-related files are
# kept in the remote environment.
MAESTRO_ROOT = ".conductor-maestro"
# Name of the Python virtualenv directory created inside the Maestro root.
MAESTRO_VENV_NAME = "condenv"
# File name for Maestro's log.
# NOTE(review): presumably located inside the Maestro root - confirm.
MAESTRO_LOG_FILE = "maestro.log"
# File name template of the Conductor wheel that is copied to the remote
# environment; `{version}` is filled in with Conductor's version string.
MAESTRO_COND_WHEEL_TEMPLATE = "conductor_cli-{version}-py3-none-any.whl"

# The Python version that is installed through pyenv for Maestro's use.
MAESTRO_PYTHON_VERSION = "3.10.12"
103 changes: 81 additions & 22 deletions src/conductor/envs/install_maestro.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import pathlib
import importlib.resources as pkg_resources

from fabric.connection import Connection

import conductor
import conductor.envs.resources as env_resources
from conductor.config import MAESTRO_ROOT, MAESTRO_PYTHON_VERSION
from conductor.config import (
MAESTRO_PYTHON_VERSION,
MAESTRO_COND_WHEEL_TEMPLATE,
MAESTRO_VENV_NAME,
)
from conductor.errors import MaestroInstallError


def install_maestro(c: Connection) -> None:
def ensure_maestro_installed(c: Connection, maestro_root: pathlib.Path) -> None:
"""
Installs Maestro in the remote environment connected to by `c`.
Ensures that Maestro is installed in the remote environment connected to by
the connection `c`.
"""

# General strategy:
# - Keep all Maestro-related code under a special root directory, specified
# using `MAESTRO_ROOT`. The assumption is that `MAESTRO_ROOT` is stored
Expand All @@ -27,32 +35,32 @@ def install_maestro(c: Connection) -> None:
# this works in a generic EC2 environment and inside a Docker container.

try:
installer = _MaestroInstaller.create(c)
installer = _MaestroInstaller(c, maestro_root)
installer.ensure_root_dir()
installer.ensure_pyenv_installed()
installer.install_python()
installer.ensure_virtualenv()
installer.install_conductor_wheel()
except Exception as ex:
raise MaestroInstallError(error_msg=str(ex)) from ex
raise MaestroInstallError(error_message=str(ex)) from ex


class _MaestroInstaller:
@classmethod
def create(cls, c: Connection) -> "_MaestroInstaller":
result = c.run("echo $HOME")
home_dir = result.stdout.strip()
return cls(c, home_dir)

def __init__(self, c: Connection, home_dir: str) -> None:
def __init__(self, c: Connection, maestro_root: pathlib.Path) -> None:
self._c = c
# This is an absolute path in the remote environment.
self._maestro_root = f"{home_dir}/{MAESTRO_ROOT}"
self._pyenv_root = f"{self._maestro_root}/pyenv"
self._maestro_root = maestro_root
self._pyenv_root = self._maestro_root / "pyenv"
self._pyenv_env = {
"PYENV_ROOT": str(self._pyenv_root),
"PYENV_VERSION": MAESTRO_PYTHON_VERSION,
}

def ensure_root_dir(self) -> None:
self._c.run(f"mkdir -p {self._maestro_root}")
self._c.run(f"mkdir -p {str(self._maestro_root)}")

def ensure_pyenv_installed(self) -> None:
result = self._c.run(f"ls {self._pyenv_root}", warn=True, hide="both")
result = self._c.run(f"ls {str(self._pyenv_root)}", warn=True, hide="both")
if result.ok:
# Assume install succeeded.
return
Expand All @@ -63,21 +71,72 @@ def ensure_pyenv_installed(self) -> None:
"install_pyenv.sh"
)
with pkg_resources.as_file(install_script) as path:
self._c.put(path, f"{self._maestro_root}/install_pyenv.sh")
install_script_file = self._maestro_root / "install_pyenv.sh"
self._c.put(path, str(install_script_file))
# We want a custom install path to avoid interfering with the existing environment.
self._c.run(
f"bash {self._maestro_root}/install_pyenv.sh",
env={"PYENV_ROOT": self._pyenv_root},
f"bash {str(install_script_file)}",
env=self._pyenv_env,
hide="both",
)
except Exception:
# If the installation failed, remove the directory so that we will
# retry next time.
self._c.run(f"rm -r {self._pyenv_root}")
self._c.run(f"rm -r {str(self._pyenv_root)}")
raise

def install_python(self) -> None:
    """
    Ensures that Python `MAESTRO_PYTHON_VERSION` is available in Maestro's
    private pyenv installation, installing it when it is not listed yet.

    Raises whatever the connection raises if the install command fails.
    """
    pyenv_exec = self._pyenv_root / "bin" / "pyenv"
    # `warn=True` stops a failed listing from raising, which is what makes
    # the `result.ok` check below meaningful. If the listing fails we simply
    # fall through and attempt the install.
    result = self._c.run(
        f"{str(pyenv_exec)} versions --bare",
        warn=True,
        hide="both",
        env=self._pyenv_env,
    )
    # `--bare` prints one version name per line. Compare against whole names
    # so that a longer name that merely contains the version string is not
    # mistaken for the version we need.
    if result.ok and MAESTRO_PYTHON_VERSION in result.stdout.split():
        # Assume install succeeded.
        return
    self._c.run(
        f"{str(pyenv_exec)} install {MAESTRO_PYTHON_VERSION}",
        env=self._pyenv_env,
    )

def ensure_virtualenv(self) -> None:
    """
    Creates Maestro's virtualenv inside the Maestro root, unless listing the
    virtualenv's location already succeeds (in which case it is assumed to
    exist and nothing is done).
    """
    venv_dir = self._maestro_root / MAESTRO_VENV_NAME
    probe = self._c.run(f"ls {venv_dir}", warn=True, hide="both")
    if not probe.ok:
        # Nothing is there yet: build the venv using the Python interpreter
        # selected through Maestro's pyenv environment variables.
        pyenv_bin = self._pyenv_root / "bin" / "pyenv"
        create_cmd = f"{pyenv_bin} exec python3 -m venv {venv_dir}"
        self._c.run(create_cmd, env=self._pyenv_env)

def install_conductor_wheel(self) -> None:
# Check if Conductor is already installed.
venv_bin = self._maestro_root / MAESTRO_VENV_NAME / "bin"
venv_python = venv_bin / "python3"
result = self._c.run(
f"{str(venv_python)} -c 'import conductor; print(conductor.__version__)'",
warn=True,
hide="both",
)
if result.ok:
installed_version = result.stdout.strip()
if installed_version == conductor.__version__:
return
# Otherwise, we need to reinstall.

wheel_file = MAESTRO_COND_WHEEL_TEMPLATE.format(version=conductor.__version__)
# Transfer the wheel.
wheel = pkg_resources.files(env_resources).joinpath(wheel_file)
wheel_path = self._maestro_root / wheel_file
with pkg_resources.as_file(wheel) as path:
self._c.put(path, str(wheel_path))
# Install.
venv_pip3 = venv_bin / "pip3"
self._c.run(
f"{self._pyenv_root}/bin/pyenv install {MAESTRO_PYTHON_VERSION}",
env={"PYENV_ROOT": self._pyenv_root},
f"{str(venv_pip3)} install '{str(wheel_path)}[envs]'",
hide="both",
)
5 changes: 0 additions & 5 deletions src/conductor/envs/maestro.py

This file was deleted.

Empty file.
57 changes: 57 additions & 0 deletions src/conductor/envs/maestro/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import grpc
from typing import Optional
import conductor.envs.proto_gen.maestro_pb2 as pb
import conductor.envs.proto_gen.maestro_pb2_grpc as maestro_grpc


class MaestroGrpcClient:
    """
    A wrapper over Maestro's gRPC stub, to simplify programmatic access through
    Python.

    This client is a thin wrapper over the gRPC stub. Methods on this client are
    synchronous.

    Usage:
    ```
    with MaestroGrpcClient(host, port) as client:
        # RPC calls here.
        pass
    ```
    """

    def __init__(self, host: str, port: int):
        """
        Records where the daemon listens. No connection is opened until
        `connect()` is called (or the client is entered as a context manager).
        """
        self._host = host
        self._port = port
        self._channel: Optional[grpc.Channel] = None
        self._stub: Optional[maestro_grpc.MaestroStub] = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def connect(self) -> None:
        """
        Opens an insecure gRPC channel to `host:port` and creates the stub.
        """
        # Release any channel left over from an earlier `connect()` so that
        # reconnecting does not leak it.
        self.close()
        self._channel = grpc.insecure_channel(f"{self._host}:{self._port}")
        self._stub = maestro_grpc.MaestroStub(self._channel)

    def ping(self, message: str) -> str:
        """
        Sends `message` through the `Ping` RPC and returns the reply text.
        The client must be connected.
        """
        assert self._stub is not None
        # pylint: disable-next=no-member
        msg = pb.PingRequest(message=message)
        return self._stub.Ping(msg).message

    def shutdown(self, key: str) -> str:
        """
        Sends `key` through the `Shutdown` RPC and returns the reply text.
        The client must be connected.
        """
        assert self._stub is not None
        # pylint: disable-next=no-member
        msg = pb.ShutdownRequest(key=key)
        return self._stub.Shutdown(msg).message

    def close(self) -> None:
        """
        Closes the channel, if one is open. Calling this on a client that is
        not connected (or calling it more than once) is a no-op.
        """
        channel = self._channel
        # Clear our state first so the client is left disconnected even if
        # closing the channel raises.
        self._stub = None
        self._channel = None
        if channel is not None:
            channel.close()
35 changes: 35 additions & 0 deletions src/conductor/envs/maestro/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import logging
import pathlib
import subprocess
from conductor.envs.maestro.interface import MaestroInterface

logger = logging.getLogger(__name__)


class Maestro(MaestroInterface):
    """
    Maestro is Conductor's remote daemon. It runs within a user-defined
    environment and executes tasks when requested by the main Conductor process.
    """

    def __init__(self, maestro_root: pathlib.Path) -> None:
        self._maestro_root = maestro_root
        # The event loop only holds weak references to tasks, so we keep
        # strong references to fire-and-forget tasks here until they finish.
        self._background_tasks: set = set()

    async def ping(self, message: str) -> str:
        """
        Logs `message` and returns the output of `uname -a` on this machine.
        """
        logger.info("Received ping message: %s", message)
        # Run the blocking subprocess call on the default executor so that the
        # event loop can keep serving other requests in the meantime.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: subprocess.run(["uname", "-a"], capture_output=True, check=False),
        )
        return result.stdout.decode("utf-8").strip()

    async def shutdown(self, key: str) -> str:
        """
        Schedules the shutdown sequence on the running event loop and returns
        "OK" right away, without waiting for the shutdown to complete.
        """
        logger.info("Received shutdown message with key %s", key)
        loop = asyncio.get_running_loop()
        task = loop.create_task(_orchestrate_shutdown())
        # Hold a reference until the task is done so that it cannot be garbage
        # collected while it is still running.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return "OK"


async def _orchestrate_shutdown() -> None:
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
27 changes: 27 additions & 0 deletions src/conductor/envs/maestro/grpc_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import conductor.envs.proto_gen.maestro_pb2_grpc as rpc
import conductor.envs.proto_gen.maestro_pb2 as pb
from conductor.envs.maestro.interface import MaestroInterface

# pylint: disable=no-member
# See https://github.com/protocolbuffers/protobuf/issues/10372

# pylint: disable=invalid-overridden-method


class MaestroGrpc(rpc.MaestroServicer):
    """
    Adapter between the generated gRPC servicer and a `MaestroInterface`
    implementation: each RPC unpacks the request message, delegates to the
    wrapped object, and packs the returned text into the response message.
    """

    def __init__(self, maestro: MaestroInterface) -> None:
        self._maestro = maestro

    async def Ping(self, request: pb.PingRequest, context) -> pb.PingResponse:
        # Forward the request's text and wrap whatever text comes back.
        return pb.PingResponse(message=await self._maestro.ping(request.message))

    async def Shutdown(
        self, request: pb.ShutdownRequest, context
    ) -> pb.ShutdownResponse:
        # Forward the request's key and wrap whatever text comes back.
        return pb.ShutdownResponse(message=await self._maestro.shutdown(request.key))
11 changes: 11 additions & 0 deletions src/conductor/envs/maestro/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class MaestroInterface:
    """
    The operations Maestro exposes over RPC, declared without any gRPC types
    so that Maestro itself stays separate from the gRPC implementation
    details. The methods here only raise; implementations are expected to
    override them.
    """

    async def ping(self, message: str) -> str:
        """Handles a ping carrying `message` and returns the reply text."""
        raise NotImplementedError()

    async def shutdown(self, key: str) -> str:
        """Handles a shutdown request carrying `key` and returns the reply text."""
        raise NotImplementedError()
Loading

0 comments on commit eaa4ad0

Please sign in to comment.