diff --git a/neurons/validators/Dockerfile b/neurons/validators/Dockerfile index 81cd98e..86d1aab 100644 --- a/neurons/validators/Dockerfile +++ b/neurons/validators/Dockerfile @@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile COPY --from=base-image /root/app/ /root/app/ COPY --from=base-image /opt/pypackages/ /opt/pypackages/ -LABEL version="3.3.11" +LABEL version="3.3.12" CMD ["bash", "run.sh"] diff --git a/neurons/validators/Dockerfile.runner b/neurons/validators/Dockerfile.runner index 5ffd4fe..bdc88c7 100644 --- a/neurons/validators/Dockerfile.runner +++ b/neurons/validators/Dockerfile.runner @@ -3,7 +3,7 @@ WORKDIR /root/validator COPY docker-compose.app.yml docker-compose.yml COPY entrypoint.sh /entrypoint.sh -LABEL version="3.3.11" +LABEL version="3.3.12" RUN chmod u+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/neurons/validators/src/clients/compute_client.py b/neurons/validators/src/clients/compute_client.py index 290dbff..09adae7 100644 --- a/neurons/validators/src/clients/compute_client.py +++ b/neurons/validators/src/clients/compute_client.py @@ -13,6 +13,7 @@ ContainerCreateRequest, ContainerDeleteRequest, FailedContainerRequest, + DuplicateContainersResponse, ContainerStartRequest, ContainerStopRequest, ContainerBaseRequest, @@ -23,6 +24,7 @@ ExecutorSpecRequest, LogStreamRequest, RentedMachineRequest, + DuplicateContainersRequest, ) from pydantic import BaseModel from websockets.asyncio.client import ClientConnection @@ -34,6 +36,7 @@ from services.redis_service import ( MACHINE_SPEC_CHANNEL_NAME, RENTED_MACHINE_SET, + DUPLICATED_MACHINE_SET, STREAMING_LOG_CHANNEL, ) @@ -67,9 +70,10 @@ def __init__( "compute_app_uri": compute_app_uri, } ) + def accepted_request_type(self) -> type[BaseRequest]: return ContainerBaseRequest - + def connect(self): """Create an awaitable/async-iterable websockets.connect() object""" logger.info( @@ -396,6 +400,15 @@ async def poll_rented_machines(self): ) ) await 
self.send_model(RentedMachineRequest()) + + logger.info( + _m( + "Request duplicated machines", + extra=self.logging_extra, + ) + ) + await self.send_model(DuplicateContainersRequest()) + await asyncio.sleep(10 * 60) else: await asyncio.sleep(10) @@ -439,13 +452,41 @@ async def handle_message(self, raw_msg: str | bytes): ) redis_service = self.miner_service.redis_service - await redis_service.clear_set(RENTED_MACHINE_SET) + await redis_service.delete(RENTED_MACHINE_SET) for machine in response.machines: await redis_service.add_rented_machine(machine) return + try: + response = pydantic.TypeAdapter(DuplicateContainersResponse).validate_json(raw_msg) + except pydantic.ValidationError as exc: + logger.error( + _m( + "could not parse raw message as DuplicateContainersResponse", + extra={**self.logging_extra, "error": str(exc), "raw_msg": raw_msg}, + ) + ) + else: + logger.info( + _m( + "Duplicated containers", + extra={**self.logging_extra, "machines": response.containers}, + ) + ) + + redis_service = self.miner_service.redis_service + await redis_service.delete(DUPLICATED_MACHINE_SET) + + for container_id, details_list in response.containers.items(): + for detail in details_list: + executor_id = detail.get("executor_id") + miner_hotkey = detail.get("miner_hotkey") + await redis_service.sadd(DUPLICATED_MACHINE_SET, f"{miner_hotkey}:{executor_id}") + + return + try: job_request = self.accepted_request_type().parse(raw_msg) except Exception as ex: @@ -530,7 +571,7 @@ async def miner_driver(self, job_request: ContainerCreateRequest | ContainerDele extra={**logging_extra, "response": str(response)}, ) ) - await self.send_model(response) + await self.send_model(response) elif isinstance(job_request, ContainerStartRequest): job_request.miner_address = miner_axon_info.ip job_request.miner_port = miner_axon_info.port @@ -544,4 +585,4 @@ async def miner_driver(self, job_request: ContainerCreateRequest | ContainerDele extra={**logging_extra, "response": str(response)}, ) ) - 
await self.send_model(response) + await self.send_model(response) diff --git a/neurons/validators/src/payload_models/payloads.py b/neurons/validators/src/payload_models/payloads.py index 01906ad..b78787a 100644 --- a/neurons/validators/src/payload_models/payloads.py +++ b/neurons/validators/src/payload_models/payloads.py @@ -50,6 +50,7 @@ class ContainerRequestType(enum.Enum): ContainerStartRequest = "ContainerStartRequest" ContainerStopRequest = "ContainerStopRequest" ContainerDeleteRequest = "ContainerDeleteRequest" + DuplicateContainersResponse = "DuplicateContainersResponse" class ContainerBaseRequest(BaseRequest): @@ -137,3 +138,8 @@ class FailedContainerRequest(ContainerBaseResponse): message_type: ContainerResponseType = ContainerResponseType.FailedRequest msg: str error_code: FailedContainerErrorCodes | None = None + + +class DuplicateContainersResponse(BaseModel): + message_type: ContainerRequestType = ContainerRequestType.DuplicateContainersResponse + containers: dict[str, list] diff --git a/neurons/validators/src/protocol/vc_protocol/validator_requests.py b/neurons/validators/src/protocol/vc_protocol/validator_requests.py index 9dfafc8..64eed07 100644 --- a/neurons/validators/src/protocol/vc_protocol/validator_requests.py +++ b/neurons/validators/src/protocol/vc_protocol/validator_requests.py @@ -13,6 +13,7 @@ class RequestType(enum.Enum): ExecutorSpecRequest = "ExecutorSpecRequest" RentedMachineRequest = "RentedMachineRequest" LogStreamRequest = "LogStreamRequest" + DuplicateContainersRequest = "DuplicateContainersRequest" class BaseValidatorRequest(BaseRequest): @@ -70,3 +71,7 @@ class LogStreamRequest(BaseValidatorRequest): validator_hotkey: str executor_uuid: str logs: list[dict] + + +class DuplicateContainersRequest(BaseValidatorRequest): + message_type: RequestType = RequestType.DuplicateContainersRequest diff --git a/neurons/validators/src/services/redis_service.py b/neurons/validators/src/services/redis_service.py index 60b84e4..0cd2385 100644 
--- a/neurons/validators/src/services/redis_service.py +++ b/neurons/validators/src/services/redis_service.py @@ -7,6 +7,7 @@ MACHINE_SPEC_CHANNEL_NAME = "channel:1" STREAMING_LOG_CHANNEL = "channel:2" RENTED_MACHINE_SET = "rented_machines" +DUPLICATED_MACHINE_SET = "duplicated_machines" EXECUTOR_COUNT_PREFIX = "executor_counts" AVAILABLE_PORT_MAPS_PREFIX = "available_port_maps" @@ -96,11 +97,6 @@ async def rpop(self, key: str) -> bytes: async with self.lock: return await self.redis.rpop(key) - async def clear_set(self, key: str): - """Clear all elements from a set in Redis.""" - async with self.lock: - await self.redis.delete(key) - async def hset(self, key: str, field: str, value: str): async with self.lock: await self.redis.hset(key, field, value) diff --git a/neurons/validators/src/services/task_service.py b/neurons/validators/src/services/task_service.py index d2c3df5..e61e8a0 100644 --- a/neurons/validators/src/services/task_service.py +++ b/neurons/validators/src/services/task_service.py @@ -26,7 +26,12 @@ LIB_NVIDIA_ML_DIGESTS, DOCKER_DIGESTS, ) -from services.redis_service import RedisService, RENTED_MACHINE_SET, AVAILABLE_PORT_MAPS_PREFIX +from services.redis_service import ( + RedisService, + RENTED_MACHINE_SET, + DUPLICATED_MACHINE_SET, + AVAILABLE_PORT_MAPS_PREFIX, +) from services.ssh_service import SSHService from services.hash_service import HashService @@ -488,6 +493,30 @@ async def create_task( ), ) + # check duplicated + is_duplicated = await self.redis_service.is_elem_exists_in_set( + DUPLICATED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}" + ) + if is_duplicated: + log_status = "warning" + log_text = _m( + "Executor is duplicated", + extra=get_extra_info(default_extra), + ) + logger.warning(log_text) + + await self.clear_remote_directory(ssh_client, remote_dir) + + return ( + machine_spec, + executor_info, + 0, + 0, + miner_info.job_batch_id, + log_status, + log_text, + ) + # check rented status is_rented = await 
self.redis_service.is_elem_exists_in_set( RENTED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}"