Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deployment on instances #10

Merged
merged 10 commits into from
Dec 19, 2024
1,781 changes: 911 additions & 870 deletions backend/poetry.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ package-mode = false
python = "^3.12"
fastapi = { extras = ["standard"], version = "^0.115.2" }
aleph-sdk-python = "^1.1.0"
libertai-utils = "^0.0.5"
libertai-utils = "0.0.9"
setuptools = "^75.4.0"
paramiko = "^3.5.0"

[tool.poetry.group.dev.dependencies]
mypy = "^1.12.0"
ruff = "^0.7.0"

[tool.ruff]
lint.select = ["C", "E", "F", "I", "W"]
lint.ignore = ["E501"]

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
16 changes: 10 additions & 6 deletions backend/src/interfaces/agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from enum import Enum

from libertai_utils.interfaces.agent import BaseDeleteAgentBody
from libertai_utils.interfaces.subscription import SubscriptionAccount
from pydantic import BaseModel, validator
Expand All @@ -19,19 +21,16 @@ class SetupAgentBody(DeleteAgentBody):
account: SubscriptionAccount


class UpdateAgentResponse(BaseModel):
vm_hash: str


class PublicAgentData(BaseModel):
id: str
subscription_id: str
vm_hash: str | None
instance_hash: str
last_update: int


class Agent(PublicAgentData):
encrypted_secret: str
encrypted_ssh_key: str
tags: list[str]


Expand All @@ -44,8 +43,13 @@ class GetAgentSecretMessage(BaseModel):


class GetAgentResponse(PublicAgentData):
pass
instance_ip: str | None


class GetAgentSecretResponse(BaseModel):
secret: str


class AgentPythonPackageManager(str, Enum):
poetry = "poetry"
pip = "pip"
184 changes: 109 additions & 75 deletions backend/src/main.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
import json
import io
import time
from http import HTTPStatus
from uuid import uuid4

import aiohttp
import paramiko
from aleph.sdk import AuthenticatedAlephHttpClient
from aleph.sdk.chains.ethereum import ETHAccount
from aleph_message.models.execution import Encoding
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from aleph.sdk.conf import settings
from aleph_message.models import Chain, Payment, PaymentType, StoreMessage
from aleph_message.models.execution.environment import HypervisorType
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from libertai_utils.chains.index import is_signature_valid
from libertai_utils.interfaces.agent import UpdateAgentResponse
from libertai_utils.interfaces.subscription import Subscription
from libertai_utils.utils.crypto import decrypt, encrypt
from starlette.middleware.cors import CORSMiddleware

from src.config import config
from src.interfaces.agent import (
Agent,
SetupAgentBody,
AgentPythonPackageManager,
DeleteAgentBody,
UpdateAgentResponse,
GetAgentResponse,
GetAgentSecretResponse,
GetAgentSecretMessage,
GetAgentSecretResponse,
SetupAgentBody,
)
from src.interfaces.aleph import AlephVolume
from src.utils.agent import fetch_agents, fetch_agent_program_message
from src.utils.agent import fetch_agents
from src.utils.aleph import fetch_instance_ip
from src.utils.message import get_view_agent_secret_message
from src.utils.storage import upload_file
from src.utils.ssh import generate_ssh_key_pair

app = FastAPI(title="LibertAI agents")

Expand All @@ -50,20 +54,49 @@ async def setup(body: SetupAgentBody) -> None:
secret = str(uuid4())
encrypted_secret = encrypt(secret, config.ALEPH_SENDER_PK)

agent = Agent(
id=agent_id,
subscription_id=body.subscription_id,
vm_hash=None,
encrypted_secret=encrypted_secret,
last_update=int(time.time()),
tags=[agent_id, body.subscription_id, body.account.address],
)
private_key, public_key = generate_ssh_key_pair()
encrypted_private_key = encrypt(private_key, config.ALEPH_SENDER_PK)

rootfs = settings.UBUNTU_22_QEMU_ROOTFS_ID

