Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] deployment run refactoring. bring back support for dev mode via pyins… #195

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def cancel_healthcheck_job(service: str) -> None:

def with_retries(f: t.Callable) -> t.Callable:
"""Retries decorator."""

return f
async def _call(request: Request) -> JSONResponse:
"""Call the endpoint."""
logger.info(f"Calling `{f.__name__}` with retries enabled")
Expand Down
374 changes: 374 additions & 0 deletions operate/services/deployment_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
import json
import os
import platform
import shutil
import signal
import subprocess
import sys
import time
import typing as t
from abc import ABC, ABCMeta, abstractmethod
from pathlib import Path
from venv import main as venv_cli

import psutil
from aea.__version__ import __version__ as aea_version
from autonomy.__version__ import __version__ as autonomy_version


class AbstractDeploymentRunner(ABC):
def __init__(self, work_directory: Path):
self._work_directory = work_directory

@abstractmethod
def start(self):
pass

@abstractmethod
def stop(self):
pass


def _kill_process(pid: int) -> None:
"""Kill process."""
print(f"Trying to kill process: {pid}")
while True:
if not psutil.pid_exists(pid=pid):
return
if psutil.Process(pid=pid).status() in (
psutil.STATUS_DEAD,
psutil.STATUS_ZOMBIE,
):
return
try:
os.kill(
pid,
(
signal.CTRL_C_EVENT # type: ignore
if platform.platform() == "Windows"
else signal.SIGKILL
),
)
except OSError:
return
time.sleep(1)


class BaseDeploymentRunner(AbstractDeploymentRunner, metaclass=ABCMeta):
def _run_aea(self, *args: t.List[str], cwd: Path):
return self._run_cmd(args=[self._aea_bin, *args], cwd=cwd)

@staticmethod
def _run_cmd(args: t.List[str], cwd: t.Optional[Path] = None) -> None:
"""Run command in a subprocess."""
print(f"Running: {' '.join(args)}")
# print working dir
print(f"Working dir: {os.getcwd()}")
result = subprocess.run( # pylint: disable=subprocess-run-check # nosec
args=args,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode != 0:
raise RuntimeError(
f"Error running: {args} @ {cwd}\n{result.stderr.decode()}"
)

def _prepare_agent_env(self):
working_dir = self._work_directory
env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8"))
# Patch for trader agent
if "SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_STORE_PATH" in env:
data_dir = working_dir / "data"
data_dir.mkdir(exist_ok=True)
env["SKILL_TRADER_ABCI_MODELS_PARAMS_ARGS_STORE_PATH"] = str(data_dir)

# TODO: Dynamic port allocation, backport to service builder
env["CONNECTION_ABCI_CONFIG_HOST"] = "localhost"
env["CONNECTION_ABCI_CONFIG_PORT"] = "26658"

for var in env:
# Fix tendermint connection params
if var.endswith("MODELS_PARAMS_ARGS_TENDERMINT_COM_URL"):
env[var] = "http://localhost:8080"

if var.endswith("MODELS_PARAMS_ARGS_TENDERMINT_URL"):
env[var] = "http://localhost:26657"

if var.endswith("MODELS_PARAMS_ARGS_TENDERMINT_P2P_URL"):
env[var] = "localhost:26656"

if var.endswith("MODELS_BENCHMARK_TOOL_ARGS_LOG_DIR"):
benchmarks_dir = working_dir / "benchmarks"
benchmarks_dir.mkdir(exist_ok=True, parents=True)
env[var] = str(benchmarks_dir.resolve())

(working_dir / "agent.json").write_text(
json.dumps(env, indent=4),
encoding="utf-8",
)
return env

def _setup_agent(self) -> None:
"""Setup agent."""
print(1111111, "SETUP AGENT", flush=True)
working_dir = self._work_directory
env = self._prepare_agent_env()

# abin = self._aea_bin
# Fetch agent
# self._run_cmd(
# args=[
# abin,
# "init",
# "--reset",
# "--author",
# "valory",
# "--remote",
# "--ipfs",
# "--ipfs-node",
# "/dns/registry.autonolas.tech/tcp/443/https",
# ],
# cwd=working_dir,
# )
self._run_aea(
"init",
"--reset",
"--author",
"valory",
"--remote",
"--ipfs",
"--ipfs-node",
"/dns/registry.autonolas.tech/tcp/443/https",
cwd=working_dir,
)

# self._run_cmd(
# args=[
# abin,
# "fetch",
# env["AEA_AGENT"],
# "--alias",
# "agent",
# ],
# cwd=working_dir,
# )
self._run_aea("fetch", env["AEA_AGENT"], "--alias", "agent", cwd=working_dir)

# Add keys
shutil.copy(
working_dir / "ethereum_private_key.txt",
working_dir / "agent" / "ethereum_private_key.txt",
)

# self._run_cmd(
# args=[abin, "add-key", "ethereum"],
# cwd=working_dir / "agent",
# )
self._run_aea("add-key", "ethereum", cwd=working_dir / "agent")

# self._run_cmd(
# args=[abin, "issue-certificates"],
# cwd=working_dir / "agent",
# )
self._run_aea("issue-certificates", cwd=working_dir / "agent")
print(1111111, "SETUP AGENT complete", flush=True)

def start(self):
print(1111111111111111, "START _DEPLOYMENT", flush=True)
self._setup_agent()
self._start_tendermint()
self._start_agent()
print(1111111111111111, "START _DEPLOYMENT complete", flush=True)

def stop(self):
print(1111111111111111, "STOP _DEPLOYMENT", flush=True)
self._stop_agent()
self._stop_tendermint()
print(1111111111111111, "STOP _DEPLOYMENT complete", flush=True)

def _stop_agent(self) -> None:
"""Start process."""
pid = self._work_directory / "agent.pid"
if not pid.exists():
return
_kill_process(int(pid.read_text(encoding="utf-8")))

def _stop_tendermint(self) -> None:
"""Start tendermint process."""
pid = self._work_directory / "tendermint.pid"
if not pid.exists():
return
_kill_process(int(pid.read_text(encoding="utf-8")))

@abstractmethod
def _start_tendermint(self):
pass

@abstractmethod
def _start_agent(self):
pass

@property
@abstractmethod
def _aea_bin(self):
pass


class PyInstallerHostDeploymentRunner(BaseDeploymentRunner):
@property
def _aea_bin(self) -> str:
abin = str(Path(sys._MEIPASS) / "aea_bin") # type: ignore # pylint: disable=protected-access
return abin

@property
def _tendermint_bin(self) -> str:
return str(Path(sys._MEIPASS) / "tendermint")

def _start_agent(self) -> None:
"""Start agent process."""
working_dir = self._work_directory
env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8"))
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[self._aea_bin, "run"],
cwd=working_dir / "agent",
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env={**os.environ, **env},
creationflags=(
0x00000008 if platform.system() == "Windows" else 0
), # Detach process from the main process
)
(working_dir / "agent.pid").write_text(
data=str(process.pid),
encoding="utf-8",
)

