Skip to content

Commit

Permalink
Merge pull request #163 from Datura-ai/main
Browse files Browse the repository at this point in the history
deploy executor
  • Loading branch information
pyon12 authored Jan 3, 2025
2 parents 2db7c62 + d824d87 commit ca7e2e6
Show file tree
Hide file tree
Showing 14 changed files with 496 additions and 150 deletions.
2 changes: 1 addition & 1 deletion neurons/executor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ RUN mkdir -p /etc/docker
RUN mkdir -p /etc/nvidia-container-runtime
RUN mkdir -p /root/.ssh

LABEL version="3.3.1"
LABEL version="3.3.2"

CMD ["bash", "run.sh"]
2 changes: 1 addition & 1 deletion neurons/executor/Dockerfile.runner
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ COPY entrypoint.sh /entrypoint.sh

RUN chmod u+x /entrypoint.sh

LABEL version="3.3.1"
LABEL version="3.3.2"

ENTRYPOINT ["/entrypoint.sh"]
43 changes: 42 additions & 1 deletion neurons/executor/pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions neurons/executor/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ dependencies = [
"wheel==0.45.1",
"xxhash==3.5.0",
"yarl==1.18.3",
"pynvml>=12.0.0",
"psutil>=6.1.1",
]
requires-python = "==3.11.*"
readme = "README.md"
Expand Down
168 changes: 168 additions & 0 deletions neurons/executor/src/gpus_utility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import asyncio
import logging
import time

import aiohttp
import click
import pynvml
import psutil

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class GPUMetricsTracker:
def __init__(self, threshold_percent: float = 10.0):
self.previous_metrics: dict[int, dict] = {}
self.threshold = threshold_percent

def has_significant_change(self, gpu_id: int, util: float, mem_used: float) -> bool:
if gpu_id not in self.previous_metrics:
self.previous_metrics[gpu_id] = {"util": util, "mem_used": mem_used}
return True

prev = self.previous_metrics[gpu_id]
util_diff = abs(util - prev["util"])
mem_diff_percent = abs(mem_used - prev["mem_used"]) / prev["mem_used"] * 100

if util_diff >= self.threshold or mem_diff_percent >= self.threshold:
self.previous_metrics[gpu_id] = {"util": util, "mem_used": mem_used}
return True
return False


async def scrape_gpu_metrics(
interval: int,
program_id: str,
signature: str,
executor_id: str,
validator_hotkey: str,
compute_rest_app_url: str,
):
try:
pynvml.nvmlInit()
device_count = pynvml.nvmlDeviceGetCount()
if device_count == 0:
logger.warning("No NVIDIA GPUs found in the system")
return
except pynvml.NVMLError as e:
logger.error(f"Failed to initialize NVIDIA Management Library: {e}")
logger.error(
"This might be because no NVIDIA GPU is present or drivers are not properly installed"
)
return

http_url = f"{compute_rest_app_url}/validator/{validator_hotkey}/update-gpu-metrics"
logger.info(f"Will send metrics to: {http_url}")

# Initialize the tracker
tracker = GPUMetricsTracker(threshold_percent=10.0)

async with aiohttp.ClientSession() as session:
logger.info(f"Scraping metrics for {device_count} GPUs...")
try:
while True:
try:
gpu_utilization = []
should_send = False

for i in range(device_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)

name = pynvml.nvmlDeviceGetName(handle)
if isinstance(name, bytes):
name = name.decode("utf-8")

utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
memory = pynvml.nvmlDeviceGetMemoryInfo(handle)

gpu_util = utilization.gpu
mem_used = memory.used
mem_total = memory.total
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

# Check if there's a significant change for this GPU
if tracker.has_significant_change(i, gpu_util, mem_used):
should_send = True
logger.info(f"Significant change detected for GPU {i}")

gpu_utilization.append(
{
"utilization_in_percent": gpu_util,
"memory_utilization_in_bytes": mem_used,
"memory_utilization_in_percent": round(mem_used / mem_total * 100, 1)
}
)

# Get CPU, RAM, and Disk metrics using psutil
cpu_percent = psutil.cpu_percent(interval=1)
ram = psutil.virtual_memory()
disk = psutil.disk_usage('/')

cpu_ram_utilization = {
"cpu_utilization_in_percent": cpu_percent,
"ram_utilization_in_bytes": ram.used,
"ram_utilization_in_percent": ram.percent
}

disk_utilization = {
"disk_utilization_in_bytes": disk.used,
"disk_utilization_in_percent": disk.percent
}

# Only send if there's a significant change in any GPU
if should_send:
payload = {
"gpu_utilization": gpu_utilization,
"cpu_ram_utilization": cpu_ram_utilization,
"disk_utilization": disk_utilization,
"timestamp": timestamp,
"program_id": program_id,
"signature": signature,
"executor_id": executor_id,
}
# Send HTTP POST request
async with session.post(http_url, json=payload) as response:
if response.status == 200:
logger.info("Successfully sent metrics to backend")
else:
logger.error(f"Failed to send metrics. Status: {response.status}")
text = await response.text()
logger.error(f"Response: {text}")

await asyncio.sleep(interval)

except Exception as e:
logger.error(f"Error in main loop: {e}")
await asyncio.sleep(5) # Wait before retrying

except KeyboardInterrupt:
logger.info("Stopping GPU scraping...")
finally:
pynvml.nvmlShutdown()


