Merge pull request #159 from Datura-ai/main
deploy validator
pyon12 authored Dec 27, 2024
2 parents 06de494 + eca46d4 commit b5fe76f
Showing 7 changed files with 89 additions and 12 deletions. The release bumps the validator image to 3.3.12 and adds duplicated-container detection: the validator periodically asks the compute app for duplicates, caches them in a Redis set, and short-circuits scoring for flagged executors.
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile
@@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile
 COPY --from=base-image /root/app/ /root/app/
 COPY --from=base-image /opt/pypackages/ /opt/pypackages/

-LABEL version="3.3.11"
+LABEL version="3.3.12"

 CMD ["bash", "run.sh"]
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile.runner
@@ -3,7 +3,7 @@ WORKDIR /root/validator
 COPY docker-compose.app.yml docker-compose.yml
 COPY entrypoint.sh /entrypoint.sh

-LABEL version="3.3.11"
+LABEL version="3.3.12"

 RUN chmod u+x /entrypoint.sh
 ENTRYPOINT ["/entrypoint.sh"]
49 changes: 45 additions & 4 deletions neurons/validators/src/clients/compute_client.py
@@ -13,6 +13,7 @@
     ContainerCreateRequest,
     ContainerDeleteRequest,
     FailedContainerRequest,
+    DuplicateContainersResponse,
     ContainerStartRequest,
     ContainerStopRequest,
     ContainerBaseRequest,
@@ -23,6 +24,7 @@
     ExecutorSpecRequest,
     LogStreamRequest,
     RentedMachineRequest,
+    DuplicateContainersRequest,
 )
 from pydantic import BaseModel
 from websockets.asyncio.client import ClientConnection
@@ -34,6 +36,7 @@
 from services.redis_service import (
     MACHINE_SPEC_CHANNEL_NAME,
     RENTED_MACHINE_SET,
+    DUPLICATED_MACHINE_SET,
     STREAMING_LOG_CHANNEL,
 )
@@ -67,9 +70,10 @@ def __init__(
                 "compute_app_uri": compute_app_uri,
             }
         )

     def accepted_request_type(self) -> type[BaseRequest]:
         return ContainerBaseRequest

     def connect(self):
         """Create an awaitable/async-iterable websockets.connect() object"""
         logger.info(
@@ -396,6 +400,15 @@ async def poll_rented_machines(self):
                 )
             )
             await self.send_model(RentedMachineRequest())
+
+            logger.info(
+                _m(
+                    "Request duplicated machines",
+                    extra=self.logging_extra,
+                )
+            )
+            await self.send_model(DuplicateContainersRequest())
+
             await asyncio.sleep(10 * 60)
         else:
             await asyncio.sleep(10)
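
The poll loop above now requests duplicated containers alongside rented machines on the same ten-minute cadence. A minimal runnable sketch of that pattern, with a hypothetical client object (connected flag, send_model coroutine) standing in for the real client internals:

    import asyncio
    from dataclasses import dataclass

    # Stand-ins for the pydantic request models sent over the websocket.
    @dataclass
    class RentedMachineRequest:
        message_type: str = "RentedMachineRequest"

    @dataclass
    class DuplicateContainersRequest:
        message_type: str = "DuplicateContainersRequest"

    async def poll_loop(client) -> None:
        while True:
            if client.connected:  # hypothetical connection flag
                # One cycle asks for both data sets, then sleeps the full interval.
                await client.send_model(RentedMachineRequest())
                await client.send_model(DuplicateContainersRequest())
                await asyncio.sleep(10 * 60)
            else:
                await asyncio.sleep(10)  # retry quickly until connected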
@@ -439,13 +452,41 @@ async def handle_message(self, raw_msg: str | bytes):
             )

             redis_service = self.miner_service.redis_service
-            await redis_service.clear_set(RENTED_MACHINE_SET)
+            await redis_service.delete(RENTED_MACHINE_SET)

             for machine in response.machines:
                 await redis_service.add_rented_machine(machine)

             return

+        try:
+            response = pydantic.TypeAdapter(DuplicateContainersResponse).validate_json(raw_msg)
+        except pydantic.ValidationError as exc:
+            logger.error(
+                _m(
+                    "could not parse raw message as DuplicateContainersResponse",
+                    extra={**self.logging_extra, "error": str(exc), "raw_msg": raw_msg},
+                )
+            )
+        else:
+            logger.info(
+                _m(
+                    "Duplicated containers",
+                    extra={**self.logging_extra, "machines": response.containers},
+                )
+            )
+
+            redis_service = self.miner_service.redis_service
+            await redis_service.delete(DUPLICATED_MACHINE_SET)
+
+            for container_id, details_list in response.containers.items():
+                for detail in details_list:
+                    executor_id = detail.get("executor_id")
+                    miner_hotkey = detail.get("miner_hotkey")
+                    await redis_service.sadd(DUPLICATED_MACHINE_SET, f"{miner_hotkey}:{executor_id}")
+
+            return
+
         try:
             job_request = self.accepted_request_type().parse(raw_msg)
         except Exception as ex:
@@ -530,7 +571,7 @@ async def miner_driver(self, job_request: ContainerCreateRequest | ContainerDele
                     extra={**logging_extra, "response": str(response)},
                 )
             )
-            await self.send_model(response)
+            await self.send_model(response)
         elif isinstance(job_request, ContainerStartRequest):
             job_request.miner_address = miner_axon_info.ip
             job_request.miner_port = miner_axon_info.port
@@ -544,4 +585,4 @@ async def miner_driver(self, job_request: ContainerCreateRequest | ContainerDele
                     extra={**logging_extra, "response": str(response)},
                 )
             )
-            await self.send_model(response)
+            await self.send_model(response)
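
handle_message now tries DuplicateContainersResponse after the rented-machines response before falling back to container job requests. The underlying pattern is pydantic v2's TypeAdapter cascade: validate the raw payload against each candidate model and move on when validation fails. A simplified, runnable sketch (model fields reduced to the essentials, and RentedMachinesResponse is an illustrative stand-in; order decides which model wins when shapes overlap):

    import pydantic
    from pydantic import BaseModel

    class RentedMachinesResponse(BaseModel):  # illustrative stand-in
        machines: list[str]

    class DuplicateContainersResponse(BaseModel):
        containers: dict[str, list]

    def parse_message(raw_msg: str) -> BaseModel | None:
        # Try each known model in order; the first that validates wins.
        for model in (RentedMachinesResponse, DuplicateContainersResponse):
            try:
                return pydantic.TypeAdapter(model).validate_json(raw_msg)
            except pydantic.ValidationError:
                continue  # not this shape, try the next one
        return None  # caller falls back to container job-request parsing

    print(parse_message('{"containers": {"c1": [{"executor_id": "e1"}]}}'))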
6 changes: 6 additions & 0 deletions neurons/validators/src/payload_models/payloads.py
@@ -50,6 +50,7 @@ class ContainerRequestType(enum.Enum):
     ContainerStartRequest = "ContainerStartRequest"
     ContainerStopRequest = "ContainerStopRequest"
     ContainerDeleteRequest = "ContainerDeleteRequest"
+    DuplicateContainersResponse = "DuplicateContainersResponse"


 class ContainerBaseRequest(BaseRequest):
@@ -137,3 +138,8 @@ class FailedContainerRequest(ContainerBaseResponse):
     message_type: ContainerResponseType = ContainerResponseType.FailedRequest
     msg: str
     error_code: FailedContainerErrorCodes | None = None
+
+
+class DuplicateContainersResponse(BaseModel):
+    message_type: ContainerRequestType = ContainerRequestType.DuplicateContainersResponse
+    containers: dict[str, list]
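
Per the handler in compute_client.py, each value in containers is a list of dicts carrying executor_id and miner_hotkey. A sketch of a payload round-tripping through the new model (enum and imports reduced to what the snippet needs, sample IDs invented); typing the field as dict[str, list[dict]] instead of the looser dict[str, list] would document that shape directly:

    import enum
    from pydantic import BaseModel

    class ContainerRequestType(enum.Enum):
        DuplicateContainersResponse = "DuplicateContainersResponse"

    class DuplicateContainersResponse(BaseModel):
        message_type: ContainerRequestType = ContainerRequestType.DuplicateContainersResponse
        containers: dict[str, list]

    payload = '''{
      "message_type": "DuplicateContainersResponse",
      "containers": {
        "container-abc": [
          {"executor_id": "uuid-1", "miner_hotkey": "hk-1"},
          {"executor_id": "uuid-1", "miner_hotkey": "hk-2"}
        ]
      }
    }'''

    resp = DuplicateContainersResponse.model_validate_json(payload)
    assert resp.containers["container-abc"][0]["miner_hotkey"] == "hk-1"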
5 changes: 5 additions & 0 deletions (file path not captured on this page; it defines the validator request models)
@@ -13,6 +13,7 @@ class RequestType(enum.Enum):
     ExecutorSpecRequest = "ExecutorSpecRequest"
     RentedMachineRequest = "RentedMachineRequest"
     LogStreamRequest = "LogStreamRequest"
+    DuplicateContainersRequest = "DuplicateContainersRequest"


 class BaseValidatorRequest(BaseRequest):
@@ -70,3 +71,7 @@ class LogStreamRequest(BaseValidatorRequest):
     validator_hotkey: str
     executor_uuid: str
     logs: list[dict]
+
+
+class DuplicateContainersRequest(BaseValidatorRequest):
+    message_type: RequestType = RequestType.DuplicateContainersRequest
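
The request counterpart is payload-free: its enum discriminator (plus whatever identity fields BaseValidatorRequest contributes, which this diff doesn't show) is all that crosses the wire. A stripped-down sketch of the serialization:

    import enum
    from pydantic import BaseModel

    class RequestType(enum.Enum):
        DuplicateContainersRequest = "DuplicateContainersRequest"

    # BaseModel stands in for BaseValidatorRequest, whose fields aren't shown here.
    class DuplicateContainersRequest(BaseModel):
        message_type: RequestType = RequestType.DuplicateContainersRequest

    # Enum fields serialize by value, giving the compute app a string to match on.
    print(DuplicateContainersRequest().model_dump_json())
    # -> {"message_type":"DuplicateContainersRequest"}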
6 changes: 1 addition & 5 deletions neurons/validators/src/services/redis_service.py
@@ -7,6 +7,7 @@
 MACHINE_SPEC_CHANNEL_NAME = "channel:1"
 STREAMING_LOG_CHANNEL = "channel:2"
 RENTED_MACHINE_SET = "rented_machines"
+DUPLICATED_MACHINE_SET = "duplicated_machines"
 EXECUTOR_COUNT_PREFIX = "executor_counts"
 AVAILABLE_PORT_MAPS_PREFIX = "available_port_maps"
@@ -96,11 +97,6 @@ async def rpop(self, key: str) -> bytes:
         async with self.lock:
             return await self.redis.rpop(key)

-    async def clear_set(self, key: str):
-        """Clear all elements from a set in Redis."""
-        async with self.lock:
-            await self.redis.delete(key)
-
     async def hset(self, key: str, field: str, value: str):
         async with self.lock:
             await self.redis.hset(key, field, value)
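
clear_set duplicated what delete already does: Redis DEL removes a key regardless of its type, sets included, so the generic helper covers both call sites. A minimal sketch of the surviving wrapper, assuming the service holds a redis.asyncio client and an asyncio lock as its redis/lock attributes suggest:

    import asyncio
    import redis.asyncio as aioredis

    class RedisService:
        def __init__(self, url: str = "redis://localhost:6379") -> None:
            self.redis = aioredis.from_url(url)
            self.lock = asyncio.Lock()

        async def delete(self, key: str) -> None:
            # DEL is type-agnostic: it drops strings, hashes, and sets alike,
            # which is why a separate clear_set wrapper was redundant.
            async with self.lock:
                await self.redis.delete(key)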
31 changes: 30 additions & 1 deletion neurons/validators/src/services/task_service.py
@@ -26,7 +26,12 @@
     LIB_NVIDIA_ML_DIGESTS,
     DOCKER_DIGESTS,
 )
-from services.redis_service import RedisService, RENTED_MACHINE_SET, AVAILABLE_PORT_MAPS_PREFIX
+from services.redis_service import (
+    RedisService,
+    RENTED_MACHINE_SET,
+    DUPLICATED_MACHINE_SET,
+    AVAILABLE_PORT_MAPS_PREFIX,
+)
 from services.ssh_service import SSHService
 from services.hash_service import HashService
@@ -488,6 +493,30 @@ async def create_task(
                 ),
             )

+            # check duplicated
+            is_duplicated = await self.redis_service.is_elem_exists_in_set(
+                DUPLICATED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}"
+            )
+            if is_duplicated:
+                log_status = "warning"
+                log_text = _m(
+                    "Executor is duplicated",
+                    extra=get_extra_info(default_extra),
+                )
+                logger.warning(log_text)
+
+                await self.clear_remote_directory(ssh_client, remote_dir)
+
+                return (
+                    machine_spec,
+                    executor_info,
+                    0,
+                    0,
+                    miner_info.job_batch_id,
+                    log_status,
+                    log_text,
+                )
+
             # check rented status
             is_rented = await self.redis_service.is_elem_exists_in_set(
                 RENTED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}"
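
Duplicates are keyed exactly like rentals: a "{miner_hotkey}:{executor_uuid}" member in a Redis set, checked per executor before benchmarking. The body of is_elem_exists_in_set isn't part of this diff; a sketch assuming it wraps SISMEMBER:

    import asyncio
    import redis.asyncio as aioredis

    class RedisService:
        def __init__(self, url: str = "redis://localhost:6379") -> None:
            self.redis = aioredis.from_url(url)
            self.lock = asyncio.Lock()

        async def is_elem_exists_in_set(self, key: str, elem: str) -> bool:
            # SISMEMBER returns 1 or 0; coerce to bool for callers like create_task.
            async with self.lock:
                return bool(await self.redis.sismember(key, elem))

    async def is_duplicated(svc: RedisService, miner_hotkey: str, executor_uuid: str) -> bool:
        # Mirrors the new gate in create_task: flagged executors score 0 and are skipped.
        return await svc.is_elem_exists_in_set(
            "duplicated_machines", f"{miner_hotkey}:{executor_uuid}"
        )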
