Skip to content

Commit

Permalink
pre rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
sean committed Oct 4, 2024
1 parent b9ccb77 commit 716d87b
Showing 1 changed file with 48 additions and 39 deletions.
87 changes: 48 additions & 39 deletions helpers/cudo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,40 +130,35 @@ def user():
raise Exception(err)
return cudo.UserApi(c)


class PooledVirtualMachinesApi(cudo.VirtualMachinesApi):
def __init__(self, api_client=None):
max_workers=2 #TODO make larger
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = Queue()
max_workers = 2 # TODO make larger
# self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = None
self.max_workers = max_workers
self.shutdown_event = threading.Event()
self.workers_active = False

atexit.register(self.shutdown)
self.executor = None
atexit.register(self.stop_workers)
super().__init__(api_client)

def start_workers(self):

if not self.workers_active:
self.workers_active = True
self.shutdown_event.clear()
for _ in range(self.max_workers):
self.executor.submit(self.worker)
def create_vm(self, project_id, create_vm_body, **kwargs):
print("create vm") # TODO
if not self.task_queue:
print("new task queue")
self.task_queue = Queue()
self.task_queue.put((project_id, create_vm_body))
self.start_workers()

def stop_workers(self):
if self.workers_active:
print("stopping workers") #TODO
self.workers_active = False
for _ in range(self.max_workers):
self.task_queue.put(None)
print("workers stopped") #TODO
return {"id": create_vm_body.vm_id}

def worker(self):
while True:
while self.workers_active:
print("worker getting tasks")
req = self.task_queue.get()
if req is None:
if not self.task_queue:
break
req = self.task_queue.get(1)
print("worker processing task")
try:
project, create_vm_body = req
Expand All @@ -176,28 +171,42 @@ def worker(self):

if self.task_queue.empty():
self.shutdown()
# def start_queue(self):

def create_vm(self, project_id, create_vm_body, **kwargs):
print("create vm") #TODO
self.task_queue.put((project_id, create_vm_body))
self.start_workers()

return {"id":create_vm_body.vm_id}
def start_workers(self):
if not self.workers_active:
self.shutdown_event.clear()
print("new threadpoolexecutor")
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
self.workers_active = True
for _ in range(self.max_workers):
self.executor.submit(self.worker)

def shutdown(self):
def stop_workers(self):
if not self.shutdown_event.is_set():
print("shutting down...") # TODO
print("shutting down...") # TODO
try:
self.shutdown_event.set()
# self.task_queue.join()
self.workers_active = False
self.task_queue = None

print("joining executor") # TODO
if self.executor:
self.executor.shutdown(wait=False)
self.executor = None
print("shutdown") # TODO
except Exception as e:
print(f"Error shutting down: {e}")
else:
print("Already shutting down") # TODO

self.shutdown_event.set()
self.task_queue.join()
self.stop_workers()

self.executor.shutdown(wait=True)
print("shutdown") # TODO
c, err = client()
if err:
raise Exception(err)
pool = PooledVirtualMachinesApi(c)


def virtual_machines():
c, err = client()
if err:
raise Exception(err)
return PooledVirtualMachinesApi(c) #cudo.VirtualMachinesApi(c)
return pool

0 comments on commit 716d87b

Please sign in to comment.