From 4fa52696f8f7af30d1e8968db8ced60277c35e6d Mon Sep 17 00:00:00 2001 From: sean Date: Tue, 1 Oct 2024 18:08:31 +0100 Subject: [PATCH 1/7] create vm pool --- helpers/cudo_api.py | 72 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index aeb3cb5..3511590 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -1,6 +1,11 @@ import cudo_compute as cudo import os +from time import sleep import importlib.metadata +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +import atexit +import threading home = os.path.expanduser("~") @@ -125,9 +130,74 @@ def user(): raise Exception(err) return cudo.UserApi(c) +class ExtendedVirtualMachinesApi(cudo.VirtualMachinesApi): + def __init__(self, api_client=None): + max_workers=2 #TODO make larger + self.executor = ThreadPoolExecutor(max_workers=max_workers) + self.task_queue = Queue() + self.max_workers = max_workers + self.shutdown_event = threading.Event() + self.workers_active = False + + # Register the shutdown method to be called at exit + atexit.register(self.shutdown) + + super().__init__(api_client) + # Additional initialization if needed + + def start_workers(self): + if not self.workers_active: + self.workers_active = True + self.shutdown_event.clear() + for _ in range(self.max_workers): + self.executor.submit(self.worker) + print("Workers started.") #TODO + + def stop_workers(self): + if self.workers_active: + self.workers_active = False + for _ in range(self.max_workers): + self.task_queue.put(None) + print("Workers stopped.") #TODO + + def worker(self): + while True: + req = self.task_queue.get() + if req is None: + break + project, create_vm_body, kwargs = req + super().create_vm(project, create_vm_body) + self.task_queue.task_done() + + # Check if the task queue is empty and call shutdown if it is + if self.task_queue.empty(): + self.shutdown() + + def create_vm(self, project_id, create_vm_body, **kwargs): + print("Adding VM...") #TODO + self.task_queue.put((project_id, create_vm_body)) + self.start_workers() + + # Custom implementation or additional logic + print("Creating VM with custom logic") + # return + + def shutdown(self): + if not self.shutdown_event.is_set(): + self.shutdown_event.set() + # Wait for all tasks to be processed + self.task_queue.join() + print("All VMs added. Waiting for all tasks to complete...") + + # Stop worker threads + self.stop_workers() + + self.executor.shutdown(wait=True) + print("Executor shutdown complete.") + def virtual_machines(): c, err = client() if err: raise Exception(err) - return cudo.VirtualMachinesApi(c) + return ExtendedVirtualMachinesApi(c) From b9ccb776256bd6e7699d1a1acba68399a06e4873 Mon Sep 17 00:00:00 2001 From: sean Date: Thu, 3 Oct 2024 12:05:51 +0100 Subject: [PATCH 2/7] create vm pool --- helpers/cudo_api.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index 3511590..3f7b1a8 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -130,7 +130,7 @@ def user(): raise Exception(err) return cudo.UserApi(c) -class ExtendedVirtualMachinesApi(cudo.VirtualMachinesApi): +class PooledVirtualMachinesApi(cudo.VirtualMachinesApi): def __init__(self, api_client=None): max_workers=2 #TODO make larger self.executor = ThreadPoolExecutor(max_workers=max_workers) @@ -139,65 +139,65 @@ def __init__(self, api_client=None): self.shutdown_event = threading.Event() self.workers_active = False - # Register the shutdown method to be called at exit atexit.register(self.shutdown) - super().__init__(api_client) - # Additional initialization if needed def start_workers(self): + if not self.workers_active: self.workers_active = True self.shutdown_event.clear() for _ in range(self.max_workers): self.executor.submit(self.worker) - print("Workers started.") #TODO def stop_workers(self): if self.workers_active: + print("stopping workers") #TODO self.workers_active = False for _ in range(self.max_workers): self.task_queue.put(None) - print("Workers stopped.") #TODO + print("workers stopped") #TODO def worker(self): while True: + print("worker getting tasks") req = self.task_queue.get() if req is None: break - project, create_vm_body, kwargs = req - super().create_vm(project, create_vm_body) + print("worker processing task") + try: + project, create_vm_body = req + vm = super().create_vm(project, create_vm_body) + print(f"Created VM: {vm.to_dict()}") + except Exception as e: + print(f"Error creating VM: {create_vm_body.vm_id} {e}") + self.task_queue.task_done() - # Check if the task queue is empty and call shutdown if it is if self.task_queue.empty(): self.shutdown() def create_vm(self, project_id, create_vm_body, **kwargs): - print("Adding VM...") #TODO + print("create vm") #TODO self.task_queue.put((project_id, create_vm_body)) self.start_workers() - # Custom implementation or additional logic - print("Creating VM with custom logic") - # return + return {"id":create_vm_body.vm_id} def shutdown(self): if not self.shutdown_event.is_set(): + print("shutting down...") # TODO + self.shutdown_event.set() - # Wait for all tasks to be processed self.task_queue.join() - print("All VMs added. Waiting for all tasks to complete...") - - # Stop worker threads self.stop_workers() self.executor.shutdown(wait=True) - print("Executor shutdown complete.") + print("shutdown") # TODO def virtual_machines(): c, err = client() if err: raise Exception(err) - return ExtendedVirtualMachinesApi(c) + return PooledVirtualMachinesApi(c) #cudo.VirtualMachinesApi(c) From 716d87b15233f453b41deb27b3e332ae8c0ffabb Mon Sep 17 00:00:00 2001 From: sean Date: Fri, 4 Oct 2024 11:40:57 +0100 Subject: [PATCH 3/7] pre rewrite --- helpers/cudo_api.py | 87 +++++++++++++++++++++++++-------------------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index 3f7b1a8..cf21069 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -130,40 +130,35 @@ def user(): raise Exception(err) return cudo.UserApi(c) + class PooledVirtualMachinesApi(cudo.VirtualMachinesApi): def __init__(self, api_client=None): - max_workers=2 #TODO make larger - self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.task_queue = Queue() + max_workers = 2 # TODO make larger + # self.executor = ThreadPoolExecutor(max_workers=max_workers) + self.task_queue = None self.max_workers = max_workers self.shutdown_event = threading.Event() self.workers_active = False - - atexit.register(self.shutdown) + self.executor = None + atexit.register(self.stop_workers) super().__init__(api_client) - def start_workers(self): - - if not self.workers_active: - self.workers_active = True - self.shutdown_event.clear() - for _ in range(self.max_workers): - self.executor.submit(self.worker) + def create_vm(self, project_id, create_vm_body, **kwargs): + print("create vm") # TODO + if not self.task_queue: + print("new task queue") + self.task_queue = Queue() + self.task_queue.put((project_id, create_vm_body)) + self.start_workers() - def stop_workers(self): - if self.workers_active: - print("stopping workers") #TODO - self.workers_active = False - for _ in range(self.max_workers): - self.task_queue.put(None) - print("workers stopped") #TODO + return {"id": create_vm_body.vm_id} def worker(self): - while True: + while self.workers_active: print("worker getting tasks") - req = self.task_queue.get() - if req is None: + if not self.task_queue: break + req = self.task_queue.get(1) print("worker processing task") try: project, create_vm_body = req @@ -176,28 +171,42 @@ def worker(self): if self.task_queue.empty(): self.shutdown() + # def start_queue(self): - def create_vm(self, project_id, create_vm_body, **kwargs): - print("create vm") #TODO - self.task_queue.put((project_id, create_vm_body)) - self.start_workers() - - return {"id":create_vm_body.vm_id} + def start_workers(self): + if not self.workers_active: + self.shutdown_event.clear() + print("new threadpoolexecutor") + self.executor = ThreadPoolExecutor(max_workers=self.max_workers) + self.workers_active = True + for _ in range(self.max_workers): + self.executor.submit(self.worker) - def shutdown(self): + def stop_workers(self): if not self.shutdown_event.is_set(): - print("shutting down...") # TODO + print("shutting down...") # TODO + try: + self.shutdown_event.set() + # self.task_queue.join() + self.workers_active = False + self.task_queue = None + + print("joining executor") # TODO + if self.executor: + self.executor.shutdown(wait=False) + self.executor = None + print("shutdown") # TODO + except Exception as e: + print(f"Error shutting down: {e}") + else: + print("Already shutting down") # TODO - self.shutdown_event.set() - self.task_queue.join() - self.stop_workers() - self.executor.shutdown(wait=True) - print("shutdown") # TODO +c, err = client() +if err: + raise Exception(err) +pool = PooledVirtualMachinesApi(c) def virtual_machines(): - c, err = client() - if err: - raise Exception(err) - return PooledVirtualMachinesApi(c) #cudo.VirtualMachinesApi(c) + return pool From fe9904d9da2dba9b3fb0a80dd0b9d388dd7376ff Mon Sep 17 00:00:00 2001 From: sean Date: Fri, 4 Oct 2024 12:01:26 +0100 Subject: [PATCH 4/7] possibly better --- helpers/cudo_api.py | 42 +++++++++++++++--------------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index cf21069..231987d 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -133,10 +133,8 @@ def user(): class PooledVirtualMachinesApi(cudo.VirtualMachinesApi): def __init__(self, api_client=None): - max_workers = 2 # TODO make larger - # self.executor = ThreadPoolExecutor(max_workers=max_workers) self.task_queue = None - self.max_workers = max_workers + self.max_workers = 2 self.shutdown_event = threading.Event() self.workers_active = False self.executor = None @@ -144,62 +142,52 @@ def __init__(self, api_client=None): super().__init__(api_client) def create_vm(self, project_id, create_vm_body, **kwargs): - print("create vm") # TODO - if not self.task_queue: - print("new task queue") - self.task_queue = Queue() + self.start_queue() self.task_queue.put((project_id, create_vm_body)) self.start_workers() - return {"id": create_vm_body.vm_id} def worker(self): while self.workers_active: - print("worker getting tasks") if not self.task_queue: break - req = self.task_queue.get(1) - print("worker processing task") + req = self.task_queue.get() # block true, timeout none + create_vm_body = None try: project, create_vm_body = req vm = super().create_vm(project, create_vm_body) print(f"Created VM: {vm.to_dict()}") except Exception as e: - print(f"Error creating VM: {create_vm_body.vm_id} {e}") + if create_vm_body: + print(f"Error creating VM: {create_vm_body.vm_id} {e}") + else: + print(f"Error creating VM: {e}") self.task_queue.task_done() - if self.task_queue.empty(): - self.shutdown() - # def start_queue(self): + def start_queue(self): + if not self.task_queue: + self.task_queue = Queue() def start_workers(self): if not self.workers_active: - self.shutdown_event.clear() - print("new threadpoolexecutor") - self.executor = ThreadPoolExecutor(max_workers=self.max_workers) self.workers_active = True + self.executor = ThreadPoolExecutor(max_workers=self.max_workers) + for _ in range(self.max_workers): self.executor.submit(self.worker) def stop_workers(self): if not self.shutdown_event.is_set(): - print("shutting down...") # TODO try: - self.shutdown_event.set() - # self.task_queue.join() self.workers_active = False - self.task_queue = None + self.shutdown_event.set() - print("joining executor") # TODO if self.executor: self.executor.shutdown(wait=False) - self.executor = None - print("shutdown") # TODO + except Exception as e: print(f"Error shutting down: {e}") - else: - print("Already shutting down") # TODO c, err = client() From 91f6219d302a9a04d0ad70e7745d1d8ee04edffb Mon Sep 17 00:00:00 2001 From: sean Date: Fri, 4 Oct 2024 12:55:34 +0100 Subject: [PATCH 5/7] possibly better --- helpers/cudo_api.py | 62 +++++++++++++++++---------------------------- 1 file changed, 23 insertions(+), 39 deletions(-) diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index 231987d..6e7f59d 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -1,3 +1,5 @@ +from asyncio import timeout + import cudo_compute as cudo import os from time import sleep @@ -7,6 +9,9 @@ import atexit import threading +from cudo_compute.models.create_vm_response import CreateVMResponse +from cudo_compute.models.vm import VM + home = os.path.expanduser("~") @@ -68,45 +73,31 @@ def project_id_throwable(): # APIs +c, err = client() +if err: + raise Exception(err) + def api_keys(): - c, err = client() - if err: - raise Exception(err) return cudo.APIKeysApi(c) def disks(): - c, err = client() - if err: - raise Exception(err) return cudo.DisksApi(c) def networks(): - c, err = client() - if err: - raise Exception(err) return cudo.NetworksApi(c) def object_storage(): - c, err = client() - if err: - raise Exception(err) return cudo.ObjectStorageApi(c) def permissions(): - c, err = client() - if err: - raise Exception(err) return cudo.PermissionsApi(c) def projects(): - c, err = client() - if err: - raise Exception(err) return cudo.ProjectsApi(c) @@ -118,16 +109,10 @@ def ssh_keys(): def search(): - c, err = client() - if err: - raise Exception(err) return cudo.SearchApi(c) def user(): - c, err = client() - if err: - raise Exception(err) return cudo.UserApi(c) @@ -145,13 +130,14 @@ def create_vm(self, project_id, create_vm_body, **kwargs): self.start_queue() self.task_queue.put((project_id, create_vm_body)) self.start_workers() - return {"id": create_vm_body.vm_id} + return CreateVMResponse(id=create_vm_body.vm_id, vm=VM()) def worker(self): while self.workers_active: if not self.task_queue: break - req = self.task_queue.get() # block true, timeout none + req = self.task_queue.get(timeout=1) + print(req) create_vm_body = None try: project, create_vm_body = req @@ -164,6 +150,7 @@ def worker(self): print(f"Error creating VM: {e}") self.task_queue.task_done() + print("Task done") def start_queue(self): if not self.task_queue: @@ -178,23 +165,20 @@ def start_workers(self): self.executor.submit(self.worker) def stop_workers(self): - if not self.shutdown_event.is_set(): - try: - self.workers_active = False - self.shutdown_event.set() + print("stop workers") + # if not self.shutdown_event.is_set(): + # try: + # self.workers_active = False + # self.shutdown_event.set() + # + # self.executor.shutdown(wait=False) + # + # except Exception as e: + # print(f"Error shutting down: {e}") - if self.executor: - self.executor.shutdown(wait=False) - - except Exception as e: - print(f"Error shutting down: {e}") -c, err = client() -if err: - raise Exception(err) pool = PooledVirtualMachinesApi(c) - def virtual_machines(): return pool From 3506e26757d75353cb3ae39dada362039ea173a8 Mon Sep 17 00:00:00 2001 From: sean Date: Fri, 4 Oct 2024 12:56:02 +0100 Subject: [PATCH 6/7] possibly better --- helpers/cudo_api.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index 6e7f59d..9572d56 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -150,7 +150,6 @@ def worker(self): print(f"Error creating VM: {e}") self.task_queue.task_done() - print("Task done") def start_queue(self): if not self.task_queue: @@ -165,16 +164,15 @@ def start_workers(self): self.executor.submit(self.worker) def stop_workers(self): - print("stop workers") - # if not self.shutdown_event.is_set(): - # try: - # self.workers_active = False - # self.shutdown_event.set() - # - # self.executor.shutdown(wait=False) - # - # except Exception as e: - # print(f"Error shutting down: {e}") + if not self.shutdown_event.is_set(): + try: + self.workers_active = False + self.shutdown_event.set() + + self.executor.shutdown(wait=False) + + except Exception as e: + print(f"Error shutting down: {e}") From 82f821a46d257f58e5ffe9bff7eb535918305f3f Mon Sep 17 00:00:00 2001 From: sean Date: Mon, 14 Oct 2024 10:51:45 +0100 Subject: [PATCH 7/7] pool ok --- dev/create-vm-pool.py | 40 ++++++++++++++++++++++++++++++++++++++++ helpers/cudo_api.py | 18 ++++++++++++++---- pyproject.toml | 2 +- 3 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 dev/create-vm-pool.py diff --git a/dev/create-vm-pool.py b/dev/create-vm-pool.py new file mode 100644 index 0000000..f550140 --- /dev/null +++ b/dev/create-vm-pool.py @@ -0,0 +1,40 @@ +import sys +import os +from time import sleep + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'helpers'))) + +import cudo_api + +from cudo_compute import Disk, CreateVMBody +from cudo_compute.rest import ApiException + + +vm_name = 'launch-lot' +id = 1 +ids = [] +api = cudo_api.virtual_machines() +try: + for i in range(12): + vm_id = vm_name + str(id) + disk = Disk(storage_class="STORAGE_CLASS_NETWORK", size_gib=100, + id="my-disk-id-" + vm_id) + + request = CreateVMBody(vm_id=vm_id, machine_type="intel-broadwell", + data_center_id="gb-bournemouth-1", boot_disk_image_id='ubuntu-nvidia-docker', + memory_gib=4, vcpus=2, gpus=0, gpu_model="", boot_disk=disk) + + vm = api.create_vm(cudo_api.project_id_throwable(), request) + print(vm) + ids.append(vm_id) + id += 1 + +except ApiException as e: + print(e) + +sleep(80) +for del_id in ids: + res = api.terminate_vm(cudo_api.project_id_throwable(), del_id) + print(res) + +print("done") diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index 9572d56..c11b51c 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -77,6 +77,7 @@ def project_id_throwable(): if err: raise Exception(err) + def api_keys(): return cudo.APIKeysApi(c) @@ -116,10 +117,14 @@ def user(): return cudo.UserApi(c) +def legacy_virtual_machines(): + return cudo.VirtualMachinesApi(c) + + class PooledVirtualMachinesApi(cudo.VirtualMachinesApi): def __init__(self, api_client=None): self.task_queue = None - self.max_workers = 2 + self.max_workers = 5 self.shutdown_event = threading.Event() self.workers_active = False self.executor = None @@ -137,12 +142,19 @@ def worker(self): if not self.task_queue: break req = self.task_queue.get(timeout=1) - print(req) create_vm_body = None try: project, create_vm_body = req vm = super().create_vm(project, create_vm_body) print(f"Created VM: {vm.to_dict()}") + wait = True + while wait: + res = self.get_vm(project, create_vm_body.vm_id) + if (res.vm.state == 'ACTIVE' or res.vm.state == 'FAILED' or res.vm.state == 'STOPPED' + or res.vm.state == 'SUSPENDED' or res.vm.state == 'DELETED'): + wait = False + else: + sleep(5) except Exception as e: if create_vm_body: print(f"Error creating VM: {create_vm_body.vm_id} {e}") @@ -174,8 +186,6 @@ def stop_workers(self): except Exception as e: print(f"Error shutting down: {e}") - - pool = PooledVirtualMachinesApi(c) def virtual_machines(): diff --git a/pyproject.toml b/pyproject.toml index ff69b37..a188310 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "cudo-compute" -version = "0.2.0" +version = "0.3.0" authors = [ { name = "Cudo Ventures", email = "dev@cudoventures.com" }, ]