Commit

first version of output thread
HansVRP committed Feb 10, 2025
1 parent 5819dca commit d21c445
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions openeo/extra/job_management/__init__.py
@@ -544,28 +544,20 @@ def _output_queue_worker(self, df, job_db):
             print(f"Failed to load dataframe from job_db: {e}", flush=True)
             return
 
             while True:
                 item = self.job_output_queue.get(timeout=1)
                 if item is None:
                     print("Output worker received shutdown signal.", flush=True)
                     self.job_output_queue.task_done()
                     break
 
-                i, status = item
-                print(f"Output worker processing row {i}: '{status}'", flush=True)
+                i, updates = item  # Now expecting a dictionary of updates
+                print(f"Processing updates for row {i}: {updates}", flush=True)
                 with self.df_lock:
-                    # Update the dataframe
-                    df.loc[i, "status"] = status
-                    print(f"Updated dataframe row {i}: {df.loc[i].to_dict()}", flush=True)
-                    # Persist the updated row to the job database
+                    for col, value in updates.items():
+                        df.loc[i, col] = value
                     job_db.persist(df.loc[[i]])
 
-                print(f"Persisted row {i} to job database.", flush=True)
                 self.job_output_queue.task_done()
 
         except Exception as e:
-            print(f"Output worker crashed: {e}", flush=True)
+            print(f"Output worker error: {e}", flush=True)
 
     def _job_update_loop(
         self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None
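
Aside: the worker above is a single-consumer queue loop. It drains (row_index, updates) tuples, applies each dict of column updates to the shared dataframe under a lock, persists only the touched row, and treats None as a shutdown sentinel. A minimal self-contained sketch of that pattern (the names output_queue_worker and persist are illustrative stand-ins, not the openeo API; the real method also polls with get(timeout=1) inside a broader try/except):

import queue
import threading

import pandas as pd


def output_queue_worker(df, output_queue, df_lock, persist):
    # Drain (row_index, updates_dict) items until the None sentinel arrives.
    while True:
        item = output_queue.get()
        if item is None:
            output_queue.task_done()
            break
        i, updates = item
        with df_lock:
            # Apply every column update generically, then persist only that row.
            for col, value in updates.items():
                df.loc[i, col] = value
            persist(df.loc[[i]])
        output_queue.task_done()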
@@ -607,7 +599,6 @@ def _job_update_loop(
                 with self.df_lock:
                     job_db.persist(not_started.loc[i : i + 1])
                     stats["job_db persist"] += 1
-                total_added += 1
 
             except Exception as e:
                 _log.error(f"Job launch failed for index {i}: {e}")
@@ -710,7 +701,7 @@ def _launch_job(self, i, connection, job_id, stats):
             stats["job start error"] += 1
 
         # Instead of updating the dataframe, push the outcome to the output queue.
-        self.job_output_queue.put((i, status))
+        self.job_output_queue.put((i, {'status': status, 'id': job_id}))
 
 
     def on_job_done(self, job: BatchJob, row):
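
The one-line change above is what the generic worker loop relies on: the queue message widens from a bare status string to a dict of column updates, so extra columns (the job id here, the final metrics in on_job_done below) flow through without touching the consumer. As an illustration of where that contract can go, a hedged sketch of a producer-side helper (hypothetical, not in the commit; the 'error' column is invented for the example):

def report_outcome(output_queue, i, status, job_id, error=None):
    # Funnel every outcome through the queue so only the worker mutates the dataframe.
    updates = {"status": status, "id": job_id}
    if error is not None:
        updates["error"] = str(error)  # hypothetical extra column
    output_queue.put((i, updates))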
@@ -724,6 +715,23 @@ def on_job_done(self, job: BatchJob, row):
         """
         # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
 
+
+        # final metric update to queue
+        updates = {
+            'status': 'finished',
+            'cpu': row.get('cpu'),
+            'memory': row.get('memory'),
+            'duration': row.get('duration'),
+            'costs': row.get('costs')
+        }
+
+        with self.df_lock:
+            for key, value in updates.items():
+                row[key] = value
+        # Send updates to the output queue
+        self.job_output_queue.put((row.name, updates))
+
+        #download the data
         job_metadata = job.describe()
         job_dir = self.get_job_dir(job.job_id)
         metadata_path = self.get_job_metadata_path(job.job_id)
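
Taken together, a minimal end-to-end run of the pattern this commit introduces (reusing the output_queue_worker sketch and imports from above; the persist callback is stubbed out, and the columns mirror the metrics dict in on_job_done):

q = queue.Queue()
lock = threading.Lock()
df = pd.DataFrame({"status": ["running"], "cpu": [None], "costs": [None]})

worker = threading.Thread(target=output_queue_worker, args=(df, q, lock, lambda rows: None))
worker.start()

# What on_job_done now does, in miniature: enqueue the final metrics,
# keyed by the row's dataframe index (row.name).
row = df.loc[0]
q.put((row.name, {"status": "finished", "cpu": 42.0, "costs": 4}))

q.put(None)  # sentinel: let the worker drain the queue and stop
worker.join()
assert df.loc[0, "status"] == "finished"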
