-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(agent): Basic agents deployment in instances
1 parent
b13cfce
commit 8e20f5c
Showing
5 changed files
with
437 additions
and
447 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,128 +1,123 @@ | ||
import json | ||
import os | ||
import zipfile | ||
from typing import Annotated | ||
|
||
import aiohttp | ||
import pathspec | ||
import typer | ||
from docker import client # type: ignore | ||
from docker.models.containers import Container # type: ignore | ||
from dotenv import dotenv_values | ||
from libertai_utils.aleph.program import get_vm_url | ||
from rich.console import Console | ||
from rich.progress import Progress, TextColumn, SpinnerColumn, TimeElapsedColumn | ||
|
||
from libertai_client.config import config | ||
from libertai_client.interfaces.agent import DockerCommand, UpdateAgentResponse | ||
from libertai_client.utils.agent import parse_agent_config_env | ||
from libertai_client.utils.rich import TaskOfTotalColumn, TEXT_PROGRESS_FORMAT | ||
from libertai_client.utils.system import get_full_path | ||
from libertai_client.utils.typer import AsyncTyper | ||
|
||
app = typer.Typer(name="agent", help="Deploy and manage agents") | ||
app = AsyncTyper(name="agent", help="Deploy and manage agents") | ||
|
||
err_console = Console(stderr=True) | ||
|
||
AGENT_ZIP_BLACKLIST = [".git", ".idea", ".vscode"] | ||
AGENT_ZIP_WHITELIST = [".env"] | ||
|
||
|
||
def create_agent_zip(src_dir: str, zip_name: str): | ||
# Read and parse the .gitignore file | ||
with open(get_full_path(src_dir, ".gitignore"), 'r') as gitignore_file: | ||
gitignore_patterns = gitignore_file.read() | ||
spec = pathspec.PathSpec.from_lines('gitwildmatch', gitignore_patterns.splitlines() + AGENT_ZIP_BLACKLIST) | ||
|
||
with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zipf: | ||
for root, _, files in os.walk(src_dir): | ||
for file in files: | ||
file_path = os.path.join(root, file) | ||
relative_path = os.path.relpath(file_path, src_dir) | ||
|
||
# Check if the file matches any .gitignore pattern | ||
if not spec.match_file(relative_path) or relative_path in AGENT_ZIP_WHITELIST: | ||
zipf.write(file_path, arcname=relative_path) | ||
|
||
|
||
@app.command() | ||
def deploy(path: Annotated[str, typer.Option(help="Path to the root of your repository", prompt=True)] = ".", | ||
code_path: Annotated[ | ||
str, typer.Option(help="Path to the package that contains the code", prompt=True)] = "./src"): | ||
async def deploy(path: Annotated[str, typer.Option(help="Path to the root of your repository", prompt=True)] = ".", | ||
python_version: Annotated[str, typer.Option(help="Version to deploy with", prompt=True)] = "3.11"): | ||
""" | ||
Deploy or redeploy an agent | ||
""" | ||
|
||
try: | ||
requirements_path = get_full_path(path, "requirements.txt") | ||
libertai_env_path = get_full_path(path, ".env.libertai") | ||
code_path = get_full_path(code_path) | ||
except FileNotFoundError as error: | ||
err_console.print(f"[red]{error}") | ||
raise typer.Exit(1) | ||
|
||
env_values: dict[str, str | None] = {} | ||
try: | ||
env_path = get_full_path(path, ".env") | ||
env_values = dict(dotenv_values(env_path)) | ||
except FileNotFoundError: | ||
pass | ||
|
||
# Double dumps to escape the double quotes for when the command is passed in /bin/bash -c "command" | ||
encoded_env_values = json.dumps(json.dumps(env_values))[1:-1] | ||
|
||
try: | ||
libertai_config = parse_agent_config_env(dotenv_values(libertai_env_path)) | ||
except EnvironmentError as error: | ||
except (FileNotFoundError, EnvironmentError) as error: | ||
err_console.print(f"[red]{error}") | ||
raise typer.Exit(1) | ||
|
||
commands: list[DockerCommand] = [ | ||
DockerCommand(id="update-system", title="Updating system packages", content="apt-get update"), | ||
DockerCommand(id="install-deps", title="Installing system dependencies", | ||
# TODO: make sure we are using the right version of python in docker, and maybe use a venv for safety | ||
content="apt-get install python3-pip squashfs-tools curl -y"), | ||
DockerCommand(id="install-packages", title="Installing agent packages", | ||
content="pip install -t /opt/packages -r /opt/requirements.txt"), | ||
DockerCommand(id="archive-packages", title="Generating agent packages archive", | ||
content="mksquashfs /opt/packages /opt/packages.squashfs -noappend"), | ||
DockerCommand(id="archive-code", title="Generating agent code archive", | ||
content="mksquashfs /opt/code /opt/code.squashfs -noappend"), | ||
DockerCommand(id="call-backend", title="Uploading to Aleph and creating the agent VM", | ||
content=f"""curl --no-progress-meter --fail-with-body -X 'PUT' \ | ||
'{config.AGENTS_BACKEND_URL}/agent/{libertai_config.id}' \ | ||
-H 'accept: application/json' \ | ||
-H 'Content-Type: multipart/form-data' \ | ||
-F 'secret="{libertai_config.secret}"' \ | ||
-F 'env_variables={encoded_env_values}' \ | ||
-F code=@/opt/code.squashfs \ | ||
-F packages=@/opt/packages.squashfs \ | ||
2>/dev/null; | ||
""") | ||
] | ||
|
||
# Setup | ||
with Progress(TextColumn(TEXT_PROGRESS_FORMAT), | ||
SpinnerColumn(finished_text="✔ ")) as progress: | ||
setup_task_text = "Starting Docker container" | ||
task = progress.add_task(f"{setup_task_text}", start=True, total=1) | ||
docker_client = client.from_env() | ||
container: Container = docker_client.containers.run("debian:bookworm", platform="linux/amd64", tty=True, | ||
detach=True, volumes={ | ||
requirements_path: {'bind': '/opt/requirements.txt', 'mode': 'ro'}, | ||
code_path: {'bind': '/opt/code', 'mode': 'ro'} | ||
}) | ||
progress.update(task, description=f"[green]{setup_task_text}", advance=1) | ||
|
||
agent_result: str | None = None | ||
error_message: str | None = None | ||
|
||
with Progress(TaskOfTotalColumn(len(commands)), TextColumn(TEXT_PROGRESS_FORMAT), | ||
SpinnerColumn(finished_text="✔ "), | ||
TimeElapsedColumn()) as progress: | ||
for command in commands: | ||
task = progress.add_task(f"{command.title}", start=True, total=1) | ||
result = container.exec_run(f'/bin/bash -c "{command.content}"') | ||
|
||
if result.exit_code != 0: | ||
command_output = result.output.decode().strip('\n') | ||
error_message = f"\n[red]Docker command error: '{command_output}'" | ||
break | ||
|
||
if command.id == "call-backend": | ||
agent_result = result.output.decode() | ||
progress.update(task, description=f"[green]{command.title}", advance=1) | ||
progress.stop_task(task) | ||
|
||
if error_message is not None: | ||
err_console.print(error_message) | ||
|
||
# Cleanup | ||
with Progress(TextColumn(TEXT_PROGRESS_FORMAT), | ||
SpinnerColumn(finished_text="✔ ")) as progress: | ||
stop_task_text = "Stopping and removing container" | ||
task = progress.add_task(f"{stop_task_text}", start=True, total=1) | ||
container.stop() | ||
container.remove() | ||
progress.update(task, description=f"[green]{stop_task_text}", advance=1) | ||
|
||
if agent_result is not None: | ||
agent_data = UpdateAgentResponse(**json.loads(agent_result)) | ||
print(f"Agent successfully deployed on {get_vm_url(agent_data.vm_hash)}") | ||
else: | ||
typer.Exit(1) | ||
create_agent_zip(path, "/tmp/libertai-agent.zip") | ||
|
||
data = aiohttp.FormData() | ||
data.add_field('secret', libertai_config.secret) | ||
data.add_field('python_version', python_version) | ||
data.add_field('package_manager', "poetry") # TODO: detect/ask user | ||
data.add_field('code', open('/tmp/libertai-agent.zip', 'rb'), filename='libertai-agent.zip') | ||
|
||
async with aiohttp.ClientSession() as session: | ||
async with session.put(f"{config.AGENTS_BACKEND_URL}/agent/{libertai_config.id}", | ||
headers={'accept': 'application/json'}, | ||
data=data) as response: | ||
if response.status == 200: | ||
print("Request succeeded:", await response.text()) | ||
else: | ||
print(f"Request failed: {response.status}") | ||
print(await response.text()) | ||
|
||
os.remove("/tmp/libertai-agent.zip") | ||
|
||
# with Progress(TextColumn(TEXT_PROGRESS_FORMAT), | ||
# SpinnerColumn(finished_text="✔ ")) as progress: | ||
# setup_task_text = "Starting Docker container" | ||
# task = progress.add_task(f"{setup_task_text}", start=True, total=1) | ||
# docker_client = client.from_env() | ||
# container: Container = docker_client.containers.run("debian:bookworm", platform="linux/amd64", tty=True, | ||
# detach=True, volumes={ | ||
# requirements_path: {'bind': '/opt/requirements.txt', 'mode': 'ro'}, | ||
# code_path: {'bind': '/opt/code', 'mode': 'ro'} | ||
# }) | ||
# progress.update(task, description=f"[green]{setup_task_text}", advance=1) | ||
# | ||
# agent_result: str | None = None | ||
# error_message: str | None = None | ||
# | ||
# with Progress(TaskOfTotalColumn(len(commands)), TextColumn(TEXT_PROGRESS_FORMAT), | ||
# SpinnerColumn(finished_text="✔ "), | ||
# TimeElapsedColumn()) as progress: | ||
# for command in commands: | ||
# task = progress.add_task(f"{command.title}", start=True, total=1) | ||
# result = container.exec_run(f'/bin/bash -c "{command.content}"') | ||
# | ||
# if result.exit_code != 0: | ||
# command_output = result.output.decode().strip('\n') | ||
# error_message = f"\n[red]Docker command error: '{command_output}'" | ||
# break | ||
# | ||
# if command.id == "call-backend": | ||
# agent_result = result.output.decode() | ||
# progress.update(task, description=f"[green]{command.title}", advance=1) | ||
# progress.stop_task(task) | ||
# | ||
# if error_message is not None: | ||
# err_console.print(error_message) | ||
# | ||
# # Cleanup | ||
# with Progress(TextColumn(TEXT_PROGRESS_FORMAT), | ||
# SpinnerColumn(finished_text="✔ ")) as progress: | ||
# stop_task_text = "Stopping and removing container" | ||
# task = progress.add_task(f"{stop_task_text}", start=True, total=1) | ||
# container.stop() | ||
# container.remove() | ||
# progress.update(task, description=f"[green]{stop_task_text}", advance=1) | ||
# | ||
# if agent_result is not None: | ||
# agent_data = UpdateAgentResponse(**json.loads(agent_result)) | ||
# print(f"Agent successfully deployed on {get_vm_url(agent_data.vm_hash)}") | ||
# else: | ||
# typer.Exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import asyncio | ||
import inspect | ||
from functools import wraps, partial | ||
|
||
from typer import Typer | ||
|
||
|
||
class AsyncTyper(Typer): | ||
@staticmethod | ||
def maybe_run_async(decorator, f): | ||
if inspect.iscoroutinefunction(f): | ||
|
||
@wraps(f) | ||
def runner(*args, **kwargs): | ||
return asyncio.run(f(*args, **kwargs)) | ||
|
||
decorator(runner) | ||
else: | ||
decorator(f) | ||
return f | ||
|
||
def callback(self, *args, **kwargs): | ||
decorator = super().callback(*args, **kwargs) | ||
return partial(self.maybe_run_async, decorator) | ||
|
||
def command(self, *args, **kwargs): | ||
decorator = super().command(*args, **kwargs) | ||
return partial(self.maybe_run_async, decorator) |
Oops, something went wrong.