From aa4f93ed3e2ee00a1113f5d5825171c1c9e26283 Mon Sep 17 00:00:00 2001 From: pyon12 Date: Thu, 19 Dec 2024 16:27:15 +0000 Subject: [PATCH 1/4] update machine scrape and get docker version and digest --- neurons/validators/pdm.lock | 30 +-------- neurons/validators/pyproject.toml | 1 - .../src/miner_jobs/machine_scrape.py | 64 +++++++++++++------ neurons/validators/src/services/const.py | 5 ++ .../validators/src/services/task_service.py | 60 ++++++----------- 5 files changed, 70 insertions(+), 90 deletions(-) diff --git a/neurons/validators/pdm.lock b/neurons/validators/pdm.lock index 340faa0..228ad49 100644 --- a/neurons/validators/pdm.lock +++ b/neurons/validators/pdm.lock @@ -5,7 +5,7 @@ groups = ["default"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:764f679406b362ca663eeb22809931d42582df2c144c1ebff633949a9eb7e4f6" +content_hash = "sha256:42a94b15631991a04c5a4f725ecafc8f6b23ad713cf625ecc02e9b98a5d06bea" [[metadata.targets]] requires_python = "==3.11.*" @@ -551,22 +551,6 @@ files = [ {file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"}, ] -[[package]] -name = "docker" -version = "7.1.0" -requires_python = ">=3.8" -summary = "A Python library for the Docker Engine API." -groups = ["default"] -dependencies = [ - "pywin32>=304; sys_platform == \"win32\"", - "requests>=2.26.0", - "urllib3>=1.26.0", -] -files = [ - {file = "docker-7.1.0-py3-none-any.whl", hash = "sha256:c96b93b7f0a746f9e77d325bcfb87422a3d8bd4f03136ae8a85b37f1898d5fc0"}, - {file = "docker-7.1.0.tar.gz", hash = "sha256:ad8c70e6e3f8926cb8a92619b832b4ea5299e2831c14284663184e200546fa6c"}, -] - [[package]] name = "ecdsa" version = "0.19.0" @@ -1480,18 +1464,6 @@ files = [ {file = "python_statemachine-2.5.0.tar.gz", hash = "sha256:ae88cd22e47930b92b983a2176e61d811e571b69897be2568ec812c2885fb93a"}, ] -[[package]] -name = "pywin32" -version = "308" -summary = "Python for Window Extensions" -groups = ["default"] -marker = "sys_platform == \"win32\"" -files = [ - {file = "pywin32-308-cp311-cp311-win32.whl", hash = "sha256:5d8c8015b24a7d6855b1550d8e660d8daa09983c80e5daf89a273e5c6fb5095a"}, - {file = "pywin32-308-cp311-cp311-win_amd64.whl", hash = "sha256:575621b90f0dc2695fec346b2d6302faebd4f0f45c05ea29404cefe35d89442b"}, - {file = "pywin32-308-cp311-cp311-win_arm64.whl", hash = "sha256:100a5442b7332070983c4cd03f2e906a5648a5104b8a7f50175f7906efd16bb6"}, -] - [[package]] name = "pywin32-ctypes" version = "0.2.3" diff --git a/neurons/validators/pyproject.toml b/neurons/validators/pyproject.toml index ac1fbf7..f1cf3aa 100644 --- a/neurons/validators/pyproject.toml +++ b/neurons/validators/pyproject.toml @@ -35,7 +35,6 @@ dependencies = [ "databases==0.9.0", "datura @ file:///${PROJECT_ROOT}/../../datura", "decorator==5.1.1", - "docker==7.1.0", "ecdsa==0.19.0", "eth-hash==0.7.0", "eth-keys==0.6.0", diff --git a/neurons/validators/src/miner_jobs/machine_scrape.py b/neurons/validators/src/miner_jobs/machine_scrape.py index e12251b..db0463e 100644 --- a/neurons/validators/src/miner_jobs/machine_scrape.py +++ b/neurons/validators/src/miner_jobs/machine_scrape.py @@ -9,7 +9,6 @@ import psutil from functools import wraps import hashlib -import docker from base64 import b64encode from cryptography.fernet import Fernet import tempfile @@ -534,26 +533,49 @@ def get_network_speed(): return data -def get_all_container_digests(): - """Verify and return the digests of all running containers.""" - client = docker.from_env() - containers = 
client.containers.list() +def get_docker_info(content: bytes): + data = { + "version": "", + "containers": [] + } + + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(content) + docker_path = temp_file.name + + try: + run_cmd(f'chmod +x {docker_path}') + + result = run_cmd(f'{docker_path} version --format "{{{{.Client.Version}}}}"') + data["version"] = result.strip() + + result = run_cmd(f'{docker_path} ps --format "{{{{.ID}}}}"') + container_ids = result.strip().split('\n') + + containers = [] + + for container_id in container_ids: + # Get the image ID of the container + result = run_cmd(f'{docker_path} inspect --format "{{{{.Image}}}}" {container_id}') + image_id = result.strip() - digests = [] # Initialize an empty list to store digests + # Get the image details + result = run_cmd(f'{docker_path} inspect --format "{{{{json .RepoDigests}}}}" {image_id}') + repo_digests = json.loads(result.strip()) - for container in containers: - image_id = container.image.id - image = client.images.get(image_id) - digest = None - if image.tags: - for repo_digest in image.attrs['RepoDigests']: - if repo_digest.startswith(image.tags[0].split(':')[0]): - digest = repo_digest.split('@')[1] - break - if digest: - digests.append({'id': container.id, 'digest': digest}) # Add the digest to the list + digest = None + if repo_digests: + digest = repo_digests[0].split('@')[1] - return digests + if digest: + containers.append({'id': container_id, 'digest': digest}) + + data["containers"] = containers + + finally: + os.remove(docker_path) + + return data def get_md5_checksum_from_path(file_path): @@ -719,10 +741,14 @@ def get_machine_specs(): data["os_scrape_error"] = repr(exc) data["network"] = get_network_speed() - data["all_container_digests"] = get_all_container_digests() + + docker_content = get_file_content("/usr/bin/docker") + data["docker"] = get_docker_info(docker_content) + data["md5_checksums"] = { "nvidia_smi": get_md5_checksum_from_path(run_cmd("which nvidia-smi").strip()), "libnvidia_ml": get_md5_checksum_from_file_content(nvmlLib_content), + "docker": get_md5_checksum_from_file_content(docker_content), } return data diff --git a/neurons/validators/src/services/const.py b/neurons/validators/src/services/const.py index cb742d8..2ee44af 100644 --- a/neurons/validators/src/services/const.py +++ b/neurons/validators/src/services/const.py @@ -306,3 +306,8 @@ "565.57.01": "c801dd3fc4660f3a8ddf977cfdffe113", "550.127.08": "ac925f2cd192ad971c5466d55945a243", } + +DOCKER_DIGESTS = { + "26.1.3": "52d8fcc2c4370bf324cdf17cbc586784", + "27.3.1": "40f1f7724fa0432ea6878692a05b998c", +} diff --git a/neurons/validators/src/services/task_service.py b/neurons/validators/src/services/task_service.py index b2519d2..45ca59d 100644 --- a/neurons/validators/src/services/task_service.py +++ b/neurons/validators/src/services/task_service.py @@ -24,6 +24,7 @@ UNRENTED_MULTIPLIER, HASHCAT_CONFIGS, LIB_NVIDIA_ML_DIGESTS, + DOCKER_DIGESTS, ) from services.redis_service import RedisService, RENTED_MACHINE_SET, AVAILABLE_PORT_MAPS_PREFIX from services.ssh_service import SSHService @@ -71,46 +72,23 @@ async def upload_directory( # Await all upload tasks for the current directory await asyncio.gather(*upload_tasks) - def check_digests(self, result, list_digests): - # Check if each digest exists in list_digests - digests_in_list = {} - each_digests = result['all_container_digests'] - for each_digest in each_digests: - digest = each_digest['digest'] - digests_in_list[digest] = digest in list_digests.values() 
- - return digests_in_list - - def check_duplidate_digests(self, result): - # Find duplicate digests in results - digest_count = {} - all_container_digests = result.get('all_container_digests', []) - for container_digest in all_container_digests: - digest = container_digest['digest'] - if digest in digest_count: - digest_count[digest] += 1 - else: - digest_count[digest] = 1 - - duplicates = {digest: count for digest, count in digest_count.items() if count > 1} - return duplicates - - def check_empty_digests(self, result): - # Find empty digests in results - all_container_digests = result.get('all_container_digests', []) - return len(all_container_digests) == 0 - - def validate_digests(self, digests_in_list, duplicates, digests_empty): - # Check if any digest in digests_in_list is False - if any(not is_in_list for is_in_list in digests_in_list.values()): + def validate_digests(self, docker_digests, docker_hub_digests): + # Check if the list is empty + if not docker_digests: return False - if duplicates: - return False + # Get unique digests + unique_digests = list({item['digest'] for item in docker_digests}) - if digests_empty: + # Check for duplicates + if len(unique_digests) != len(docker_digests): return False + # Check if any digest is invalid + for digest in unique_digests: + if digest not in docker_hub_digests.values(): + return False + return True async def clear_remote_directory( @@ -365,6 +343,9 @@ async def create_task( nvidia_driver = machine_spec.get("gpu", {}).get("driver", '') libnvidia_ml = machine_spec.get('md5_checksums', {}).get('libnvidia_ml', '') + docker_version = machine_spec.get("docker", {}).get("version", '') + docker_checksum = machine_spec.get('md5_checksums', {}).get('docker', '') + logger.info( _m( "Machine spec scraped", @@ -500,17 +481,14 @@ async def create_task( ) else: # if not rented, check docker digests - digests_in_list = self.check_digests(machine_spec, docker_hub_digests) - duplicates = self.check_duplidate_digests(machine_spec) - digests_empty = self.check_empty_digests(machine_spec) # True: docker image empty, False: docker image not empty - # Validate digests - self.is_valid = self.validate_digests(digests_in_list, duplicates, digests_empty) + docker_digests = machine_spec.get("docker", {}).get("containers", []) + self.is_valid = self.validate_digests(docker_digests, docker_hub_digests) if not self.is_valid: log_text = _m( "Docker digests are not valid", extra=get_extra_info({ **default_extra, - "docker_digests": machine_spec.get('all_container_digests', []) + "docker_digests": docker_digests }), ) log_status = "error" From fe993b43e209cd8a16390f4469db495b4dc58cc8 Mon Sep 17 00:00:00 2001 From: pyon12 Date: Thu, 19 Dec 2024 20:26:19 +0000 Subject: [PATCH 2/4] update machine_scrape --- neurons/validators/src/miner_jobs/machine_scrape.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neurons/validators/src/miner_jobs/machine_scrape.py b/neurons/validators/src/miner_jobs/machine_scrape.py index db0463e..fddc7be 100644 --- a/neurons/validators/src/miner_jobs/machine_scrape.py +++ b/neurons/validators/src/miner_jobs/machine_scrape.py @@ -569,6 +569,8 @@ def get_docker_info(content: bytes): if digest: containers.append({'id': container_id, 'digest': digest}) + else: + containers.append({'id': container_id, 'digest': ''}) data["containers"] = containers From e9ab68bc3bce8c3478d16e05f948a81de9a74ae2 Mon Sep 17 00:00:00 2001 From: pyon12 Date: Thu, 19 Dec 2024 20:26:52 +0000 Subject: [PATCH 3/4] update validator version --- 
neurons/validators/Dockerfile | 2 +- neurons/validators/Dockerfile.runner | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neurons/validators/Dockerfile b/neurons/validators/Dockerfile index d2dcbce..f97a1b8 100644 --- a/neurons/validators/Dockerfile +++ b/neurons/validators/Dockerfile @@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile COPY --from=base-image /root/app/ /root/app/ COPY --from=base-image /opt/pypackages/ /opt/pypackages/ -LABEL version="3.3.6" +LABEL version="3.3.7" CMD ["bash", "run.sh"] \ No newline at end of file diff --git a/neurons/validators/Dockerfile.runner b/neurons/validators/Dockerfile.runner index d5e213f..79f5d09 100644 --- a/neurons/validators/Dockerfile.runner +++ b/neurons/validators/Dockerfile.runner @@ -3,7 +3,7 @@ WORKDIR /root/validator COPY docker-compose.app.yml docker-compose.yml COPY entrypoint.sh /entrypoint.sh -LABEL version="3.3.6" +LABEL version="3.3.7" RUN chmod u+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] From 2fb50b1eca3f812ee1e1ceb00cec547063dda590 Mon Sep 17 00:00:00 2001 From: pyon12 Date: Fri, 20 Dec 2024 11:10:07 +0000 Subject: [PATCH 4/4] add container_id in machine_scrape --- .../src/miner_jobs/machine_scrape.py | 5 +++- .../validators/src/services/docker_service.py | 4 ++- .../validators/src/services/task_service.py | 28 ++++++++++++++++++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/neurons/validators/src/miner_jobs/machine_scrape.py b/neurons/validators/src/miner_jobs/machine_scrape.py index fddc7be..9ca384b 100644 --- a/neurons/validators/src/miner_jobs/machine_scrape.py +++ b/neurons/validators/src/miner_jobs/machine_scrape.py @@ -536,6 +536,7 @@ def get_network_speed(): def get_docker_info(content: bytes): data = { "version": "", + "container_id": "", "containers": [] } @@ -549,7 +550,7 @@ def get_docker_info(content: bytes): result = run_cmd(f'{docker_path} version --format "{{{{.Client.Version}}}}"') data["version"] = result.strip() - result = run_cmd(f'{docker_path} ps --format "{{{{.ID}}}}"') + result = run_cmd(f'{docker_path} ps --no-trunc --format "{{{{.ID}}}}"') container_ids = result.strip().split('\n') containers = [] @@ -566,6 +567,8 @@ def get_docker_info(content: bytes): digest = None if repo_digests: digest = repo_digests[0].split('@')[1] + if repo_digests[0].split('@')[0] == 'daturaai/compute-subnet-executor': + data["container_id"] = container_id if digest: containers.append({'id': container_id, 'digest': digest}) diff --git a/neurons/validators/src/services/docker_service.py b/neurons/validators/src/services/docker_service.py index fa31508..f53815e 100644 --- a/neurons/validators/src/services/docker_service.py +++ b/neurons/validators/src/services/docker_service.py @@ -38,6 +38,8 @@ "daturaai/ubuntu", ] +LOG_STREAM_INTERVAL = 5 # 5 seconds + class DockerService: def __init__( @@ -121,7 +123,7 @@ async def handle_stream_logs( self.is_realtime_logging = True while True: - await asyncio.sleep(5) + await asyncio.sleep(LOG_STREAM_INTERVAL) async with self.lock: logs_to_process = self.logs_queue[:] diff --git a/neurons/validators/src/services/task_service.py b/neurons/validators/src/services/task_service.py index 45ca59d..1fd9b93 100644 --- a/neurons/validators/src/services/task_service.py +++ b/neurons/validators/src/services/task_service.py @@ -344,7 +344,8 @@ async def create_task( libnvidia_ml = machine_spec.get('md5_checksums', {}).get('libnvidia_ml', '') docker_version = machine_spec.get("docker", {}).get("version", '') - docker_checksum = 
machine_spec.get('md5_checksums', {}).get('docker', '')
+    docker_digest = machine_spec.get('md5_checksums', {}).get('docker', '')
+    container_id = machine_spec.get('docker', {}).get('container_id', '')
 
     logger.info(
         _m(
@@ -422,6 +423,31 @@ async def create_task(
                 log_text,
             )
 
+        if not docker_version or DOCKER_DIGESTS.get(docker_version) != docker_digest:
+            log_status = "warning"
+            log_text = _m(
+                "Docker binary is altered",
+                extra=get_extra_info({
+                    **default_extra,
+                    "docker_version": docker_version,
+                    "docker_digest": docker_digest,
+                    "container_id": container_id,
+                }),
+            )
+            logger.warning(log_text)
+
+            await self.clear_remote_directory(ssh_client, remote_dir)
+
+            return (
+                machine_spec,
+                executor_info,
+                0,
+                0,
+                miner_info.job_batch_id,
+                log_status,
+                log_text,
+            )
+
         if nvidia_driver and LIB_NVIDIA_ML_DIGESTS.get(nvidia_driver) != libnvidia_ml:
             log_status = "warning"
             log_text = _m(
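
Taken together, the series drops the docker-py dependency in favor of invoking the host's /usr/bin/docker binary directly, ships that binary's bytes back to the validator, and validates both the binary and the running containers. The container-side check added to task_service.py reduces to the following self-contained sketch; the digest values in the usage lines are invented for illustration, and docker_hub_digests is assumed to map image tags to their expected repo digests, as in the surrounding code.

    def validate_digests(docker_digests: list, docker_hub_digests: dict) -> bool:
        # An executor reporting no running containers is rejected outright
        if not docker_digests:
            return False

        # Each container must carry a distinct digest
        unique_digests = {item['digest'] for item in docker_digests}
        if len(unique_digests) != len(docker_digests):
            return False

        # Every digest must be one of the known-good Docker Hub digests
        return all(d in docker_hub_digests.values() for d in unique_digests)

    # Hypothetical usage; digest values are invented for the example.
    known = {"daturaai/compute-subnet-executor:latest": "sha256:aaa"}
    print(validate_digests([{"id": "c1", "digest": "sha256:aaa"}], known))  # True
    print(validate_digests([], known))                                      # False
    print(validate_digests([{"id": "c1", "digest": "sha256:bad"}], known))  # False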
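
The binary itself is pinned by the per-version MD5 table added to services/const.py: machine_scrape.py hashes the scraped /usr/bin/docker content with get_md5_checksum_from_file_content, and patch 4's create_task rejects the executor when the version is unknown or the checksum differs. A minimal standalone version of that comparison, assuming the checksum helper is a plain MD5 hexdigest and with docker_binary_is_trusted as a hypothetical name:

    import hashlib

    # Known-good MD5 checksums per docker client version (from services/const.py)
    DOCKER_DIGESTS = {
        "26.1.3": "52d8fcc2c4370bf324cdf17cbc586784",
        "27.3.1": "40f1f7724fa0432ea6878692a05b998c",
    }

    def docker_binary_is_trusted(version: str, content: bytes) -> bool:
        # Unknown versions and checksum mismatches are both rejected,
        # mirroring the check patch 4 wires into create_task
        checksum = hashlib.md5(content).hexdigest()
        return bool(version) and DOCKER_DIGESTS.get(version) == checksum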
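
Patch 4 also switches docker ps to --no-trunc so full container ids are captured, and it leans on the shape of docker inspect's RepoDigests field: each entry is a single repository@sha256:... string, so one split('@') yields the repository name (used to spot the daturaai/compute-subnet-executor container) and the content digest. A small sketch with an invented, shortened digest:

    import json

    # Typical output of: docker inspect --format "{{json .RepoDigests}}" <image-id>
    raw = '["daturaai/compute-subnet-executor@sha256:0d6c84"]'  # digest shortened/invented

    repo_digests = json.loads(raw)
    repo, digest = repo_digests[0].split('@')  # repository name, content digest

    # Patch 4 records the container as the executor when the repository matches
    is_executor = repo == 'daturaai/compute-subnet-executor'
    print(repo, digest, is_executor)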