Skip to content

Commit

Permalink
feat(backend): Instances deployment started
Browse files Browse the repository at this point in the history
  • Loading branch information
RezaRahemtola committed Dec 7, 2024
1 parent 183741b commit 85feddb
Show file tree
Hide file tree
Showing 8 changed files with 901 additions and 843 deletions.
1,522 changes: 774 additions & 748 deletions backend/poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ package-mode = false
python = "^3.12"
fastapi = { extras = ["standard"], version = "^0.115.2" }
aleph-sdk-python = "^1.1.0"
libertai-utils = "^0.0.5"
libertai-utils = "0.0.8"
setuptools = "^75.4.0"
paramiko = "^3.5.0"

[tool.poetry.group.dev.dependencies]
mypy = "^1.12.0"
Expand Down
11 changes: 9 additions & 2 deletions backend/src/interfaces/agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from enum import Enum

from libertai_utils.interfaces.agent import BaseDeleteAgentBody
from libertai_utils.interfaces.subscription import SubscriptionAccount
from pydantic import BaseModel, validator
Expand All @@ -20,18 +22,19 @@ class SetupAgentBody(DeleteAgentBody):


class UpdateAgentResponse(BaseModel):
vm_hash: str
instance_hash: str


class PublicAgentData(BaseModel):
id: str
subscription_id: str
vm_hash: str | None
instance_hash: str
last_update: int


class Agent(PublicAgentData):
encrypted_secret: str
encrypted_ssh_key: str
tags: list[str]


Expand All @@ -49,3 +52,7 @@ class GetAgentResponse(PublicAgentData):

class GetAgentSecretResponse(BaseModel):
secret: str

class AgentPythonPackageManager(str, Enum):
poetry = "poetry"
pip = "pip"
155 changes: 84 additions & 71 deletions backend/src/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import json
import io
import time
from http import HTTPStatus
from uuid import uuid4

import aiohttp
import paramiko
from aleph.sdk import AuthenticatedAlephHttpClient
from aleph.sdk.chains.ethereum import ETHAccount
from aleph_message.models.execution import Encoding
from aleph_message.models import Payment, Chain, PaymentType
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from libertai_utils.chains.index import is_signature_valid
from libertai_utils.interfaces.subscription import Subscription
Expand All @@ -22,11 +23,11 @@
GetAgentResponse,
GetAgentSecretResponse,
GetAgentSecretMessage,
AgentPythonPackageManager,
)
from src.interfaces.aleph import AlephVolume
from src.utils.agent import fetch_agents, fetch_agent_program_message
from src.utils.agent import fetch_agents
from src.utils.message import get_view_agent_secret_message
from src.utils.storage import upload_file
from src.utils.ssh import generate_ssh_key_pair

app = FastAPI(title="LibertAI agents")

Expand All @@ -50,20 +51,33 @@ async def setup(body: SetupAgentBody) -> None:
secret = str(uuid4())
encrypted_secret = encrypt(secret, config.ALEPH_SENDER_PK)

agent = Agent(
id=agent_id,
subscription_id=body.subscription_id,
vm_hash=None,
encrypted_secret=encrypted_secret,
last_update=int(time.time()),
tags=[agent_id, body.subscription_id, body.account.address],
)
private_key, public_key = generate_ssh_key_pair()
encrypted_private_key = encrypt(private_key, config.ALEPH_SENDER_SK)

