
Commit

Merge pull request #150 from Datura-ai/main
deploy validator
pyon12 authored Dec 20, 2024
2 parents a2de119 + ee5b448 commit 580d618
Showing 8 changed files with 106 additions and 93 deletions.
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile
@@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile
COPY --from=base-image /root/app/ /root/app/
COPY --from=base-image /opt/pypackages/ /opt/pypackages/

LABEL version="3.3.6"
LABEL version="3.3.7"

CMD ["bash", "run.sh"]
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile.runner
@@ -3,7 +3,7 @@ WORKDIR /root/validator
COPY docker-compose.app.yml docker-compose.yml
COPY entrypoint.sh /entrypoint.sh

LABEL version="3.3.6"
LABEL version="3.3.7"

RUN chmod u+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
30 changes: 1 addition & 29 deletions neurons/validators/pdm.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion neurons/validators/pyproject.toml
@@ -35,7 +35,6 @@ dependencies = [
"databases==0.9.0",
"datura @ file:///${PROJECT_ROOT}/../../datura",
"decorator==5.1.1",
"docker==7.1.0",
"ecdsa==0.19.0",
"eth-hash==0.7.0",
"eth-keys==0.6.0",
69 changes: 50 additions & 19 deletions neurons/validators/src/miner_jobs/machine_scrape.py
@@ -9,7 +9,6 @@
import psutil
from functools import wraps
import hashlib
import docker
from base64 import b64encode
from cryptography.fernet import Fernet
import tempfile
@@ -534,26 +533,54 @@ def get_network_speed():
return data


def get_all_container_digests():
"""Verify and return the digests of all running containers."""
client = docker.from_env()
containers = client.containers.list()
def get_docker_info(content: bytes):
data = {
"version": "",
"container_id": "",
"containers": []
}

with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(content)
docker_path = temp_file.name

try:
run_cmd(f'chmod +x {docker_path}')

result = run_cmd(f'{docker_path} version --format "{{{{.Client.Version}}}}"')
data["version"] = result.strip()

result = run_cmd(f'{docker_path} ps --no-trunc --format "{{{{.ID}}}}"')
container_ids = result.strip().split('\n')

containers = []

for container_id in container_ids:
# Get the image ID of the container
result = run_cmd(f'{docker_path} inspect --format "{{{{.Image}}}}" {container_id}')
image_id = result.strip()

digests = [] # Initialize an empty list to store digests
# Get the image details
result = run_cmd(f'{docker_path} inspect --format "{{{{json .RepoDigests}}}}" {image_id}')
repo_digests = json.loads(result.strip())

for container in containers:
image_id = container.image.id
image = client.images.get(image_id)
digest = None
if image.tags:
for repo_digest in image.attrs['RepoDigests']:
if repo_digest.startswith(image.tags[0].split(':')[0]):
digest = repo_digest.split('@')[1]
break
if digest:
digests.append({'id': container.id, 'digest': digest}) # Add the digest to the list
digest = None
if repo_digests:
digest = repo_digests[0].split('@')[1]
if repo_digests[0].split('@')[0] == 'daturaai/compute-subnet-executor':
data["container_id"] = container_id

return digests
if digest:
containers.append({'id': container_id, 'digest': digest})
else:
containers.append({'id': container_id, 'digest': ''})

data["containers"] = containers

finally:
os.remove(docker_path)

return data


def get_md5_checksum_from_path(file_path):
@@ -719,10 +746,14 @@ def get_machine_specs():
data["os_scrape_error"] = repr(exc)

data["network"] = get_network_speed()
data["all_container_digests"] = get_all_container_digests()

docker_content = get_file_content("/usr/bin/docker")
data["docker"] = get_docker_info(docker_content)

data["md5_checksums"] = {
"nvidia_smi": get_md5_checksum_from_path(run_cmd("which nvidia-smi").strip()),
"libnvidia_ml": get_md5_checksum_from_file_content(nvmlLib_content),
"docker": get_md5_checksum_from_file_content(docker_content),
}

return data
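Note on this file: the scrape job no longer imports the docker SDK. It now reads the executor's docker client binary from /usr/bin/docker, writes it to a temp file, marks it executable, and shells out to it for the client version, the running container IDs, and each image's RepoDigests. A minimal sketch of how the new helper might be exercised, assuming get_docker_info is importable from the scrape module and the binary path exists; printed fields are illustrative only:

    # Sketch only: feed the local docker client binary through get_docker_info
    # and inspect the structure it returns.
    with open("/usr/bin/docker", "rb") as f:
        docker_info = get_docker_info(f.read())

    print(docker_info["version"])       # e.g. "27.3.1"
    print(docker_info["container_id"])  # id of the daturaai/compute-subnet-executor container, if found
    for container in docker_info["containers"]:
        # digest is "" when the image has no RepoDigests entry
        print(container["id"], container["digest"])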
5 changes: 5 additions & 0 deletions neurons/validators/src/services/const.py
@@ -306,3 +306,8 @@
"565.57.01": "c801dd3fc4660f3a8ddf977cfdffe113",
"550.127.08": "ac925f2cd192ad971c5466d55945a243",
}

DOCKER_DIGESTS = {
"26.1.3": "52d8fcc2c4370bf324cdf17cbc586784",
"27.3.1": "40f1f7724fa0432ea6878692a05b998c",
}
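Note on this table: the values are compared, keyed by the reported client version, against the MD5 checksum that machine_scrape.py now records for the docker binary under md5_checksums["docker"]. A small sketch of how such a checksum can be produced; the binary path and version string are placeholders, not part of the commit:

    import hashlib

    def md5_of_file(path: str) -> str:
        # Stream the file in 1 MiB chunks to avoid loading it whole.
        digest = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(1 << 20), b""):
                digest.update(chunk)
        return digest.hexdigest()

    # Hypothetical comparison against the pinned value for a given version:
    # DOCKER_DIGESTS.get("27.3.1") == md5_of_file("/usr/bin/docker")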
4 changes: 3 additions & 1 deletion neurons/validators/src/services/docker_service.py
@@ -38,6 +38,8 @@
"daturaai/ubuntu",
]

LOG_STREAM_INTERVAL = 5 # 5 seconds


class DockerService:
def __init__(
@@ -121,7 +123,7 @@ async def handle_stream_logs(
self.is_realtime_logging = True

while True:
await asyncio.sleep(5)
await asyncio.sleep(LOG_STREAM_INTERVAL)

async with self.lock:
logs_to_process = self.logs_queue[:]
86 changes: 45 additions & 41 deletions neurons/validators/src/services/task_service.py
@@ -24,6 +24,7 @@
UNRENTED_MULTIPLIER,
HASHCAT_CONFIGS,
LIB_NVIDIA_ML_DIGESTS,
DOCKER_DIGESTS,
)
from services.redis_service import RedisService, RENTED_MACHINE_SET, AVAILABLE_PORT_MAPS_PREFIX
from services.ssh_service import SSHService
@@ -71,46 +72,23 @@ async def upload_directory(
# Await all upload tasks for the current directory
await asyncio.gather(*upload_tasks)

def check_digests(self, result, list_digests):
# Check if each digest exists in list_digests
digests_in_list = {}
each_digests = result['all_container_digests']
for each_digest in each_digests:
digest = each_digest['digest']
digests_in_list[digest] = digest in list_digests.values()

return digests_in_list

def check_duplidate_digests(self, result):
# Find duplicate digests in results
digest_count = {}
all_container_digests = result.get('all_container_digests', [])
for container_digest in all_container_digests:
digest = container_digest['digest']
if digest in digest_count:
digest_count[digest] += 1
else:
digest_count[digest] = 1

duplicates = {digest: count for digest, count in digest_count.items() if count > 1}
return duplicates

def check_empty_digests(self, result):
# Find empty digests in results
all_container_digests = result.get('all_container_digests', [])
return len(all_container_digests) == 0

def validate_digests(self, digests_in_list, duplicates, digests_empty):
# Check if any digest in digests_in_list is False
if any(not is_in_list for is_in_list in digests_in_list.values()):
def validate_digests(self, docker_digests, docker_hub_digests):
# Check if the list is empty
if not docker_digests:
return False

if duplicates:
return False
# Get unique digests
unique_digests = list({item['digest'] for item in docker_digests})

if digests_empty:
# Check for duplicates
if len(unique_digests) != len(docker_digests):
return False

# Check if any digest is invalid
for digest in unique_digests:
if digest not in docker_hub_digests.values():
return False

return True

async def clear_remote_directory(
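Note on this hunk: the three removed helpers (check_digests, check_duplidate_digests, check_empty_digests) collapse into the single validate_digests pass above; an empty container list, any repeated digest, or any digest absent from the Docker Hub map fails validation. A behavioral sketch, where service stands in for an already-constructed TaskService and every key and digest string is a made-up placeholder:

    docker_hub_digests = {
        "daturaai/compute-subnet-executor": "sha256:aaa",
        "daturaai/ubuntu": "sha256:bbb",
    }

    # All digests known and distinct -> True
    service.validate_digests(
        [{"id": "c1", "digest": "sha256:aaa"}, {"id": "c2", "digest": "sha256:bbb"}],
        docker_hub_digests,
    )

    # Unknown digest, duplicated digest, or an empty list -> False
    service.validate_digests([{"id": "c1", "digest": "sha256:zzz"}], docker_hub_digests)
    service.validate_digests([], docker_hub_digests)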
@@ -365,6 +343,10 @@ async def create_task(
nvidia_driver = machine_spec.get("gpu", {}).get("driver", '')
libnvidia_ml = machine_spec.get('md5_checksums', {}).get('libnvidia_ml', '')

docker_version = machine_spec.get("docker", {}).get("version", '')
docker_digest = machine_spec.get('md5_checksums', {}).get('docker', '')
container_id = machine_spec.get('docker', {}).get('container_id', '')

logger.info(
_m(
"Machine spec scraped",
@@ -441,6 +423,31 @@ async def create_task(
log_text,
)

if not docker_version or DOCKER_DIGESTS.get(docker_version) != docker_digest:
log_status = "warning"
log_text = _m(
f"Docker is altered",
extra=get_extra_info({
**default_extra,
"docker_version": docker_version,
"docker_digest": docker_digest,
"container_id": container_id,
}),
)
logger.warning(log_text)

await self.clear_remote_directory(ssh_client, remote_dir)

return (
machine_spec,
executor_info,
0,
0,
miner_info.job_batch_id,
log_status,
log_text,
)

if nvidia_driver and LIB_NVIDIA_ML_DIGESTS.get(nvidia_driver) != libnvidia_ml:
log_status = "warning"
log_text = _m(
@@ -500,17 +507,14 @@
)
else:
# if not rented, check docker digests
digests_in_list = self.check_digests(machine_spec, docker_hub_digests)
duplicates = self.check_duplidate_digests(machine_spec)
digests_empty = self.check_empty_digests(machine_spec) # True: docker image empty, False: docker image not empty
# Validate digests
self.is_valid = self.validate_digests(digests_in_list, duplicates, digests_empty)
docker_digests = machine_spec.get("docker", {}).get("containers", [])
self.is_valid = self.validate_digests(docker_digests, docker_hub_digests)
if not self.is_valid:
log_text = _m(
"Docker digests are not valid",
extra=get_extra_info({
**default_extra,
"docker_digests": machine_spec.get('all_container_digests', [])
"docker_digests": docker_digests
}),
)
log_status = "error"
