diff --git a/CHANGELOG.md b/CHANGELOG.md
index d1478746..156e5c16 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,9 @@
 -
 
 ### Fixed
-- [Standalone] Fix issue causing worker to stop prematurely in Consume mode
+- [Standalone] Fixed an issue causing workers to stop prematurely in Consume mode
+- [Invoker] Reduced the number of threads used in the async FaaS Invoker
+- [Monitoring] Fixed token bucket issue that prevented generating the correct number of tokens
 
 ## [v3.5.1]
 
diff --git a/lithops/invokers.py b/lithops/invokers.py
index a512b229..60730831 100644
--- a/lithops/invokers.py
+++ b/lithops/invokers.py
@@ -301,8 +301,8 @@ def __init__(self, config, executor_id, internal_storage, compute_handler, job_m
         self.should_run = False
         self.sync = is_lithops_worker()
 
-        invoke_pool_threads = self.config[self.backend]['invoke_pool_threads']
-        self.executor = ThreadPoolExecutor(invoke_pool_threads)
+        self.invoke_pool_threads = self.config[self.backend]['invoke_pool_threads']
+        self.executor = ThreadPoolExecutor(self.invoke_pool_threads)
 
         logger.debug(f'ExecutorID {self.executor_id} - Serverless invoker created')
 
@@ -315,7 +315,7 @@ def invoker_process(inv_id):
             """Run process that implements token bucket scheduling approach"""
             logger.debug(f'ExecutorID {self.executor_id} - Async invoker {inv_id} started')
 
-            with ThreadPoolExecutor(max_workers=250) as executor:
+            with ThreadPoolExecutor(max_workers=min(64, self.invoke_pool_threads // 4)) as executor:
                 while self.should_run:
                     try:
                         self.job_monitor.token_bucket_q.get()
@@ -330,6 +330,7 @@
             logger.debug(f'ExecutorID {self.executor_id} - Async invoker {inv_id} finished')
 
         for inv_id in range(self.ASYNC_INVOKERS):
+            self.job_monitor.token_bucket_q.put('#')
             p = threading.Thread(target=invoker_process, args=(inv_id,))
             self.invokers.append(p)
             p.daemon = True
@@ -430,6 +431,16 @@ def _invoke_job(self, job):
         self.should_run = True
         self._start_async_invokers()
 
+        if self.running_workers > 0 and not self.job_monitor.token_bucket_q.empty():
+            while not self.job_monitor.token_bucket_q.empty():
+                try:
+                    self.job_monitor.token_bucket_q.get(False)
+                    self.running_workers -= 1
+                    if self.running_workers == 0:
+                        break
+                except Exception:
+                    pass
+
         if self.running_workers < self.max_workers:
             free_workers = self.max_workers - self.running_workers
             total_direct = free_workers * job.chunksize
diff --git a/lithops/monitor.py b/lithops/monitor.py
index c0695966..bdaaff06 100644
--- a/lithops/monitor.py
+++ b/lithops/monitor.py
@@ -429,9 +429,22 @@ def run(self):
         logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')
         wait_dur_sec = self.monitoring_interval
 
-        prevoius_log = None
+        previous_log = None
         log_time = 0
 
+        def process_callids():
+            nonlocal previous_log, log_time
+            callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
+            # verify if there are new callids_done and reduce the sleep
+            new_callids_done = callids_done - self.callids_done_processed_status
+            # generate tokens and mark futures as running/done
+            self._generate_tokens(callids_running, callids_done)
+            self._tag_future_as_running(callids_running)
+            self._tag_future_as_ready(callids_done)
+            previous_log, log_time = self._print_status_log(previous_log, log_time)
+
+            return new_callids_done
+
         while not self._all_ready():
             time.sleep(wait_dur_sec)
             wait_dur_sec = self.monitoring_interval
@@ -440,19 +453,10 @@
             if not self.should_run:
                 break
 
-            callids_running, callids_done = \
-                self.internal_storage.get_job_status(self.executor_id)
-
-            # verify if there are new callids_done and reduce the sleep
-            new_callids_done = callids_done - self.callids_done_processed_status
-            if len(new_callids_done) > 0:
+            if len(process_callids()) > 0:
                 wait_dur_sec = self.monitoring_interval / 5
 
-            # generate tokens and mark futures as running/done
-            self._generate_tokens(callids_running, callids_done)
-            self._tag_future_as_running(callids_running)
-            self._tag_future_as_ready(callids_done)
-            prevoius_log, log_time = self._print_status_log(prevoius_log, log_time)
+        process_callids()
 
         logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')
 