Skip to content

Commit

Permalink
Merge pull request #111 from Datura-ai/main
Browse files Browse the repository at this point in the history
deploy
  • Loading branch information
pyon12 authored Nov 29, 2024
2 parents 4702655 + 1932bb9 commit 6444f07
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 53 deletions.
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile
COPY --from=base-image /root/app/ /root/app/
COPY --from=base-image /opt/pypackages/ /opt/pypackages/

LABEL version="3.2.2"
LABEL version="3.2.3"

CMD ["bash", "run.sh"]
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile.runner
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ WORKDIR /root/validator
COPY docker-compose.app.yml docker-compose.yml
COPY entrypoint.sh /entrypoint.sh

LABEL version="3.2.2"
LABEL version="3.2.3"

RUN chmod u+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
25 changes: 25 additions & 0 deletions neurons/validators/src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from services.file_encrypt_service import FileEncryptService
from payload_models.payloads import (
MinerJobRequestPayload,
ContainerCreateRequest,
)

configure_logs_of_other_modules()
Expand Down Expand Up @@ -159,5 +160,29 @@ async def _request_job_to_miner(miner_hotkey: str, miner_address: str, miner_por
docker_hub_digests=docker_hub_digests,
)

@cli.command()
@click.option("--miner_hotkey", prompt="Miner Hotkey", help="Hotkey of Miner")
@click.option("--miner_address", prompt="Miner Address", help="Miner IP Address")
@click.option("--miner_port", type=int, prompt="Miner Port", help="Miner Port")
@click.option("--executor_id", prompt="Executor Id", help="Executor Id")
@click.option("--docker_image", prompt="Docker Image", help="Docker Image")
def create_container_to_miner(miner_hotkey: str, miner_address: str, miner_port: int, executor_id: str, docker_image: str):
    """CLI entry point: ask a miner to create a container on one of its executors."""
    coro = _create_container_to_miner(
        miner_hotkey,
        miner_address,
        miner_port,
        executor_id,
        docker_image,
    )
    asyncio.run(coro)


async def _create_container_to_miner(miner_hotkey: str, miner_address: str, miner_port: int, executor_id: str, docker_image: str):
    """Build a ContainerCreateRequest and hand it to the MinerService."""
    miner_service: MinerService = ioc["MinerService"]

    request = ContainerCreateRequest(
        miner_hotkey=miner_hotkey,
        miner_address=miner_address,
        miner_port=miner_port,
        executor_id=executor_id,
        docker_image=docker_image,
        # Placeholder key; a real caller would supply the renter's public key.
        user_public_key="user_public_key",
    )
    await miner_service.handle_container(request)


if __name__ == "__main__":
cli()
1 change: 1 addition & 0 deletions neurons/validators/src/clients/compute_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ async def wait_for_specs(self, channel: aioredis.client.PubSub):
specs = ExecutorSpecRequest(
specs=msg["specs"],
score=msg["score"],
synthetic_job_score=msg["synthetic_job_score"],
log_status=msg["log_status"],
job_batch_id=msg["job_batch_id"],
log_text=msg["log_text"],
Expand Down
16 changes: 16 additions & 0 deletions neurons/validators/src/miner_jobs/machine_scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,21 @@ def get_all_container_digests():

return digests # Return the list of digests

def get_md5_checksums():
    """Collect MD5 checksums of the NVIDIA tooling on this machine.

    Returns a dict with keys "nvidia_smi" and "libnvidia_ml" holding hex
    digest strings (or None when the binary/library is not found).  On any
    failure an "error" key with repr(exc) is added so the caller can see why.
    """
    checksums = {"nvidia_smi": None, "libnvidia_ml": None}
    try:
        nvidia_smi_path = run_cmd("which nvidia-smi").strip()
        if nvidia_smi_path:
            checksums["nvidia_smi"] = run_cmd(f"md5sum {nvidia_smi_path}").split()[0]

        # `find` may print several matching paths; md5sum the first one only —
        # passing the raw multi-line output would break the command.
        lib_paths = run_cmd("find /usr -name 'libnvidia-ml.so.1'").strip().splitlines()
        if lib_paths:
            checksums["libnvidia_ml"] = run_cmd(f"md5sum {lib_paths[0].strip()}").split()[0]
    except Exception as exc:
        checksums["error"] = repr(exc)
    return checksums

def get_machine_specs():
"""Get Specs of miner machine."""
data = {}
Expand Down Expand Up @@ -667,6 +682,7 @@ def get_machine_specs():

data["network"] = get_network_speed()
data["all_container_digests"] = get_all_container_digests()
data["md5_checksums"] = get_md5_checksums()
return data


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ExecutorSpecRequest(BaseValidatorRequest):
executor_port: int
specs: dict | None
score: float | None
synthetic_job_score: float | None
log_text: str | None
log_status: str | None
job_batch_id: str
Expand Down
126 changes: 104 additions & 22 deletions neurons/validators/src/services/docker_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
ContainerDeleteRequest,
ContainerStartRequest,
ContainerStopRequest,
FailedContainerRequest,
)
from protocol.vc_protocol.compute_requests import RentedMachine

