Skip to content

Commit

Permalink
step back
Browse files Browse the repository at this point in the history
  • Loading branch information
HansVRP committed Feb 11, 2025
1 parent 5819dca commit d4c39e6
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,13 @@ def run_jobs(
stats["sleep"] += 1


# Wait for any remaining tasks to complete. TODO make sure that queue is empty as well
executor.shutdown(wait=True)
# Signal the output thread to stop.

# Drain the queue before shutting down the worker
while not self.job_output_queue.empty():
time.sleep(0.1)

# Signal the output thread to stop
self.job_output_queue.put(None)
output_thread.join()

Expand Down Expand Up @@ -585,7 +589,7 @@ def _job_update_loop(
stats["track_statuses"] += 1

#TODO: Move the not started logic into get_jobs to launch, once we no longer parse the whole dataframe
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
not_started = job_db.get_by_status(statuses=["not_started"], max=200)
if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created", "queued", "running"])
Expand All @@ -607,7 +611,6 @@ def _job_update_loop(
with self.df_lock:
job_db.persist(not_started.loc[i : i + 1])
stats["job_db persist"] += 1
total_added += 1

except Exception as e:
_log.error(f"Job launch failed for index {i}: {e}")
Expand Down

0 comments on commit d4c39e6

Please sign in to comment.