diff --git a/helpers/cudo_api.py b/helpers/cudo_api.py index cf21069..231987d 100644 --- a/helpers/cudo_api.py +++ b/helpers/cudo_api.py @@ -133,10 +133,8 @@ def user(): class PooledVirtualMachinesApi(cudo.VirtualMachinesApi): def __init__(self, api_client=None): - max_workers = 2 # TODO make larger - # self.executor = ThreadPoolExecutor(max_workers=max_workers) self.task_queue = None - self.max_workers = max_workers + self.max_workers = 2 self.shutdown_event = threading.Event() self.workers_active = False self.executor = None @@ -144,62 +142,52 @@ def __init__(self, api_client=None): super().__init__(api_client) def create_vm(self, project_id, create_vm_body, **kwargs): - print("create vm") # TODO - if not self.task_queue: - print("new task queue") - self.task_queue = Queue() + self.start_queue() self.task_queue.put((project_id, create_vm_body)) self.start_workers() - return {"id": create_vm_body.vm_id} def worker(self): while self.workers_active: - print("worker getting tasks") if not self.task_queue: break - req = self.task_queue.get(1) - print("worker processing task") + req = self.task_queue.get() # block true, timeout none + create_vm_body = None try: project, create_vm_body = req vm = super().create_vm(project, create_vm_body) print(f"Created VM: {vm.to_dict()}") except Exception as e: - print(f"Error creating VM: {create_vm_body.vm_id} {e}") + if create_vm_body: + print(f"Error creating VM: {create_vm_body.vm_id} {e}") + else: + print(f"Error creating VM: {e}") self.task_queue.task_done() - if self.task_queue.empty(): - self.shutdown() - # def start_queue(self): + def start_queue(self): + if not self.task_queue: + self.task_queue = Queue() def start_workers(self): if not self.workers_active: - self.shutdown_event.clear() - print("new threadpoolexecutor") - self.executor = ThreadPoolExecutor(max_workers=self.max_workers) self.workers_active = True + self.executor = ThreadPoolExecutor(max_workers=self.max_workers) + for _ in range(self.max_workers): self.executor.submit(self.worker) def stop_workers(self): if not self.shutdown_event.is_set(): - print("shutting down...") # TODO try: - self.shutdown_event.set() - # self.task_queue.join() self.workers_active = False - self.task_queue = None + self.shutdown_event.set() - print("joining executor") # TODO if self.executor: self.executor.shutdown(wait=False) - self.executor = None - print("shutdown") # TODO + except Exception as e: print(f"Error shutting down: {e}") - else: - print("Already shutting down") # TODO c, err = client()