From 9ce9d9f699bb0acd67bb6e440bcff72cd8c38f71 Mon Sep 17 00:00:00 2001 From: Aleksandr Prusov Date: Mon, 28 Oct 2024 15:26:40 +0300 Subject: [PATCH] [wip] add blacklist logic (#3) * add blacklist logic * update readme * delete exited nodes * add ban logic for crashloopbackoff * add ban logic for crashloopbackoff * fix BLACKLIST_RESTART_TTL_SECONDS --- .env.example | 4 ++ .pre-commit-config.yaml | 2 +- README.md | 34 +++++++----- docker-compose.yml | 10 ++++ src/main.py | 91 ++++++++++++++++++------------ src/models/blacklist.py | 108 ++++++++++++++++++++++++++++++++++++ src/models/vast.py | 19 +++++-- src/models/vast_instance.py | 6 +- src/requirements.txt | 1 + src/settings.py | 30 ++++++++++ 10 files changed, 248 insertions(+), 57 deletions(-) create mode 100644 src/models/blacklist.py create mode 100644 src/settings.py diff --git a/.env.example b/.env.example index f57cd00..ba62d2f 100644 --- a/.env.example +++ b/.env.example @@ -4,3 +4,7 @@ VAST_TEMPLATE_NAME=xxx VAST_TEMPLATE_IMAGE=xxx VAST_API_KEY=xxx DOCKER_LOGIN=xxx +BLACKLIST_ENABLED=false +BLACKLIST_REDIS_URL=redis://redis:6379/10 +BLACKLIST_BAN_AFTER_SECONDS=1200 +BLACKLIST_BAN_TTL_SECONDS=86400 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6185add..3de3c73 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,4 +30,4 @@ repos: rev: 'v1.8.0' hooks: - id: mypy - additional_dependencies: [types-retry==0.9.9.4, types-requests==2.31.0.20240403] + additional_dependencies: [types-retry==0.9.9.4, types-requests==2.31.0.20240403, types-redis==4.6.0.20240903] diff --git a/README.md b/README.md index b929f4d..565fa81 100644 --- a/README.md +++ b/README.md @@ -30,23 +30,31 @@ The application creates instances from custom templates and recreates instances - Create the instance if it doesn't exist. - Delete the instance if the image is not the same as the one provided in `TEMPLATE_IMAGE`, or if the template hash ID is not the same as found in step #1. - Do nothing if instance is up-to-date -3. Wait for the instance to become online and for the external port to be provided. +3. Wait for the instance to become online and for the external port to be provided. * 4. Generate an Nginx configuration with the `proxy_pass` configuration. 5. Run the Nginx process. +> A long-awaited instance host could be marked as blacklisted if the `BLACKLIST_ENABLED=true` environment variable is set. + This approach allows you to create a StatefulSet with static `POD_NAME`s and add a long startup probe plus a liveness probe to cover the case when the instance becomes unavailable. The pod becomes ready when the vast.ai instance is ready, and it can be used in k8s services. You can configure the application via environment variables: -| ENV VAR | REQUIRED | DEFAULT | DESCRIPTION | -|--------------------|----------|---------|--------------------------------------------------------------------------| -| POD_NAME | TRUE | - | Unique instance identifier | -| VAST_API_KEY | TRUE | - | Vast.ai API key | -| VAST_TEMPLATE_NAME | TRUE | - | Private template name (will be created in your account) | -| VAST_TEMPLATE_IMAGE| TRUE | - | Target instance image | -| DOCKER_LOGIN | TRUE | - | Docker login command in the format `-u -p ` | -| VAST_SEARCH_QUERY | TRUE | - | Vast [search new instance query](https://vast.ai/docs/search/search-gpus)| -| NGINX_CONFIG_PATH | FALSE | /etc/nginx/http.d/default.conf | Path to store the final Nginx config | -| NGINX_LISTEN_PORT | FALSE | 3000 | Nginx listen port | -| NGINX_MAX_BODY_SIZE| FALSE | 10M | Nginx `max_body_size` parameter | -| DEBUG | FALSE | false | Enable debug logs | +| ENV VAR | REQUIRED | DEFAULT | DESCRIPTION | +|-----------------------------|----------|--------------------------------|--------------------------------------------------------------------------| +| POD_NAME | TRUE | - | Unique instance identifier | +| VAST_API_KEY | TRUE | - | Vast.ai API key | +| VAST_TEMPLATE_NAME | TRUE | - | Private template name (will be created in your account) | +| VAST_TEMPLATE_IMAGE | TRUE | - | Target instance image | +| DOCKER_LOGIN | TRUE | - | Docker login command in the format `-u -p ` | +| VAST_SEARCH_QUERY | TRUE | - | Vast [search new instance query](https://vast.ai/docs/search/search-gpus)| +| NGINX_CONFIG_PATH | FALSE | /etc/nginx/http.d/default.conf | Path to store the final Nginx config | +| NGINX_LISTEN_PORT | FALSE | 3000 | Nginx listen port | +| NGINX_MAX_BODY_SIZE | FALSE | 10M | Nginx `max_body_size` parameter | +| BLACKLIST_ENABLED | FALSE | false | Enable blacklist logic | +| BLACKLIST_REDIS_URL | FALSE | redis://redis:6379/10 | Redis to be used as blacklist storage | +| BLACKLIST_BAN_AFTER_SECONDS | FALSE | 1200 (20 minutes) | Waiting duration before instance marked as banned | +| BLACKLIST_BAN_TTL_SECONDS | FALSE | 604800 (7 days) | Host ban duration | +| BLACKLIST_RESTART_TTL_SECONDS | FALSE | 3600 (1 hour) | Flush pod restarts counter if no restarts for that time duration | +| BLACKLIST_RESTART_THRESHOLD | FALSE | 5 | Ban host if pod has this many restarts restarts within BLACKLIST_RESTART_TTL_SECONDS | +| DEBUG | FALSE | false | Enable debug logs | diff --git a/docker-compose.yml b/docker-compose.yml index 19a0f11..2b0d72c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,17 @@ services: proxy: + container_name: proxy build: context: src env_file: .env ports: - 3000:3000 + redis: + image: redis:7.4.0-alpine3.20 + container_name: redis + environment: + - REDIS_PASSWORD=testpassword + - REDIS_USER=testuser + - REDIS_USER_PASSWORD=testuserpassword + ports: + - 6379:6379 diff --git a/src/main.py b/src/main.py index 90dddbe..843e38c 100755 --- a/src/main.py +++ b/src/main.py @@ -1,78 +1,95 @@ -import os import time import logging -import json from jinja2 import Environment, FileSystemLoader from models.vast import VastController +from models.blacklist import Blacklist +from settings import Settings +from typing import Optional +logging.basicConfig(level=logging.DEBUG if Settings.log_debug else logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s", datefmt="%Y-%m-%dT%H:%M:%S") +logging.info({"template_name": Settings.template_name, "template_image": Settings.template_image, "pod_name": Settings.pod_name}) -def getenv(envname: str, default: str | None = None): - v = os.getenv(envname) - if v: - return v - if not default: - raise Exception(f"Env var {envname} is not defined") - return default - - -pod_name = getenv("POD_NAME") -vast_api_key = getenv("VAST_API_KEY") -template_name = getenv("VAST_TEMPLATE_NAME") -template_image = getenv("VAST_TEMPLATE_IMAGE") -nginx_config_path = getenv("NGINX_CONFIG_PATH", "/etc/nginx/http.d/default.conf") -docker_login = getenv("DOCKER_LOGIN") -vast_search_query = json.loads(getenv("VAST_SEARCH_QUERY")) -nginx_listen_port = getenv("NGINX_LISTEN_PORT", "3000") -nginx_max_body_size = getenv("NGINX_MAX_BODY_SIZE", "10M") -log_debug = getenv("DEBUG", "false") - -logging.basicConfig(level=logging.DEBUG if log_debug.lower() == "true" else logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s", datefmt="%Y-%m-%dT%H:%M:%S") -logging.info({"template_name": template_name, "template_image": template_image, "pod_name": pod_name}) - -vast = VastController(api_key=vast_api_key) +vast = VastController(api_key=Settings.vast_api_key) +blacklist: Optional[Blacklist] = None +if Settings.blacklist_enabled: + blacklist = Blacklist(redis_url=Settings.blacklist_redis, ban_ttl=Settings.blacklist_ban_ttl_seconds) # get template by name -logging.info(f"Searching template with name '{template_name}' ...") -template = vast.getTemplateByName(template_name) +logging.info(f"Searching template with name '{Settings.template_name}' ...") +template = vast.getTemplateByName(Settings.template_name) if not template: - raise Exception(f"Template with name '{template_name}' not found") + raise Exception(f"Template with name '{Settings.template_name}' not found") logging.info(template) # check if instance already exists -instance_label = f"k8s_pod={pod_name}" +instance_label = f"k8s_pod={Settings.pod_name}" logging.info(f"Searching instance with label '{instance_label}' ...") instance = vast.getInstanceByLabel(instance_label) if instance: logging.info(instance) - if instance.image != template_image or instance.template_hash_id != template.id or instance.status == "offline": - logging.info("Destroying instance...") + destroy_reason = "" + if instance.image != Settings.template_image: + destroy_reason = "instance has wrong image" + elif instance.template_hash_id != template.id: + destroy_reason = "instance has wrong template" + elif instance.status in ["offline", "exited"]: + destroy_reason = f"instance is {instance.status}" + elif blacklist and blacklist.isBanned(instance.hostId): + destroy_reason = "instance host is banned" + elif blacklist and blacklist.getAndIncreaseInstanceRestarts(instance.id) > Settings.blacklist_restart_threshold: + logging.info("Too many pod restarts, instance host will be banned for some time...") + blacklist.add(instance.hostId, reason="restarts") + destroy_reason = "too many pod restarts" + if destroy_reason: + logging.info(f"Destroying instance, {destroy_reason}...") instance.destroy() + if blacklist: + blacklist.cleanInstanceKeys(instance.id) instance = None else: logging.info("Instance image and template are up-to-date") # create instance if not exists or not up-to-date +# TODO: find a way to get docker_login from the template if not instance: - vast.createInstance(template=template, label=instance_label, docker_login=docker_login, search_query=vast_search_query, image=template_image) + vast.createInstance( + template=template, + label=instance_label, + docker_login=Settings.docker_login, + search_query=Settings.vast_search_query, + image=Settings.template_image, + blacklist_host_ids=blacklist.list() if blacklist else [], + ) # wait for instance become online +wait_start_time = 0 while True: instance = vast.getInstanceByLabel(instance_label) if not instance: raise Exception("Instance not found") + if not wait_start_time: + wait_start_time = blacklist.getInstanceStartTime(instance.id) if blacklist else int(time.time()) logging.info(instance) if instance and instance.status == "running" and instance.port > 0: + if blacklist: + blacklist.delInstanceStartTime(instance.id) break - logging.info("Waiting for instance become online ...") + wait_time = int(time.time()) - wait_start_time + if blacklist and wait_time > Settings.blacklist_ban_after_seconds: + logging.info("Too long waiting, instance host will be banned for some time...") + blacklist.add(instance.hostId) + instance.destroy() + raise Exception(f"Host {instance.hostId} marked as banned due to long startup waiting") + logging.info(f"Waiting for instance become online ({wait_time} seconds) ...") time.sleep(10) -logging.info("Instance is ready to accept connections") +logging.info("Instance is ready to accept connections") # generate nginx config environment = Environment(loader=FileSystemLoader("./"), autoescape=True) nginx_template = environment.get_template("nginx.conf.j2") -nginx_conf = nginx_template.render({"ip": instance.ip, "port": instance.port, "listen_port": nginx_listen_port, "max_body_size": nginx_max_body_size}) +nginx_conf = nginx_template.render({"ip": instance.ip, "port": instance.port, "listen_port": Settings.nginx_listen_port, "max_body_size": Settings.nginx_max_body_size}) logging.info(f"Final nginx config:\n{nginx_conf}") -with open(nginx_config_path, "w") as nginx_conf_file: +with open(Settings.nginx_config_path, "w") as nginx_conf_file: nginx_conf_file.write(nginx_conf) logging.info("Nginx config saved") diff --git a/src/models/blacklist.py b/src/models/blacklist.py new file mode 100644 index 0000000..c8615b1 --- /dev/null +++ b/src/models/blacklist.py @@ -0,0 +1,108 @@ +import redis +import logging +import time + + +class Blacklist: + _redis_conn: redis.Redis + _ban_ttl: int + + def __init__(self, redis_url: str, ban_ttl: int = 3600, restarts_ttl: int = 3600): + """ + Initializes the Blacklist class. + + :param redis_url: URL of the Redis instance. + :param db: Redis database index to use. + :param ban_ttl: Time-to-live (TTL) for each ban record in seconds. + """ + self._ban_ttl = ban_ttl + self._restarts_record_ttl = restarts_ttl + self._redis_conn = redis.StrictRedis.from_url(url=redis_url) + self._logger = logging.getLogger("blacklist") + self._logger.info(f"initialized with ttl={self._ban_ttl}") + + _ban_key_prefix: str = "ban_" + + def _getBanKey(self, key: str) -> str: + return f"{self._ban_key_prefix}{key}" + + def isBanned(self, key: str) -> bool: + """ + Checks if the given key is in the blacklist. + + :param key: The string to check. + :return: True if the key is banned, False otherwise. + """ + return self._redis_conn.exists(self._getBanKey(key)) == 1 + + def add(self, key: str, reason: str = "slow_startup"): + """ + Adds the given key to the blacklist with the specified TTL. + + :param key: The string to ban. + """ + self._redis_conn.setex(self._getBanKey(key), self._ban_ttl, reason) + self._logger.info(f"added '{key}' ttl={self._ban_ttl}") + + def list(self) -> list[str]: + """ + Retrieves all currently banned keys (IDs) from the blacklist. + + :return: A list of currently banned keys. + """ + banned_keys = [] + cursor, keys = self._redis_conn.scan(match=f"{self._ban_key_prefix}*", count=100) + banned_keys.extend(keys) + while cursor != 0: + cursor, keys = self._redis_conn.scan(cursor=cursor, match=f"{self._ban_key_prefix}*", count=100) + banned_keys.extend(keys) + key_len = len(self._ban_key_prefix) + return [key.decode("utf-8")[key_len:] for key in banned_keys] + + _wait_time_key_prefix: str = "wait_" + _wait_time_record_ttl: int = 24 * 60 * 60 # 1day + + def getInstanceStartTime(self, instance_id: str) -> int: + """ + Returns the time difference (in seconds) between the current time and the stored timestamp for the given instance. + If no timestamp is found, it sets the current time as the start time. + + :param instance_id: Vast instance id. + :return: Time difference in seconds. + """ + key = f"{self._wait_time_key_prefix}{instance_id}" + start_time_str = self._redis_conn.get(key) + + if start_time_str is None: + start_time = int(time.time()) + self._redis_conn.setex(key, self._wait_time_record_ttl, str(start_time)) + else: + start_time = int(start_time_str) + + self._logger.info(f"got '{instance_id}' start time - {start_time}") + return start_time + + def delInstanceStartTime(self, instance_id: str): + """ + Deletes the stored wait time for the given instance. + + :param instance_id: Vast instance id. + """ + self._redis_conn.delete(f"{self._wait_time_key_prefix}{instance_id}") + self._logger.info(f"deleted '{instance_id}' wait time") + + _restarts_key_prefix: str = "restarts_" + _restarts_record_ttl: int = 60 * 60 # 1hour + + def getAndIncreaseInstanceRestarts(self, instance_id: str) -> int: + key = f"{self._restarts_key_prefix}{instance_id}" + restarts = self._redis_conn.get(key) + restarts = int(restarts) if restarts else 0 + restarts += 1 + self._redis_conn.setex(key, self._restarts_record_ttl, restarts) + self._logger.info(f"got '{instance_id}' restarts counter - {restarts}") + return restarts + + def cleanInstanceKeys(self, instance_id: str): + self._redis_conn.delete(f"{self._restarts_key_prefix}{instance_id}") + self._redis_conn.delete(f"{self._wait_time_key_prefix}{instance_id}") diff --git a/src/models/vast.py b/src/models/vast.py index a021817..8e7f4c5 100644 --- a/src/models/vast.py +++ b/src/models/vast.py @@ -30,11 +30,20 @@ def getInstanceByLabel(self, label: str) -> VastInstance | None: return VastInstance(instance, self) return None - def createInstance(self, template: VastTemplate, label: str, docker_login: str, search_query: dict, image: str): - offers = self.get("/api/v0/bundles/", params={"q": json.dumps({**search_query, **{"disk_space": {"gte": template.disk_space}, "allocated_storage": template.disk_space}})})["offers"] - offer_id = offers[0]["id"] - self.__logger.info(f"Best offer is {offer_id}, creating instance ...") - result = self.put(f"/api/v0/asks/{offer_id}/", {"client_id": "me", "image_login": docker_login, "image": image, "template_hash_id": template.id, "label": label}) + def createInstance(self, template: VastTemplate, label: str, docker_login: str, search_query: dict, image: str, blacklist_host_ids: list[str] = []): + self.__logger.info(f"Creating instance, blacklist_host_ids={blacklist_host_ids}") + best_offer = None + for offer in self.get("/api/v0/bundles/", params={"q": json.dumps({**search_query, **{"disk_space": {"gte": template.disk_space}, "allocated_storage": template.disk_space}})})["offers"]: + if str(offer["host_id"]) in blacklist_host_ids: + self.__logger.info(f"Offer {offer['id']} skipped, host {offer['host_id']} in blacklist") + continue + best_offer = offer + break + if not best_offer: + raise Exception("Offer not found") + self.__logger.info(f"Found offer is {best_offer['id']}, creating instance ...") + self.__logger.debug(f"Offer: {best_offer}") + result = self.put(f"/api/v0/asks/{best_offer['id']}/", {"client_id": "me", "image_login": docker_login, "image": image, "template_hash_id": template.id, "label": label}) if "success" not in result or not result["success"]: logging.error(result) raise Exception("Instance creation request is not succeed") diff --git a/src/models/vast_instance.py b/src/models/vast_instance.py index a547202..49a5775 100644 --- a/src/models/vast_instance.py +++ b/src/models/vast_instance.py @@ -39,6 +39,10 @@ def template_hash_id(self) -> str: def image(self) -> str: return str(self.__raw_template["image_uuid"]) + @property + def hostId(self) -> str: + return str(self.__raw_template["host_id"]) + def destroy(self): result = self.__vast.delete(f"/api/v0/instances/{self.id}/") if "success" not in result or not result["success"]: @@ -48,6 +52,6 @@ def destroy(self): def __str__(self): s = [f"Instance {self.id}"] - for t in ["image_uuid", "template_hash_id", "cpu_name", "cpu_cores", "actual_status", "status_msg", "public_ipaddr", "direct_port_start"]: + for t in ["image_uuid", "host_id", "template_hash_id", "cpu_name", "cpu_cores", "actual_status", "status_msg", "public_ipaddr", "direct_port_start"]: s.append("\t%s = %s" % (t, self.__raw_template[t])) return "\n".join(s) diff --git a/src/requirements.txt b/src/requirements.txt index c2377b3..b7d2688 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -8,3 +8,4 @@ setuptools==72.1.0 urllib3==2.2.2 vastai==0.2.5 wheel==0.44.0 +redis==5.0.8 diff --git a/src/settings.py b/src/settings.py new file mode 100644 index 0000000..a637cb4 --- /dev/null +++ b/src/settings.py @@ -0,0 +1,30 @@ +import os +import json + + +def getenv(envname: str, default: str | None = None): + v = os.getenv(envname) + if v: + return v + if not default: + raise Exception(f"Env var {envname} is not defined") + return default + + +class Settings: + pod_name = getenv("POD_NAME") + vast_api_key = getenv("VAST_API_KEY") + template_name = getenv("VAST_TEMPLATE_NAME") + template_image = getenv("VAST_TEMPLATE_IMAGE") + nginx_config_path = getenv("NGINX_CONFIG_PATH", "/etc/nginx/http.d/default.conf") + docker_login = getenv("DOCKER_LOGIN") + vast_search_query = json.loads(getenv("VAST_SEARCH_QUERY")) + nginx_listen_port = getenv("NGINX_LISTEN_PORT", "3000") + nginx_max_body_size = getenv("NGINX_MAX_BODY_SIZE", "10M") + log_debug = getenv("DEBUG", "false").lower() == "true" + blacklist_enabled = getenv("BLACKLIST_ENABLED", "false").lower() == "true" + blacklist_redis = getenv("BLACKLIST_REDIS", "redis://redis:6379/10") + blacklist_ban_ttl_seconds = int(getenv("BLACKLIST_BAN_TTL_SECONDS", str(24 * 60 * 60))) # 1 day + blacklist_ban_after_seconds = int(getenv("BLACKLIST_BAN_AFTER_SECONDS", str(20 * 60))) # 20 minutes + blacklist_restart_ttl_seconds = int(getenv("BLACKLIST_RESTART_TTL_SECONDS", str(60 * 60))) # 1 hour + blacklist_restart_threshold = int(getenv("BLACKLIST_RESTART_THRESHOLD", "5"))