Skip to content

Commit

Permalink
possibly better
Browse files Browse the repository at this point in the history
  • Loading branch information
sean committed Oct 4, 2024
1 parent 716d87b commit fe9904d
Showing 1 changed file with 15 additions and 27 deletions.
42 changes: 15 additions & 27 deletions helpers/cudo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,73 +133,61 @@ def user():

class PooledVirtualMachinesApi(cudo.VirtualMachinesApi):
def __init__(self, api_client=None):
max_workers = 2 # TODO make larger
# self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = None
self.max_workers = max_workers
self.max_workers = 2
self.shutdown_event = threading.Event()
self.workers_active = False
self.executor = None
atexit.register(self.stop_workers)
super().__init__(api_client)

def create_vm(self, project_id, create_vm_body, **kwargs):
print("create vm") # TODO
if not self.task_queue:
print("new task queue")
self.task_queue = Queue()
self.start_queue()
self.task_queue.put((project_id, create_vm_body))
self.start_workers()

return {"id": create_vm_body.vm_id}

def worker(self):
while self.workers_active:
print("worker getting tasks")
if not self.task_queue:
break
req = self.task_queue.get(1)
print("worker processing task")
req = self.task_queue.get() # block true, timeout none
create_vm_body = None
try:
project, create_vm_body = req
vm = super().create_vm(project, create_vm_body)
print(f"Created VM: {vm.to_dict()}")
except Exception as e:
print(f"Error creating VM: {create_vm_body.vm_id} {e}")
if create_vm_body:
print(f"Error creating VM: {create_vm_body.vm_id} {e}")
else:
print(f"Error creating VM: {e}")

self.task_queue.task_done()

if self.task_queue.empty():
self.shutdown()
# def start_queue(self):
def start_queue(self):
if not self.task_queue:
self.task_queue = Queue()

def start_workers(self):
if not self.workers_active:
self.shutdown_event.clear()
print("new threadpoolexecutor")
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
self.workers_active = True
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)

for _ in range(self.max_workers):
self.executor.submit(self.worker)

def stop_workers(self):
if not self.shutdown_event.is_set():
print("shutting down...") # TODO
try:
self.shutdown_event.set()
# self.task_queue.join()
self.workers_active = False
self.task_queue = None
self.shutdown_event.set()

print("joining executor") # TODO
if self.executor:
self.executor.shutdown(wait=False)
self.executor = None
print("shutdown") # TODO

except Exception as e:
print(f"Error shutting down: {e}")
else:
print("Already shutting down") # TODO


c, err = client()
Expand Down

0 comments on commit fe9904d

Please sign in to comment.