Skip to content

Commit

Permalink
[wip] add blacklist logic (#3)
Browse files Browse the repository at this point in the history
* add blacklist logic

* update readme

* delete exited nodes

* add ban logic for crashloopbackoff

* add ban logic for crashloopbackoff

* fix BLACKLIST_RESTART_TTL_SECONDS
  • Loading branch information
IMMORTALxJO authored Oct 28, 2024
1 parent cc1e3b7 commit 9ce9d9f
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 57 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ VAST_TEMPLATE_NAME=xxx
VAST_TEMPLATE_IMAGE=xxx
VAST_API_KEY=xxx
DOCKER_LOGIN=xxx
BLACKLIST_ENABLED=false
BLACKLIST_REDIS_URL=redis://redis:6379/10
BLACKLIST_BAN_AFTER_SECONDS=1200
BLACKLIST_BAN_TTL_SECONDS=86400
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ repos:
rev: 'v1.8.0'
hooks:
- id: mypy
additional_dependencies: [types-retry==0.9.9.4, types-requests==2.31.0.20240403]
additional_dependencies: [types-retry==0.9.9.4, types-requests==2.31.0.20240403, types-redis==4.6.0.20240903]
34 changes: 21 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,31 @@ The application creates instances from custom templates and recreates instances
- Create the instance if it doesn't exist.
- Delete the instance if the image is not the same as the one provided in `TEMPLATE_IMAGE`, or if the template hash ID is not the same as found in step #1.
- Do nothing if instance is up-to-date
3. Wait for the instance to become online and for the external port to be provided.
3. Wait for the instance to become online and for the external port to be provided. *
4. Generate an Nginx configuration with the `proxy_pass` configuration.
5. Run the Nginx process.

> A long-awaited instance host could be marked as blacklisted if the `BLACKLIST_ENABLED=true` environment variable is set.
This approach allows you to create a StatefulSet with static `POD_NAME`s and add a long startup probe plus a liveness probe to cover the case when the instance becomes unavailable. The pod becomes ready when the vast.ai instance is ready, and it can be used in k8s services.

You can configure the application via environment variables:

| ENV VAR | REQUIRED | DEFAULT | DESCRIPTION |
|--------------------|----------|---------|--------------------------------------------------------------------------|
| POD_NAME | TRUE | - | Unique instance identifier |
| VAST_API_KEY | TRUE | - | Vast.ai API key |
| VAST_TEMPLATE_NAME | TRUE | - | Private template name (will be created in your account) |
| VAST_TEMPLATE_IMAGE| TRUE | - | Target instance image |
| DOCKER_LOGIN | TRUE | - | Docker login command in the format `-u <user> -p <password> <host>` |
| VAST_SEARCH_QUERY | TRUE | - | Vast [search new instance query](https://vast.ai/docs/search/search-gpus)|
| NGINX_CONFIG_PATH | FALSE | /etc/nginx/http.d/default.conf | Path to store the final Nginx config |
| NGINX_LISTEN_PORT | FALSE | 3000 | Nginx listen port |
| NGINX_MAX_BODY_SIZE| FALSE | 10M | Nginx `max_body_size` parameter |
| DEBUG | FALSE | false | Enable debug logs |
| ENV VAR | REQUIRED | DEFAULT | DESCRIPTION |
|-----------------------------|----------|--------------------------------|--------------------------------------------------------------------------|
| POD_NAME | TRUE | - | Unique instance identifier |
| VAST_API_KEY | TRUE | - | Vast.ai API key |
| VAST_TEMPLATE_NAME | TRUE | - | Private template name (will be created in your account) |
| VAST_TEMPLATE_IMAGE | TRUE | - | Target instance image |
| DOCKER_LOGIN | TRUE | - | Docker login command in the format `-u <user> -p <password> <host>` |
| VAST_SEARCH_QUERY | TRUE | - | Vast [search new instance query](https://vast.ai/docs/search/search-gpus)|
| NGINX_CONFIG_PATH | FALSE | /etc/nginx/http.d/default.conf | Path to store the final Nginx config |
| NGINX_LISTEN_PORT | FALSE | 3000 | Nginx listen port |
| NGINX_MAX_BODY_SIZE | FALSE | 10M | Nginx `max_body_size` parameter |
| BLACKLIST_ENABLED | FALSE | false | Enable blacklist logic |
| BLACKLIST_REDIS_URL | FALSE | redis://redis:6379/10 | Redis to be used as blacklist storage |
| BLACKLIST_BAN_AFTER_SECONDS | FALSE | 1200 (20 minutes) | Waiting duration before instance marked as banned |
| BLACKLIST_BAN_TTL_SECONDS | FALSE | 604800 (7 days) | Host ban duration |
| BLACKLIST_RESTART_TTL_SECONDS | FALSE | 3600 (1 hour) | Flush pod restarts counter if no restarts for that time duration |
| BLACKLIST_RESTART_THRESHOLD | FALSE | 5 | Ban host if pod has this many restarts restarts within BLACKLIST_RESTART_TTL_SECONDS |
| DEBUG | FALSE | false | Enable debug logs |
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
services:
proxy:
container_name: proxy
build:
context: src
env_file: .env
ports:
- 3000:3000
redis:
image: redis:7.4.0-alpine3.20
container_name: redis
environment:
- REDIS_PASSWORD=testpassword
- REDIS_USER=testuser
- REDIS_USER_PASSWORD=testuserpassword
ports:
- 6379:6379
91 changes: 54 additions & 37 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,95 @@
import os
import time
import logging
import json
from jinja2 import Environment, FileSystemLoader
from models.vast import VastController
from models.blacklist import Blacklist
from settings import Settings
from typing import Optional

logging.basicConfig(level=logging.DEBUG if Settings.log_debug else logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s", datefmt="%Y-%m-%dT%H:%M:%S")
logging.info({"template_name": Settings.template_name, "template_image": Settings.template_image, "pod_name": Settings.pod_name})

def getenv(envname: str, default: str | None = None):
v = os.getenv(envname)
if v:
return v
if not default:
raise Exception(f"Env var {envname} is not defined")
return default


pod_name = getenv("POD_NAME")
vast_api_key = getenv("VAST_API_KEY")
template_name = getenv("VAST_TEMPLATE_NAME")
template_image = getenv("VAST_TEMPLATE_IMAGE")
nginx_config_path = getenv("NGINX_CONFIG_PATH", "/etc/nginx/http.d/default.conf")
docker_login = getenv("DOCKER_LOGIN")
vast_search_query = json.loads(getenv("VAST_SEARCH_QUERY"))
nginx_listen_port = getenv("NGINX_LISTEN_PORT", "3000")
nginx_max_body_size = getenv("NGINX_MAX_BODY_SIZE", "10M")
log_debug = getenv("DEBUG", "false")

logging.basicConfig(level=logging.DEBUG if log_debug.lower() == "true" else logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s:%(message)s", datefmt="%Y-%m-%dT%H:%M:%S")
logging.info({"template_name": template_name, "template_image": template_image, "pod_name": pod_name})

vast = VastController(api_key=vast_api_key)
vast = VastController(api_key=Settings.vast_api_key)
blacklist: Optional[Blacklist] = None
if Settings.blacklist_enabled:
blacklist = Blacklist(redis_url=Settings.blacklist_redis, ban_ttl=Settings.blacklist_ban_ttl_seconds)

# get template by name
logging.info(f"Searching template with name '{template_name}' ...")
template = vast.getTemplateByName(template_name)
logging.info(f"Searching template with name '{Settings.template_name}' ...")
template = vast.getTemplateByName(Settings.template_name)
if not template:
raise Exception(f"Template with name '{template_name}' not found")
raise Exception(f"Template with name '{Settings.template_name}' not found")
logging.info(template)

# check if instance already exists
instance_label = f"k8s_pod={pod_name}"
instance_label = f"k8s_pod={Settings.pod_name}"
logging.info(f"Searching instance with label '{instance_label}' ...")
instance = vast.getInstanceByLabel(instance_label)
if instance:
logging.info(instance)
if instance.image != template_image or instance.template_hash_id != template.id or instance.status == "offline":
logging.info("Destroying instance...")
destroy_reason = ""
if instance.image != Settings.template_image:
destroy_reason = "instance has wrong image"
elif instance.template_hash_id != template.id:
destroy_reason = "instance has wrong template"
elif instance.status in ["offline", "exited"]:
destroy_reason = f"instance is {instance.status}"
elif blacklist and blacklist.isBanned(instance.hostId):
destroy_reason = "instance host is banned"
elif blacklist and blacklist.getAndIncreaseInstanceRestarts(instance.id) > Settings.blacklist_restart_threshold:
logging.info("Too many pod restarts, instance host will be banned for some time...")
blacklist.add(instance.hostId, reason="restarts")
destroy_reason = "too many pod restarts"
if destroy_reason:
logging.info(f"Destroying instance, {destroy_reason}...")
instance.destroy()
if blacklist:
blacklist.cleanInstanceKeys(instance.id)
instance = None
else:
logging.info("Instance image and template are up-to-date")

# create instance if not exists or not up-to-date
# TODO: find a way to get docker_login from the template
if not instance:
vast.createInstance(template=template, label=instance_label, docker_login=docker_login, search_query=vast_search_query, image=template_image)
vast.createInstance(
template=template,
label=instance_label,
docker_login=Settings.docker_login,
search_query=Settings.vast_search_query,
image=Settings.template_image,
blacklist_host_ids=blacklist.list() if blacklist else [],
)

# wait for instance become online
wait_start_time = 0
while True:
instance = vast.getInstanceByLabel(instance_label)
if not instance:
raise Exception("Instance not found")
if not wait_start_time:
wait_start_time = blacklist.getInstanceStartTime(instance.id) if blacklist else int(time.time())
logging.info(instance)
if instance and instance.status == "running" and instance.port > 0:
if blacklist:
blacklist.delInstanceStartTime(instance.id)
break
logging.info("Waiting for instance become online ...")
wait_time = int(time.time()) - wait_start_time
if blacklist and wait_time > Settings.blacklist_ban_after_seconds:
logging.info("Too long waiting, instance host will be banned for some time...")
blacklist.add(instance.hostId)
instance.destroy()
raise Exception(f"Host {instance.hostId} marked as banned due to long startup waiting")
logging.info(f"Waiting for instance become online ({wait_time} seconds) ...")
time.sleep(10)
logging.info("Instance is ready to accept connections")

logging.info("Instance is ready to accept connections")

# generate nginx config
environment = Environment(loader=FileSystemLoader("./"), autoescape=True)
nginx_template = environment.get_template("nginx.conf.j2")
nginx_conf = nginx_template.render({"ip": instance.ip, "port": instance.port, "listen_port": nginx_listen_port, "max_body_size": nginx_max_body_size})
nginx_conf = nginx_template.render({"ip": instance.ip, "port": instance.port, "listen_port": Settings.nginx_listen_port, "max_body_size": Settings.nginx_max_body_size})
logging.info(f"Final nginx config:\n{nginx_conf}")
with open(nginx_config_path, "w") as nginx_conf_file:
with open(Settings.nginx_config_path, "w") as nginx_conf_file:
nginx_conf_file.write(nginx_conf)
logging.info("Nginx config saved")
108 changes: 108 additions & 0 deletions src/models/blacklist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import redis
import logging
import time


class Blacklist:
_redis_conn: redis.Redis
_ban_ttl: int

def __init__(self, redis_url: str, ban_ttl: int = 3600, restarts_ttl: int = 3600):
"""
Initializes the Blacklist class.
:param redis_url: URL of the Redis instance.
:param db: Redis database index to use.
:param ban_ttl: Time-to-live (TTL) for each ban record in seconds.
"""
self._ban_ttl = ban_ttl
self._restarts_record_ttl = restarts_ttl
self._redis_conn = redis.StrictRedis.from_url(url=redis_url)
self._logger = logging.getLogger("blacklist")
self._logger.info(f"initialized with ttl={self._ban_ttl}")

_ban_key_prefix: str = "ban_"

def _getBanKey(self, key: str) -> str:
return f"{self._ban_key_prefix}{key}"

def isBanned(self, key: str) -> bool:
"""
Checks if the given key is in the blacklist.
:param key: The string to check.
:return: True if the key is banned, False otherwise.
"""
return self._redis_conn.exists(self._getBanKey(key)) == 1

def add(self, key: str, reason: str = "slow_startup"):
"""
Adds the given key to the blacklist with the specified TTL.
:param key: The string to ban.
"""
self._redis_conn.setex(self._getBanKey(key), self._ban_ttl, reason)
self._logger.info(f"added '{key}' ttl={self._ban_ttl}")

def list(self) -> list[str]:
"""
Retrieves all currently banned keys (IDs) from the blacklist.
:return: A list of currently banned keys.
"""
banned_keys = []
cursor, keys = self._redis_conn.scan(match=f"{self._ban_key_prefix}*", count=100)
banned_keys.extend(keys)
while cursor != 0:
cursor, keys = self._redis_conn.scan(cursor=cursor, match=f"{self._ban_key_prefix}*", count=100)
banned_keys.extend(keys)
key_len = len(self._ban_key_prefix)
return [key.decode("utf-8")[key_len:] for key in banned_keys]

_wait_time_key_prefix: str = "wait_"
_wait_time_record_ttl: int = 24 * 60 * 60 # 1day

def getInstanceStartTime(self, instance_id: str) -> int:
"""
Returns the time difference (in seconds) between the current time and the stored timestamp for the given instance.
If no timestamp is found, it sets the current time as the start time.
:param instance_id: Vast instance id.
:return: Time difference in seconds.
"""
key = f"{self._wait_time_key_prefix}{instance_id}"
start_time_str = self._redis_conn.get(key)

if start_time_str is None:
start_time = int(time.time())
self._redis_conn.setex(key, self._wait_time_record_ttl, str(start_time))
else:
start_time = int(start_time_str)

self._logger.info(f"got '{instance_id}' start time - {start_time}")
return start_time

def delInstanceStartTime(self, instance_id: str):
"""
Deletes the stored wait time for the given instance.
:param instance_id: Vast instance id.
"""
self._redis_conn.delete(f"{self._wait_time_key_prefix}{instance_id}")
self._logger.info(f"deleted '{instance_id}' wait time")

_restarts_key_prefix: str = "restarts_"
_restarts_record_ttl: int = 60 * 60 # 1hour

def getAndIncreaseInstanceRestarts(self, instance_id: str) -> int:
key = f"{self._restarts_key_prefix}{instance_id}"
restarts = self._redis_conn.get(key)
restarts = int(restarts) if restarts else 0
restarts += 1
self._redis_conn.setex(key, self._restarts_record_ttl, restarts)
self._logger.info(f"got '{instance_id}' restarts counter - {restarts}")
return restarts

def cleanInstanceKeys(self, instance_id: str):
self._redis_conn.delete(f"{self._restarts_key_prefix}{instance_id}")
self._redis_conn.delete(f"{self._wait_time_key_prefix}{instance_id}")
19 changes: 14 additions & 5 deletions src/models/vast.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,20 @@ def getInstanceByLabel(self, label: str) -> VastInstance | None:
return VastInstance(instance, self)
return None

def createInstance(self, template: VastTemplate, label: str, docker_login: str, search_query: dict, image: str):
offers = self.get("/api/v0/bundles/", params={"q": json.dumps({**search_query, **{"disk_space": {"gte": template.disk_space}, "allocated_storage": template.disk_space}})})["offers"]
offer_id = offers[0]["id"]
self.__logger.info(f"Best offer is {offer_id}, creating instance ...")
result = self.put(f"/api/v0/asks/{offer_id}/", {"client_id": "me", "image_login": docker_login, "image": image, "template_hash_id": template.id, "label": label})
def createInstance(self, template: VastTemplate, label: str, docker_login: str, search_query: dict, image: str, blacklist_host_ids: list[str] = []):
self.__logger.info(f"Creating instance, blacklist_host_ids={blacklist_host_ids}")
best_offer = None
for offer in self.get("/api/v0/bundles/", params={"q": json.dumps({**search_query, **{"disk_space": {"gte": template.disk_space}, "allocated_storage": template.disk_space}})})["offers"]:
if str(offer["host_id"]) in blacklist_host_ids:
self.__logger.info(f"Offer {offer['id']} skipped, host {offer['host_id']} in blacklist")
continue
best_offer = offer
break
if not best_offer:
raise Exception("Offer not found")
self.__logger.info(f"Found offer is {best_offer['id']}, creating instance ...")
self.__logger.debug(f"Offer: {best_offer}")
result = self.put(f"/api/v0/asks/{best_offer['id']}/", {"client_id": "me", "image_login": docker_login, "image": image, "template_hash_id": template.id, "label": label})
if "success" not in result or not result["success"]:
logging.error(result)
raise Exception("Instance creation request is not succeed")
Expand Down
6 changes: 5 additions & 1 deletion src/models/vast_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ def template_hash_id(self) -> str:
def image(self) -> str:
return str(self.__raw_template["image_uuid"])

@property
def hostId(self) -> str:
return str(self.__raw_template["host_id"])

def destroy(self):
result = self.__vast.delete(f"/api/v0/instances/{self.id}/")
if "success" not in result or not result["success"]:
Expand All @@ -48,6 +52,6 @@ def destroy(self):

def __str__(self):
s = [f"Instance {self.id}"]
for t in ["image_uuid", "template_hash_id", "cpu_name", "cpu_cores", "actual_status", "status_msg", "public_ipaddr", "direct_port_start"]:
for t in ["image_uuid", "host_id", "template_hash_id", "cpu_name", "cpu_cores", "actual_status", "status_msg", "public_ipaddr", "direct_port_start"]:
s.append("\t%s = %s" % (t, self.__raw_template[t]))
return "\n".join(s)
1 change: 1 addition & 0 deletions src/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ setuptools==72.1.0
urllib3==2.2.2
vastai==0.2.5
wheel==0.44.0
redis==5.0.8
Loading

0 comments on commit 9ce9d9f

Please sign in to comment.