aleph_account = ETHAccount(config.ALEPH_SENDER_SK)
async with AuthenticatedAlephHttpClient(
account=aleph_account, api_server=config.ALEPH_API_URL
account=aleph_account, api_server=config.ALEPH_API_URL
) as client:
post_message, _ = await client.create_post(
rootfs_message: StoreMessage = await client.get_message(
item_hash=rootfs, message_type=StoreMessage
)
rootfs_size = (
rootfs_message.content.size
if rootfs_message.content.size is not None
else settings.DEFAULT_ROOTFS_SIZE
)

instance_message, _status = await client.create_instance(
rootfs=rootfs,
rootfs_size=rootfs_size,
hypervisor=HypervisorType.qemu,
payment=Payment(chain=Chain.ETH, type=PaymentType.hold, receiver=None),
channel=config.ALEPH_CHANNEL,
address=config.ALEPH_OWNER,
ssh_keys=[public_key],
metadata={"name": agent_id},
vcpus=settings.DEFAULT_VM_VCPUS,
memory=settings.DEFAULT_INSTANCE_MEMORY,
sync=True,
)

agent = Agent(
id=agent_id,
subscription_id=body.subscription_id,
instance_hash=instance_message.item_hash,
encrypted_secret=encrypted_secret,
encrypted_ssh_key=encrypted_private_key,
last_update=int(time.time()),
tags=[agent_id, body.subscription_id, body.account.address],
)

await client.create_post(
address=config.ALEPH_OWNER,
post_content=agent.dict(),
post_type=config.ALEPH_AGENT_POST_TYPE,
Expand All @@ -83,9 +116,15 @@ async def get_agent_public_info(agent_id: str) -> GetAgentResponse:
)
agent = agents[0]

try:
ip_address = await fetch_instance_ip(agent.instance_hash)
except ValueError:
ip_address = None

return GetAgentResponse(
id=agent.id,
vm_hash=agent.vm_hash,
instance_hash=agent.instance_hash,
instance_ip=ip_address,
last_update=agent.last_update,
subscription_id=agent.subscription_id,
)
Expand All @@ -104,7 +143,7 @@ async def get_agent_secret(agent_id: str, signature: str) -> GetAgentSecretRespo

async with aiohttp.ClientSession() as session:
async with session.get(
url=f"{config.SUBSCRIPTION_BACKEND_URL}/subscriptions/{agent.subscription_id}"
url=f"{config.SUBSCRIPTION_BACKEND_URL}/subscriptions/{agent.subscription_id}"
) as response:
data = await response.json()
if response.status != HTTPStatus.OK:
Expand Down Expand Up @@ -142,13 +181,15 @@ def get_agent_secret_message(agent_id: str) -> GetAgentSecretMessage:

@app.put("/agent/{agent_id}", description="Deploy an agent or update it")
async def update(
agent_id: str,
secret: str = Form(),
env_variables: str = Form(), # actually dict[str, str] but the typing doesn't work well with forms
code: UploadFile = File(...),
packages: UploadFile = File(...),
agent_id: str,
secret: str = Form(),
deploy_script_url: str = Form(
default="https://raw.githubusercontent.com/Libertai/libertai-agents/refs/heads/reza/deployment-instances/deployment/deploy.sh"
),
python_version: str = Form(),
package_manager: AgentPythonPackageManager = Form(),
code: UploadFile = File(...),
) -> UpdateAgentResponse:
env_variables = json.loads(env_variables)
agents = await fetch_agents([agent_id])

if len(agents) != 1:
Expand All @@ -157,76 +198,70 @@ async def update(
detail=f"Agent with ID {agent_id} not found.",
)
agent = agents[0]
agent_program = (
await fetch_agent_program_message(agent.vm_hash)
if agent.vm_hash is not None
else None
)

# Validating the secret
decrypted_secret = decrypt(agent.encrypted_secret, config.ALEPH_SENDER_SK)

if secret != decrypted_secret:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="The secret provided doesn't match the one of this agent.",
)

previous_code_ref = (
agent_program.content.code.ref if agent_program is not None else None
)
# TODO: additional checks on the type of volume, find the right one based on mount etc
previous_packages_ref = (
agent_program.content.volumes[0].ref if agent_program is not None else None # type: ignore
ssh_private_key = decrypt(agent.encrypted_ssh_key, config.ALEPH_SENDER_SK)

try:
hostname = await fetch_instance_ip(agent.instance_hash)
except ValueError:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="Instance IPv6 address not found, it probably isn't allocated yet. Please try again in a few minutes.",
)

# Create a Paramiko SSH client
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Load private key from string
rsa_key = paramiko.RSAKey(file_obj=io.StringIO(ssh_private_key))

# Read the file content into memory
content = await code.read()