@click.command()
@click.option("--program_id", prompt="Program ID", help="Program ID for monitoring")
@click.option("--signature", prompt="Signature", help="Signature for verification")
@click.option("--executor_id", prompt="Executor ID", help="Executor ID")
@click.option("--validator_hotkey", prompt="Validator Hotkey", help="Validator hotkey")
@click.option("--compute_rest_app_url", prompt="Compute-app Url", help="Compute-app Url")
@click.option("--interval", default=5, type=int, help="Scraping interval in seconds")
def main(
interval: int,
program_id: str,
signature: str,
executor_id: str,
validator_hotkey: str,
compute_rest_app_url: str,
):
asyncio.run(
scrape_gpu_metrics(
interval, program_id, signature, executor_id, validator_hotkey, compute_rest_app_url
)
)


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ RUN echo "export PYTHONPATH=$PYTHONPATH" >> ~/.bash_profile
COPY --from=base-image /root/app/ /root/app/
COPY --from=base-image /opt/pypackages/ /opt/pypackages/

LABEL version="3.3.13"
LABEL version="3.3.14"

CMD ["bash", "run.sh"]
2 changes: 1 addition & 1 deletion neurons/validators/Dockerfile.runner
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ WORKDIR /root/validator
COPY docker-compose.app.yml docker-compose.yml
COPY entrypoint.sh /entrypoint.sh

LABEL version="3.3.13"
LABEL version="3.3.14"

RUN chmod u+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
18 changes: 17 additions & 1 deletion neurons/validators/src/core/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING
import argparse
import pathlib
from typing import TYPE_CHECKING

import bittensor
from pydantic import Field
Expand Down Expand Up @@ -28,13 +28,18 @@ class Settings(BaseSettings):
ASYNC_SQLALCHEMY_DATABASE_URI: str = Field(env="ASYNC_SQLALCHEMY_DATABASE_URI")
DEBUG: bool = Field(env="DEBUG", default=False)
DEBUG_MINER_HOTKEY: str = Field(env="DEBUG_MINER_HOTKEY", default="")
DEBUG_MINER_ADDRESS: str | None = Field(env="DEBUG_MINER_ADDRESS", default=None)
DEBUG_MINER_PORT: int | None = Field(env="DEBUG_MINER_PORT", default=None)

INTERNAL_PORT: int = Field(env="INTERNAL_PORT", default=8000)
BLOCKS_FOR_JOB: int = 50

REDIS_HOST: str = Field(env="REDIS_HOST", default="localhost")
REDIS_PORT: int = Field(env="REDIS_PORT", default=6379)
COMPUTE_APP_URI: str = "wss://celiumcompute.ai"
COMPUTE_REST_API_URL: str | None = Field(
env="COMPUTE_REST_API_URL", default="https://celiumcompute.ai/api"
)

ENV: str = Field(env="ENV", default="dev")

Expand Down Expand Up @@ -90,5 +95,16 @@ def get_bittensor_config(self) -> bittensor.config:

return bittensor.config(parser)

def get_debug_miner(self) -> dict:
if not self.DEBUG_MINER_ADDRESS or not self.DEBUG_MINER_PORT:
raise RuntimeError("Debug miner not configured")

miner = type("Miner", (object,), {})()
miner.hotkey = self.DEBUG_MINER_HOTKEY
miner.axon_info = type("AxonInfo", (object,), {})()
miner.axon_info.ip = self.DEBUG_MINER_ADDRESS
miner.axon_info.port = self.DEBUG_MINER_PORT
return miner


settings = Settings()
13 changes: 8 additions & 5 deletions neurons/validators/src/core/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
convert_weights_and_uids_for_emit,
process_weights_for_netuid,
)
from websockets.protocol import State as WebSocketClientState
from payload_models.payloads import MinerJobRequestPayload
from websockets.protocol import State as WebSocketClientState

from core.config import settings
from core.utils import _m, get_extra_info, get_logger
Expand Down Expand Up @@ -155,10 +155,12 @@ def set_subtensor(self):
logger.info(
_m(
"[Error] Getting subtensor",
extra=get_extra_info({
** self.default_extra,
"error": str(e),
}),
extra=get_extra_info(
{
**self.default_extra,
"error": str(e),
}
),
),
)

Expand Down Expand Up @@ -534,6 +536,7 @@ async def sync(self):
),
encypted_files=encypted_files,
docker_hub_digests=docker_hub_digests,
debug=settings.DEBUG,
)
)
for miner in miners
Expand Down
4 changes: 2 additions & 2 deletions neurons/validators/src/miner_jobs/machine_scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ def get_md5_checksum_from_file_content(file_content: bytes):
def get_libnvidia_ml_path():
try:
original_path = run_cmd("find /usr -name 'libnvidia-ml.so.1'").strip()
return original_path
return original_path.split('\n')[-1]
except:
return ''

Expand Down Expand Up @@ -767,4 +767,4 @@ def _encrypt(key: str, payload: str) -> str:
key = 'encrypt_key'
machine_specs = get_machine_specs()
encoded_str = _encrypt(key, json.dumps(machine_specs))
print(encoded_str)
print(encoded_str)
3 changes: 2 additions & 1 deletion neurons/validators/src/miner_jobs/score.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def run_hashcat(device_id: int, job: dict) -> list[str]:
payload_file.flush()
os.fsync(payload_file.fileno())

subprocess.check_output(f"cp /usr/bin/hashcat /usr/bin/hashcat{device_id}", shell=True)
if not os.path.exists(f"/usr/bin/hashcat{device_id}"):
subprocess.check_output(f"cp /usr/bin/hashcat /usr/bin/hashcat{device_id}", shell=True)

cmd = f'hashcat{device_id} --potfile-disable --restore-disable --attack-mode 3 -d {device_id} --workload-profile 3 --optimized-kernel-enable --hash-type {algorithm} --hex-salt -1 "?l?d?u" --outfile-format 2 --quiet {payload_file.name} "{mask}"'
stdout = subprocess.check_output(cmd, shell=True, text=True)
Expand Down
Loading

0 comments on commit ca7e2e6

Please sign in to comment.