Skip to content

Commit

Permalink
create vm pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sean committed Oct 1, 2024
1 parent 71f5621 commit 4fa5269
Showing 1 changed file with 71 additions and 1 deletion.
72 changes: 71 additions & 1 deletion helpers/cudo_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import cudo_compute as cudo
import os
from time import sleep
import importlib.metadata
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import atexit
import threading

home = os.path.expanduser("~")

Expand Down Expand Up @@ -125,9 +130,74 @@ def user():
raise Exception(err)
return cudo.UserApi(c)

class ExtendedVirtualMachinesApi(cudo.VirtualMachinesApi):
def __init__(self, api_client=None):
max_workers=2 #TODO make larger
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = Queue()
self.max_workers = max_workers
self.shutdown_event = threading.Event()
self.workers_active = False

# Register the shutdown method to be called at exit
atexit.register(self.shutdown)

super().__init__(api_client)
# Additional initialization if needed

def start_workers(self):
if not self.workers_active:
self.workers_active = True
self.shutdown_event.clear()
for _ in range(self.max_workers):
self.executor.submit(self.worker)
print("Workers started.") #TODO

def stop_workers(self):
if self.workers_active:
self.workers_active = False
for _ in range(self.max_workers):
self.task_queue.put(None)
print("Workers stopped.") #TODO

def worker(self):
while True:
req = self.task_queue.get()
if req is None:
break
project, create_vm_body, kwargs = req
super().create_vm(project, create_vm_body)
self.task_queue.task_done()

# Check if the task queue is empty and call shutdown if it is
if self.task_queue.empty():
self.shutdown()

def create_vm(self, project_id, create_vm_body, **kwargs):
print("Adding VM...") #TODO
self.task_queue.put((project_id, create_vm_body))
self.start_workers()

# Custom implementation or additional logic
print("Creating VM with custom logic")
# return

def shutdown(self):
if not self.shutdown_event.is_set():
self.shutdown_event.set()
# Wait for all tasks to be processed
self.task_queue.join()
print("All VMs added. Waiting for all tasks to complete...")

# Stop worker threads
self.stop_workers()

self.executor.shutdown(wait=True)
print("Executor shutdown complete.")


def virtual_machines():
c, err = client()
if err:
raise Exception(err)
return cudo.VirtualMachinesApi(c)
return ExtendedVirtualMachinesApi(c)

0 comments on commit 4fa5269

Please sign in to comment.