from core.utils import _m, context, get_extra_info
from services.redis_service import RedisService
from core.utils import _m, get_extra_info
from services.redis_service import RedisService, AVAILABLE_PORTS_PREFIX
from services.ssh_service import SSHService

logger = logging.getLogger(__name__)
Expand All @@ -42,27 +43,29 @@ def __init__(
self.ssh_service = ssh_service
self.redis_service = redis_service

def generate_portMappings(self, range_external_ports):
internal_ports = [22, 20000, 20001, 20002, 20003]
if range_external_ports:
if '-' in range_external_ports:
start, end = map(int, range_external_ports.split('-'))
available_ports = list(range(start, end + 1))
async def generate_portMappings(self, miner_hotkey, executor_id):
try:
internal_ports = [22, 20000, 20001, 20002, 20003]

key = f"{AVAILABLE_PORTS_PREFIX}:{miner_hotkey}:{executor_id}"
available_ports_str = await self.redis_service.get(key)
if available_ports_str:
available_ports = list(map(int, available_ports_str.decode().split(',')))
else:
available_ports = list(map(int, range_external_ports.split(',')))
else:
available_ports = list(range(40000, 65535))
available_ports = []

if 0 in available_ports:
available_ports.remove(0)
if 0 in available_ports:
available_ports.remove(0)

mappings = []
for i, internal_port in enumerate(internal_ports):
if i < len(available_ports):
mappings.append((internal_port, available_ports[i]))
else:
break
return mappings
mappings = []
for i, internal_port in enumerate(internal_ports):
if i < len(available_ports):
mappings.append((internal_port, available_ports[i]))
else:
break
return mappings
except:
return []

async def create_container(
self,
Expand All @@ -89,9 +92,15 @@ async def create_container(
)

# generate port maps
port_maps = self.generate_portMappings(executor_info.port_range)
port_maps = await self.generate_portMappings(payload.miner_hotkey, payload.executor_id)
if not port_maps:
return None
log_text = "No port mappings found"
logger.error(log_text)
return FailedContainerRequest(
miner_hotkey=payload.miner_hotkey,
executor_id=payload.executor_id,
msg=log_text
)

private_key = self.ssh_service.decrypt_payload(keypair.ss58_address, private_key)
pkey = asyncssh.import_private_key(private_key)
Expand Down Expand Up @@ -403,3 +412,76 @@ async def get_docker_hub_digests(self, repositories) -> dict[str, str]:
print(f"Error retrieving data for {repo}: {e}")

return all_digests

async def setup_ssh_access(
self,
ssh_client: asyncssh.SSHClientConnection,
container_name: str,
ip_address: str,
username: str = "root",
port_maps: list[tuple[int, int]] = None
) -> tuple[bool, str, str]:
"""Generate an SSH key pair, add the public key to the Docker container, and check SSH connection."""

my_key = "my_key"
private_key, public_key = self.ssh_service.generate_ssh_key(my_key)

public_key = public_key.decode("utf-8")
private_key = private_key.decode("utf-8")

private_key = self.ssh_service.decrypt_payload(my_key, private_key)
pkey = asyncssh.import_private_key(private_key)

await asyncio.sleep(5)

command = f"docker exec {container_name} sh -c 'echo \"{public_key}\" >> /root/.ssh/authorized_keys'"

result = await ssh_client.run(command)
if result.exit_status != 0:
log_text = "Error creating docker connection"
log_status = "error"
logger.error(log_text)

return False, log_text, log_status

port = 0
for internal, external in port_maps:
if internal == 22:
port = external
# Check SSH connection
try:
async with asyncssh.connect(
host=ip_address,
port=port,
username=username,
client_keys=[pkey],
known_hosts=None,
) as ssh_client_1:
log_status = "info"
log_text = "SSH connection successful!"
logger.info(
_m(
log_text,
extra={
"container_name": container_name,
"ip_address": ip_address,
"port_maps": port_maps,
},
)
)
return True, log_text, log_status
except Exception as e:
log_text = "SSH connection failed"
log_status = "error"
logger.error(
_m(
log_text,
extra={
"container_name": container_name,
"ip_address": ip_address,
"port_maps": port_maps,
"error": str(e),
},
)
)
return False, log_text, log_status
19 changes: 8 additions & 11 deletions neurons/validators/src/services/miner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async def request_job_to_miner(
await self.store_executor_counts(payload.miner_hotkey, payload.job_batch_id, len(msg.executors), results)

total_score = 0
for _, _, score, _, _, _ in results:
for _, _, score, _, _, _, _ in results:
total_score += score

logger.info(
Expand Down Expand Up @@ -222,7 +222,7 @@ async def publish_machine_specs(
extra=get_extra_info({**default_extra, "results": len(results)}),
),
)
for specs, ssh_info, score, job_batch_id, log_status, log_text in results:
for specs, ssh_info, score, synthetic_job_score, job_batch_id, log_status, log_text in results:
try:
await self.redis_service.publish(
MACHINE_SPEC_CHANNEL_NAME,
Expand All @@ -233,6 +233,7 @@ async def publish_machine_specs(
"executor_ip": ssh_info.address,
"executor_port": ssh_info.port,
"score": score,
"synthetic_job_score": synthetic_job_score,
"job_batch_id": job_batch_id,
"log_status": log_status,
"log_text": str(log_text),
Expand All @@ -256,7 +257,7 @@ async def store_executor_counts(self, miner_hotkey: str, job_batch_id: str, tota
success = 0
failed = 0

for _, _, score, _, _, _ in results:
for _, _, score, _, _, _, _ in results:
if score > 0:
success += 1
else:
Expand Down Expand Up @@ -322,7 +323,7 @@ async def handle_container(self, payload: ContainerBaseRequest):

try:
msg = await asyncio.wait_for(
miner_client.job_state.miner_accepted_ssh_key_or_failed_future, timeout=1
miner_client.job_state.miner_accepted_ssh_key_or_failed_future, timeout=JOB_LENGTH
)
except TimeoutError:
logger.error(
Expand Down Expand Up @@ -405,13 +406,9 @@ async def handle_container(self, payload: ContainerBaseRequest):
public_key=public_key, executor_id=payload.executor_id
)
)

if result is None:
return FailedContainerRequest(
miner_hotkey=payload.miner_hotkey,
executor_id=payload.executor_id,
msg=f"create container error: No ports available",
)

if isinstance(result, FailedContainerRequest):
return result

return ContainerCreated(
miner_hotkey=payload.miner_hotkey,
Expand Down
6 changes: 6 additions & 0 deletions neurons/validators/src/services/redis_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
MACHINE_SPEC_CHANNEL_NAME = "channel:1"
RENTED_MACHINE_SET = "rented_machines"
EXECUTOR_COUNT_PREFIX = "executor_counts"
AVAILABLE_PORTS_PREFIX = "available_ports"


class RedisService:
Expand Down Expand Up @@ -34,6 +35,11 @@ async def get(self, key: str):
async with self.lock:
return await self.redis.get(key)

    async def delete(self, key: str) -> None:
        """Remove a key from Redis.

        Acquires the instance lock first so the call is serialized with the
        other RedisService operations that share the same connection.
        """
        async with self.lock:
            await self.redis.delete(key)

async def sadd(self, key: str, elem: str):
"""Add a machine ID to the set of rented machines."""
async with self.lock:
Expand Down
Loading

0 comments on commit 6444f07

Please sign in to comment.