# Connect to the server
ssh_client.connect(hostname=hostname, username="root", pkey=rsa_key)

# Send the zip with the code
sftp = ssh_client.open_sftp()
remote_path = "/tmp/libertai-agent.zip"
sftp.putfo(io.BytesIO(content), remote_path)
sftp.close()

# Execute the command
ssh_client.exec_command(
f"wget {deploy_script_url} -O /tmp/deploy-agent.sh -q --no-cache && chmod +x /tmp/deploy-agent.sh && /tmp/deploy-agent.sh {python_version} {package_manager.value}"
)

code_ref = await upload_file(code, previous_code_ref)
packages_ref = await upload_file(packages, previous_packages_ref)
# Close the connection
ssh_client.close()

# Register the program
aleph_account = ETHAccount(config.ALEPH_SENDER_SK)
async with AuthenticatedAlephHttpClient(
account=aleph_account, api_server=config.ALEPH_API_URL
account=aleph_account, api_server=config.ALEPH_API_URL
) as client:
vm_hash = agent.vm_hash

if vm_hash is None:
message, _ = await client.create_program(
address=config.ALEPH_OWNER,
program_ref=code_ref,
entrypoint="run",
runtime="63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
environment_variables=env_variables,
channel=config.ALEPH_CHANNEL,
encoding=Encoding.squashfs,
persistent=False,
sync=True,
volumes=[
AlephVolume(
comment="Python packages",
mount="/opt/packages",
ref=packages_ref,
use_latest=True,
).dict()
],
)
vm_hash = message.item_hash

# TODO: update env_vars also if vm already created

# Updating the related POST message
await client.create_post(
address=config.ALEPH_OWNER,
post_content=Agent(
**agent.dict(exclude={"vm_hash", "last_update"}),
vm_hash=vm_hash,
**agent.dict(exclude={"last_update"}),
last_update=int(time.time()),
),
post_type="amend",
ref=agent.post_hash,
channel=config.ALEPH_CHANNEL,
sync=True,
)
return UpdateAgentResponse(vm_hash=vm_hash)

return UpdateAgentResponse(instance_ip=hostname)


@app.delete("/agent", description="Remove an agent on subscription end")
Expand All @@ -242,12 +277,11 @@ async def delete(body: DeleteAgentBody):

aleph_account = ETHAccount(config.ALEPH_SENDER_SK)
async with AuthenticatedAlephHttpClient(
account=aleph_account, api_server=config.ALEPH_API_URL
account=aleph_account, api_server=config.ALEPH_API_URL
) as client:
# TODO: should we delete STORE messages of the code / deps too ?
await client.forget(
address=config.ALEPH_OWNER,
hashes=[agent.vm_hash],
hashes=[agent.instance_hash],
channel=config.ALEPH_CHANNEL,
reason="LibertAI Agent subscription ended",
)
Empty file added backend/src/utils/__init__.py
Empty file.
7 changes: 0 additions & 7 deletions backend/src/utils/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from aleph.sdk import AlephHttpClient
from aleph.sdk.query.filters import PostFilter
from aleph_message.models import ProgramMessage

from src.config import config
from src.interfaces.agent import FetchedAgent
Expand All @@ -20,9 +19,3 @@ async def fetch_agents(ids: list[str] | None = None) -> list[FetchedAgent]:
FetchedAgent(**post.content, post_hash=post.original_item_hash)
for post in result.posts
]


async def fetch_agent_program_message(item_hash: str) -> ProgramMessage:
async with AlephHttpClient(api_server=config.ALEPH_API_URL) as client:
result = await client.get_message(item_hash, ProgramMessage)
return result
27 changes: 27 additions & 0 deletions backend/src/utils/aleph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import aiohttp


async def fetch_instance_ip(item_hash: str) -> str:
"""
Fetches IPv6 of an allocated instance given a message hash.

Args:
item_hash: Instance message hash.
Returns:
IPv6 address
"""

async with aiohttp.ClientSession() as session:
try:
async with session.get(
f"https://scheduler.api.aleph.cloud/api/v0/allocation/{item_hash}"
) as resp:
resp.raise_for_status()
allocation = await resp.json()
return allocation["vm_ipv6"]
except (
aiohttp.ClientResponseError,
aiohttp.ClientConnectorError,
aiohttp.ConnectionTimeoutError,
):
raise ValueError()
Loading
Loading