aleph_account = ETHAccount(config.ALEPH_SENDER_SK)
async with AuthenticatedAlephHttpClient(
account=aleph_account, api_server=config.ALEPH_API_URL
account=aleph_account, api_server=config.ALEPH_API_URL
) as client:
post_message, _ = await client.create_post(
instance_message, _status = await client.create_instance(
rootfs="TODO",
rootfs_size=0,
payment=Payment(chain=Chain.ETH, type=PaymentType.hold, receiver=None),
channel=config.ALEPH_CHANNEL,
address=config.ALEPH_OWNER,
ssh_keys=[public_key],
)

agent = Agent(
id=agent_id,
subscription_id=body.subscription_id,
instance_hash=instance_message.item_hash,
encrypted_secret=encrypted_secret,
encrypted_ssh_key=encrypted_private_key,
last_update=int(time.time()),
tags=[agent_id, body.subscription_id, body.account.address],
)

await client.create_post(
address=config.ALEPH_OWNER,
post_content=agent.dict(),
post_type=config.ALEPH_AGENT_POST_TYPE,
Expand All @@ -85,7 +99,7 @@ async def get_agent_public_info(agent_id: str) -> GetAgentResponse:

return GetAgentResponse(
id=agent.id,
vm_hash=agent.vm_hash,
instance_hash=agent.instance_hash,
last_update=agent.last_update,
subscription_id=agent.subscription_id,
)
Expand All @@ -104,7 +118,7 @@ async def get_agent_secret(agent_id: str, signature: str) -> GetAgentSecretRespo

async with aiohttp.ClientSession() as session:
async with session.get(
url=f"{config.SUBSCRIPTION_BACKEND_URL}/subscriptions/{agent.subscription_id}"
url=f"{config.SUBSCRIPTION_BACKEND_URL}/subscriptions/{agent.subscription_id}"
) as response:
data = await response.json()
if response.status != HTTPStatus.OK:
Expand Down Expand Up @@ -142,13 +156,12 @@ def get_agent_secret_message(agent_id: str) -> GetAgentSecretMessage:

@app.put("/agent/{agent_id}", description="Deploy an agent or update it")
async def update(
agent_id: str,
secret: str = Form(),
env_variables: str = Form(), # actually dict[str, str] but the typing doesn't work well with forms
code: UploadFile = File(...),
packages: UploadFile = File(...),
agent_id: str,
secret: str = Form(),
python_version: str = Form(),
package_manager: AgentPythonPackageManager = Form(),
code: UploadFile = File(...),
) -> UpdateAgentResponse:
env_variables = json.loads(env_variables)
agents = await fetch_agents([agent_id])

if len(agents) != 1:
Expand All @@ -157,76 +170,77 @@ async def update(
detail=f"Agent with ID {agent_id} not found.",
)
agent = agents[0]
agent_program = (
await fetch_agent_program_message(agent.vm_hash)
if agent.vm_hash is not None
else None
)

# Validating the secret
decrypted_secret = decrypt(agent.encrypted_secret, config.ALEPH_SENDER_SK)

if secret != decrypted_secret:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="The secret provided doesn't match the one of this agent.",
)

previous_code_ref = (
agent_program.content.code.ref if agent_program is not None else None
)
# TODO: additional checks on the type of volume, find the right one based on mount etc
previous_packages_ref = (
agent_program.content.volumes[0].ref if agent_program is not None else None # type: ignore
ssh_private_key = decrypt(agent.encrypted_ssh_key, config.ALEPH_SENDER_SK)

# TODO: get hostname using instance_hash
hostname = "2a01:240:ad00:2100:3:89cf:401:4871"

# TODO: store link elsewhere, use main version and take as optional parameter in route
SCRIPT_URL = "https://raw.githubusercontent.com/Libertai/libertai-agents/refs/heads/reza/deployment-instances/deployment/deploy.sh"

# Create a Paramiko SSH client
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Load private key from string
rsa_key = paramiko.RSAKey(file_obj=io.StringIO(ssh_private_key))

# Read the file content into memory
content = await code.read()

# Connect to the server
ssh_client.connect(hostname=hostname, username="root", pkey=rsa_key)

# Send the zip with the code
sftp = ssh_client.open_sftp()
remote_path = "/tmp/libertai-agent.zip"
sftp.putfo(io.BytesIO(content), remote_path)
sftp.close()

# Execute a command
_stdin, stdout, stderr = ssh_client.exec_command(
f"wget {SCRIPT_URL} -O /tmp/deploy-agent.sh -q --no-cached && chmod +x /tmp/deploy-agent.sh && /tmp/deploy-agent.sh {python_version} {package_manager.value}"
)

code_ref = await upload_file(code, previous_code_ref)
packages_ref = await upload_file(packages, previous_packages_ref)
output = stdout.read().decode("utf-8")
error = stderr.read().decode("utf-8")
# Close the connection
ssh_client.close()

print("Command Output:")
print(output)

if error:
print("Command Error:")
print(error)

# Register the program
aleph_account = ETHAccount(config.ALEPH_SENDER_SK)
async with AuthenticatedAlephHttpClient(
account=aleph_account, api_server=config.ALEPH_API_URL
account=aleph_account, api_server=config.ALEPH_API_URL
) as client:
vm_hash = agent.vm_hash

if vm_hash is None:
message, _ = await client.create_program(
address=config.ALEPH_OWNER,
program_ref=code_ref,
entrypoint="run",
runtime="63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
environment_variables=env_variables,
channel=config.ALEPH_CHANNEL,
encoding=Encoding.squashfs,
persistent=False,
sync=True,
volumes=[
AlephVolume(
comment="Python packages",
mount="/opt/packages",
ref=packages_ref,
use_latest=True,
).dict()
],
)
vm_hash = message.item_hash

# TODO: update env_vars also if vm already created

# Updating the related POST message
await client.create_post(
address=config.ALEPH_OWNER,
post_content=Agent(
**agent.dict(exclude={"vm_hash", "last_update"}),
vm_hash=vm_hash,
**agent.dict(exclude={"last_update"}),
last_update=int(time.time()),
),
post_type="amend",
ref=agent.post_hash,
channel=config.ALEPH_CHANNEL,
sync=True,
)
return UpdateAgentResponse(vm_hash=vm_hash)

return UpdateAgentResponse(instance_hash="TODO")


@app.delete("/agent", description="Remove an agent on subscription end")
Expand All @@ -242,12 +256,11 @@ async def delete(body: DeleteAgentBody):

aleph_account = ETHAccount(config.ALEPH_SENDER_SK)
async with AuthenticatedAlephHttpClient(
account=aleph_account, api_server=config.ALEPH_API_URL
account=aleph_account, api_server=config.ALEPH_API_URL
) as client:
# TODO: should we delete STORE messages of the code / deps too ?
await client.forget(
address=config.ALEPH_OWNER,
hashes=[agent.vm_hash],
hashes=[agent.instance_hash],
channel=config.ALEPH_CHANNEL,
reason="LibertAI Agent subscription ended",
)
7 changes: 0 additions & 7 deletions backend/src/utils/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from aleph.sdk import AlephHttpClient
from aleph.sdk.query.filters import PostFilter
from aleph_message.models import ProgramMessage

from src.config import config
from src.interfaces.agent import FetchedAgent
Expand All @@ -20,9 +19,3 @@ async def fetch_agents(ids: list[str] | None = None) -> list[FetchedAgent]:
FetchedAgent(**post.content, post_hash=post.original_item_hash)
for post in result.posts
]


async def fetch_agent_program_message(item_hash: str) -> ProgramMessage:
async with AlephHttpClient(api_server=config.ALEPH_API_URL) as client:
result = await client.get_message(item_hash, ProgramMessage)
return result
18 changes: 18 additions & 0 deletions backend/src/utils/ssh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import io

import paramiko


def generate_ssh_key_pair() -> tuple[str, str]:
# Generate RSA key pair
key = paramiko.RSAKey.generate(4096)

# Serialize private key to string
private_key_io = io.StringIO()
key.write_private_key(private_key_io)
private_key_str = private_key_io.getvalue()

# Generate public key in OpenSSH format
public_key_str = f"{key.get_name()} {key.get_base64()}"

return private_key_str, public_key_str
2 changes: 1 addition & 1 deletion deployment/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ docker stop libertai-agent && docker rm $_

# Deploying the new agent
unzip $ZIP_PATH -d $CODE_PATH
wget https://raw.githubusercontent.com/Libertai/libertai-agents/refs/heads/reza/deployment-instances/deployment/$2.Dockerfile -O $DOCKERFILE_PATH -q
wget https://raw.githubusercontent.com/Libertai/libertai-agents/refs/heads/reza/deployment-instances/deployment/$2.Dockerfile -O $DOCKERFILE_PATH -q --no-cache
docker build $CODE_PATH \
-f $DOCKERFILE_PATH \
-t libertai-agent \
Expand Down
Loading

0 comments on commit 85feddb

Please sign in to comment.