diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index 3f7b1a8..cf21069 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -130,40 +130,35 @@ def user(): raise Exception(err) return cudo.UserApi(c) + class PooledVirtualMachinesApi(cudo.VirtualMachinesApi): def __init__(self, api_client=None): - max_workers=2 #TODO make larger - self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.task_queue = Queue() + max_workers = 2 # TODO make larger + # self.executor = ThreadPoolExecutor(max_workers=max_workers) + self.task_queue = None self.max_workers = max_workers self.shutdown_event = threading.Event() self.workers_active = False - - atexit.register(self.shutdown) + self.executor = None + atexit.register(self.stop_workers) super().__init__(api_client) - def start_workers(self): - - if not self.workers_active: - self.workers_active = True - self.shutdown_event.clear() - for _ in range(self.max_workers): - self.executor.submit(self.worker) + def create_vm(self, project_id, create_vm_body, **kwargs): + print("create vm") # TODO + if not self.task_queue: + print("new task queue") + self.task_queue = Queue() + self.task_queue.put((project_id, create_vm_body)) + self.start_workers() - def stop_workers(self): - if self.workers_active: - print("stopping workers") #TODO - self.workers_active = False - for _ in range(self.max_workers): - self.task_queue.put(None) - print("workers stopped") #TODO + return {"id": create_vm_body.vm_id} def worker(self): - while True: + while self.workers_active: print("worker getting tasks") - req = self.task_queue.get() - if req is None: + if not self.task_queue: break + req = self.task_queue.get(1) print("worker processing task") try: project, create_vm_body = req @@ -176,28 +171,42 @@ def worker(self): if self.task_queue.empty(): self.shutdown() + # def start_queue(self): - def create_vm(self, project_id, create_vm_body, **kwargs): - print("create vm") #TODO - self.task_queue.put((project_id, create_vm_body)) - self.start_workers() - - return {"id":create_vm_body.vm_id} + def start_workers(self): + if not self.workers_active: + self.shutdown_event.clear() + print("new threadpoolexecutor") + self.executor = ThreadPoolExecutor(max_workers=self.max_workers) + self.workers_active = True + for _ in range(self.max_workers): + self.executor.submit(self.worker) - def shutdown(self): + def stop_workers(self): if not self.shutdown_event.is_set(): - print("shutting down...") # TODO + print("shutting down...") # TODO + try: + self.shutdown_event.set() + # self.task_queue.join() + self.workers_active = False + self.task_queue = None + + print("joining executor") # TODO + if self.executor: + self.executor.shutdown(wait=False) + self.executor = None + print("shutdown") # TODO + except Exception as e: + print(f"Error shutting down: {e}") + else: + print("Already shutting down") # TODO - self.shutdown_event.set() - self.task_queue.join() - self.stop_workers() - self.executor.shutdown(wait=True) - print("shutdown") # TODO +c, err = client() +if err: + raise Exception(err) +pool = PooledVirtualMachinesApi(c) def virtual_machines(): - c, err = client() - if err: - raise Exception(err) - return PooledVirtualMachinesApi(c) #cudo.VirtualMachinesApi(c) + return pool