Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Invoker] Reduce threads in async FaaS Invoker and resolve token bucket issue #1414

Merged
merged 1 commit into the base branch from the source branch
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
-

### Fixed
- [Standalone] Fix issue causing worker to stop prematurely in Consume mode
- [Standalone] Fixed an issue causing workers to stop prematurely in Consume mode
- [Invoker] Reduced the number of threads used in the async FaaS Invoker
- [Monitoring] Fixed a token bucket issue that prevented generating the correct number of tokens


## [v3.5.1]
Expand Down
17 changes: 14 additions & 3 deletions lithops/invokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def __init__(self, config, executor_id, internal_storage, compute_handler, job_m
self.should_run = False
self.sync = is_lithops_worker()

invoke_pool_threads = self.config[self.backend]['invoke_pool_threads']
self.executor = ThreadPoolExecutor(invoke_pool_threads)
self.invoke_pool_threads = self.config[self.backend]['invoke_pool_threads']
self.executor = ThreadPoolExecutor(self.invoke_pool_threads)

logger.debug(f'ExecutorID {self.executor_id} - Serverless invoker created')

Expand All @@ -315,7 +315,7 @@ def invoker_process(inv_id):
"""Run process that implements token bucket scheduling approach"""
logger.debug(f'ExecutorID {self.executor_id} - Async invoker {inv_id} started')

with ThreadPoolExecutor(max_workers=250) as executor:
with ThreadPoolExecutor(max_workers=min(64, self.invoke_pool_threads // 4)) as executor:
while self.should_run:
try:
self.job_monitor.token_bucket_q.get()
Expand All @@ -330,6 +330,7 @@ def invoker_process(inv_id):
logger.debug(f'ExecutorID {self.executor_id} - Async invoker {inv_id} finished')

for inv_id in range(self.ASYNC_INVOKERS):
self.job_monitor.token_bucket_q.put('#')
p = threading.Thread(target=invoker_process, args=(inv_id,))
self.invokers.append(p)
p.daemon = True
Expand Down Expand Up @@ -430,6 +431,16 @@ def _invoke_job(self, job):
self.should_run = True
self._start_async_invokers()

if self.running_workers > 0 and not self.job_monitor.token_bucket_q.empty():
while not self.job_monitor.token_bucket_q.empty():
try:
self.job_monitor.token_bucket_q.get(False)
self.running_workers -= 1
if self.running_workers == 0:
break
except Exception:
pass

if self.running_workers < self.max_workers:
free_workers = self.max_workers - self.running_workers
total_direct = free_workers * job.chunksize
Expand Down
28 changes: 16 additions & 12 deletions lithops/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,22 @@ def run(self):
logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')

wait_dur_sec = self.monitoring_interval
prevoius_log = None
previous_log = None
log_time = 0

def process_callids():
nonlocal previous_log, log_time
callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
# verify if there are new callids_done and reduce the sleep
new_callids_done = callids_done - self.callids_done_processed_status
# generate tokens and mark futures as running/done
self._generate_tokens(callids_running, callids_done)
self._tag_future_as_running(callids_running)
self._tag_future_as_ready(callids_done)
previous_log, log_time = self._print_status_log(previous_log, log_time)

return new_callids_done

while not self._all_ready():
time.sleep(wait_dur_sec)
wait_dur_sec = self.monitoring_interval
Expand All @@ -440,19 +453,10 @@ def run(self):
if not self.should_run:
break

callids_running, callids_done = \
self.internal_storage.get_job_status(self.executor_id)

# verify if there are new callids_done and reduce the sleep
new_callids_done = callids_done - self.callids_done_processed_status
if len(new_callids_done) > 0:
if len(process_callids()) > 0:
wait_dur_sec = self.monitoring_interval / 5

# generate tokens and mark futures as running/done
self._generate_tokens(callids_running, callids_done)
self._tag_future_as_running(callids_running)
self._tag_future_as_ready(callids_done)
prevoius_log, log_time = self._print_status_log(prevoius_log, log_time)
process_callids()

logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')

Expand Down
Loading