def _start_tendermint(self) -> None:
"""Start tendermint process."""
working_dir = self._work_directory
env = json.loads((working_dir / "tendermint.json").read_text(encoding="utf-8"))
tendermint_com = self._tendermint_bin # type: ignore # pylint: disable=protected-access
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[tendermint_com],
cwd=working_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env={**os.environ, **env},
creationflags=(
0x00000008 if platform.system() == "Windows" else 0
), # Detach process from the main process
)
(working_dir / "tendermint.pid").write_text(
data=str(process.pid),
encoding="utf-8",
)


class HostPythonHostDeploymentRunner(BaseDeploymentRunner):
@property
def _aea_bin(self) -> str:
return str(self._venv_dir / "bin" / "aea")

def _start_agent(self) -> None:
"""Start agent process."""
working_dir = self._work_directory
env = json.loads((working_dir / "agent.json").read_text(encoding="utf-8"))
print(1111, "START AGENT", working_dir / "agent", (working_dir / "agent").exists(), flush=True)
print(1111, "START AGENT", working_dir / "agent", (working_dir / "agent").exists(), flush=True)
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[self._aea_bin, "run"],
cwd=str(working_dir / "agent"),
#stdout=subprocess.STDOUT,
#stderr=subprocess.STDOUT,
env={**os.environ, **env},
creationflags=(
0x00000008 if platform.system() == "Windows" else 0
), # Detach process from the main process
)
(working_dir / "agent.pid").write_text(
data=str(process.pid),
encoding="utf-8",
)

def _start_tendermint(self) -> None:
"""Start tendermint process."""
working_dir = self._work_directory
env = json.loads((working_dir / "tendermint.json").read_text(encoding="utf-8"))
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[
str(self._venv_dir / "bin" / "flask"),
"run",
"--host",
"localhost",
"--port",
"8080",
],
cwd=working_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env={**os.environ, **env},
creationflags=(
0x00000008 if platform.system() == "Windows" else 0
), # Detach process from the main process
)
(working_dir / "tendermint.pid").write_text(
data=str(process.pid),
encoding="utf-8",
)

@property
def _venv_dir(self):
return self._work_directory / "venv"

def _setup_venv(self):
print("SETUP VENV", flush=True)
self._venv_dir.mkdir(exist_ok=True)
venv_cli(args=[str(self._venv_dir)])
pbin = str(self._venv_dir / "bin" / "python")
# Install agent dependencies
self._run_cmd(
args=[
pbin,
"-m",
"pip",
"install",
f"open-autonomy[all]=={autonomy_version}",
f"open-aea-ledger-ethereum=={aea_version}",
f"open-aea-ledger-ethereum-flashbots=={aea_version}",
f"open-aea-ledger-cosmos=={aea_version}",
# Install tendermint dependencies
"flask",
"requests",
],
)
print("SETUP VENV COMPLETE", flush=True)

def _setup_agent(self) -> None:
self._setup_venv()
super()._setup_agent()
# Install agent dependencies
self._run_aea("-v", "debug", "install", "--timeout", "600",
cwd=self._work_directory / "agent",
)


def _get_host_deployment_runner(build_dir: Path) -> BaseDeploymentRunner:
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
deployment_runner = PyInstallerHostDeploymentRunner(build_dir)
else:
deployment_runner = HostPythonHostDeploymentRunner(build_dir)
return deployment_runner


def run_host_deployment(build_dir: Path) -> None:
"""Run host deployment."""
deployment_runner = _get_host_deployment_runner(build_dir=build_dir)
deployment_runner.start()


def stop_host_deployment(build_dir: Path) -> None:
"""Stop host deployment."""
deployment_runner = _get_host_deployment_runner(build_dir=build_dir)
deployment_runner.stop()
2 changes: 1 addition & 1 deletion operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ async def healthcheck_job(
logging.info(
f"Error occured while checking the service health\n{traceback.format_exc()}"
)
await asyncio.sleep(30)
await asyncio.sleep(300)

def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment:
"""
Expand Down
Loading
Loading