From 3d556afc9f10924473c85454d39be5e5da9bb484 Mon Sep 17 00:00:00 2001 From: pyon12 Date: Thu, 26 Dec 2024 10:16:26 +0000 Subject: [PATCH 1/3] add duplicated machines logic --- .../validators/src/clients/compute_client.py | 42 ++++++++++++++++++- .../validators/src/payload_models/payloads.py | 6 +++ .../vc_protocol/validator_requests.py | 5 +++ .../validators/src/services/redis_service.py | 6 +-- .../validators/src/services/task_service.py | 31 +++++++++++++- 5 files changed, 83 insertions(+), 7 deletions(-) diff --git a/neurons/validators/src/clients/compute_client.py b/neurons/validators/src/clients/compute_client.py index d141575..0079289 100644 --- a/neurons/validators/src/clients/compute_client.py +++ b/neurons/validators/src/clients/compute_client.py @@ -13,6 +13,7 @@ ContainerCreateRequest, ContainerDeleteRequest, FailedContainerRequest, + DuplicateContainersResponse, ) from protocol.vc_protocol.compute_requests import Error, RentedMachineResponse, Response from protocol.vc_protocol.validator_requests import ( @@ -20,6 +21,7 @@ ExecutorSpecRequest, LogStreamRequest, RentedMachineRequest, + DuplicateContainersRequest, ) from pydantic import BaseModel from websockets.asyncio.client import ClientConnection @@ -30,6 +32,7 @@ from services.redis_service import ( MACHINE_SPEC_CHANNEL_NAME, RENTED_MACHINE_SET, + DUPLICATED_MACHINE_SET, STREAMING_LOG_CHANNEL, ) @@ -390,6 +393,15 @@ async def poll_rented_machines(self): ) ) await self.send_model(RentedMachineRequest()) + + logger.info( + _m( + "Request duplicated machines", + extra=self.logging_extra, + ) + ) + await self.send_model(DuplicateContainersRequest()) + await asyncio.sleep(10 * 60) else: await asyncio.sleep(10) @@ -433,13 +445,41 @@ async def handle_message(self, raw_msg: str | bytes): ) redis_service = self.miner_service.redis_service - await redis_service.clear_set(RENTED_MACHINE_SET) + await redis_service.delete(RENTED_MACHINE_SET) for machine in response.machines: await redis_service.add_rented_machine(machine) return + try: + response = pydantic.TypeAdapter(DuplicateContainersResponse).validate_json(raw_msg) + except pydantic.ValidationError as exc: + logger.error( + _m( + "could not parse raw message as DuplicateContainersResponse", + extra={**self.logging_extra, "error": str(exc), "raw_msg": raw_msg}, + ) + ) + else: + logger.info( + _m( + "Duplicated containers", + extra={**self.logging_extra, "machines": response.containers}, + ) + ) + + redis_service = self.miner_service.redis_service + await redis_service.delete(DUPLICATED_MACHINE_SET) + + for container_id, details_list in response.containers.items(): + for detail in details_list: + executor_id = detail.get("executor_id") + miner_hotkey = detail.get("miner_hotkey") + await redis_service.sadd(DUPLICATED_MACHINE_SET, f"{miner_hotkey}:{executor_id}") + + return + try: job_request = pydantic.TypeAdapter(ContainerCreateRequest).validate_json(raw_msg) except pydantic.ValidationError as exc: diff --git a/neurons/validators/src/payload_models/payloads.py b/neurons/validators/src/payload_models/payloads.py index 01906ad..b78787a 100644 --- a/neurons/validators/src/payload_models/payloads.py +++ b/neurons/validators/src/payload_models/payloads.py @@ -50,6 +50,7 @@ class ContainerRequestType(enum.Enum): ContainerStartRequest = "ContainerStartRequest" ContainerStopRequest = "ContainerStopRequest" ContainerDeleteRequest = "ContainerDeleteRequest" + DuplicateContainersResponse = "DuplicateContainersResponse" class ContainerBaseRequest(BaseRequest): @@ -137,3 +138,8 @@ class FailedContainerRequest(ContainerBaseResponse): message_type: ContainerResponseType = ContainerResponseType.FailedRequest msg: str error_code: FailedContainerErrorCodes | None = None + + +class DuplicateContainersResponse(BaseModel): + message_type: ContainerRequestType = ContainerRequestType.DuplicateContainersResponse + containers: dict[str, list] diff --git a/neurons/validators/src/protocol/vc_protocol/validator_requests.py b/neurons/validators/src/protocol/vc_protocol/validator_requests.py index 9dfafc8..64eed07 100644 --- a/neurons/validators/src/protocol/vc_protocol/validator_requests.py +++ b/neurons/validators/src/protocol/vc_protocol/validator_requests.py @@ -13,6 +13,7 @@ class RequestType(enum.Enum): ExecutorSpecRequest = "ExecutorSpecRequest" RentedMachineRequest = "RentedMachineRequest" LogStreamRequest = "LogStreamRequest" + DuplicateContainersRequest = "DuplicateContainersRequest" class BaseValidatorRequest(BaseRequest): @@ -70,3 +71,7 @@ class LogStreamRequest(BaseValidatorRequest): validator_hotkey: str executor_uuid: str logs: list[dict] + + +class DuplicateContainersRequest(BaseValidatorRequest): + message_type: RequestType = RequestType.DuplicateContainersRequest diff --git a/neurons/validators/src/services/redis_service.py b/neurons/validators/src/services/redis_service.py index 60b84e4..0cd2385 100644 --- a/neurons/validators/src/services/redis_service.py +++ b/neurons/validators/src/services/redis_service.py @@ -7,6 +7,7 @@ MACHINE_SPEC_CHANNEL_NAME = "channel:1" STREAMING_LOG_CHANNEL = "channel:2" RENTED_MACHINE_SET = "rented_machines" +DUPLICATED_MACHINE_SET = "duplicated_machines" EXECUTOR_COUNT_PREFIX = "executor_counts" AVAILABLE_PORT_MAPS_PREFIX = "available_port_maps" @@ -96,11 +97,6 @@ async def rpop(self, key: str) -> bytes: async with self.lock: return await self.redis.rpop(key) - async def clear_set(self, key: str): - """Clear all elements from a set in Redis.""" - async with self.lock: - await self.redis.delete(key) - async def hset(self, key: str, field: str, value: str): async with self.lock: await self.redis.hset(key, field, value) diff --git a/neurons/validators/src/services/task_service.py b/neurons/validators/src/services/task_service.py index d2c3df5..e61e8a0 100644 --- a/neurons/validators/src/services/task_service.py +++ b/neurons/validators/src/services/task_service.py @@ -26,7 +26,12 @@ LIB_NVIDIA_ML_DIGESTS, DOCKER_DIGESTS, ) -from services.redis_service import RedisService, RENTED_MACHINE_SET, AVAILABLE_PORT_MAPS_PREFIX +from services.redis_service import ( + RedisService, + RENTED_MACHINE_SET, + DUPLICATED_MACHINE_SET, + AVAILABLE_PORT_MAPS_PREFIX, +) from services.ssh_service import SSHService from services.hash_service import HashService @@ -488,6 +493,30 @@ async def create_task( ), ) + # check duplicated + is_duplicated = await self.redis_service.is_elem_exists_in_set( + DUPLICATED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}" + ) + if is_duplicated: + log_status = "warning" + log_text = _m( + f"Executor is duplicated", + extra=get_extra_info(default_extra), + ) + logger.warning(log_text) + + await self.clear_remote_directory(ssh_client, remote_dir) + + return ( + machine_spec, + executor_info, + 0, + 0, + miner_info.job_batch_id, + log_status, + log_text, + ) + # check rented status is_rented = await self.redis_service.is_elem_exists_in_set( RENTED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}" From d07d8c9458e9ab23d93f1ba6e29e1bed3ca2f855 Mon Sep 17 00:00:00 2001 From: pyon12 Date: Thu, 26 Dec 2024 10:19:04 +0000 Subject: [PATCH 2/3] update validator version --- neurons/validators/Dockerfile | 2 +- neurons/validators/Dockerfile.runner | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neurons/validators/Dockerfile b/neurons/validators/Dockerfile index 5d565dd..e9a73b1 100644 --- a/neurons/validators/Dockerfile +++ b/neurons/validators/Dockerfile @@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile COPY --from=base-image /root/app/ /root/app/ COPY --from=base-image /opt/pypackages/ /opt/pypackages/ -LABEL version="3.3.10" +LABEL version="3.3.11" CMD ["bash", "run.sh"] \ No newline at end of file diff --git a/neurons/validators/Dockerfile.runner b/neurons/validators/Dockerfile.runner index 71f0fdf..5ffd4fe 100644 --- a/neurons/validators/Dockerfile.runner +++ b/neurons/validators/Dockerfile.runner @@ -3,7 +3,7 @@ WORKDIR /root/validator COPY docker-compose.app.yml docker-compose.yml COPY entrypoint.sh /entrypoint.sh -LABEL version="3.3.10" +LABEL version="3.3.11" RUN chmod u+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] From eca46d49c75c539f4f5e788eca17dce6e2e97cff Mon Sep 17 00:00:00 2001 From: pyon12 Date: Fri, 27 Dec 2024 13:27:06 +0000 Subject: [PATCH 3/3] update validator version --- neurons/validators/Dockerfile | 2 +- neurons/validators/Dockerfile.runner | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neurons/validators/Dockerfile b/neurons/validators/Dockerfile index 81cd98e..86d1aab 100644 --- a/neurons/validators/Dockerfile +++ b/neurons/validators/Dockerfile @@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile COPY --from=base-image /root/app/ /root/app/ COPY --from=base-image /opt/pypackages/ /opt/pypackages/ -LABEL version="3.3.11" +LABEL version="3.3.12" CMD ["bash", "run.sh"] diff --git a/neurons/validators/Dockerfile.runner b/neurons/validators/Dockerfile.runner index 5ffd4fe..bdc88c7 100644 --- a/neurons/validators/Dockerfile.runner +++ b/neurons/validators/Dockerfile.runner @@ -3,7 +3,7 @@ WORKDIR /root/validator COPY docker-compose.app.yml docker-compose.yml COPY entrypoint.sh /entrypoint.sh -LABEL version="3.3.11" +LABEL version="3.3.12" RUN chmod u+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"]