-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Issue633 threaded download #708
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -606,27 +606,43 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No | |
df.loc[i, "status"] = "skipped" | ||
stats["start_job skipped"] += 1 | ||
|
||
|
||
def on_job_done(self, job: BatchJob, row): | ||
""" | ||
Handles jobs that have finished. Can be overridden to provide custom behaviour. | ||
|
||
Default implementation downloads the results into a folder containing the title. | ||
Default implementation runs the download in a separate thread. | ||
|
||
:param job: The job that has finished. | ||
:param row: DataFrame row containing the job's metadata. | ||
""" | ||
# TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use? | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'd keep this TODO comment; it's not resolved yet as far as I know. |
||
_log.info(f"Job {job.job_id} completed. Preparing to handle completion.") | ||
|
||
job_metadata = job.describe() | ||
job_dir = self.get_job_dir(job.job_id) | ||
metadata_path = self.get_job_metadata_path(job.job_id) | ||
|
||
self.ensure_job_dir_exists(job.job_id) | ||
job.get_results().download_files(target=job_dir) | ||
|
||
# Save metadata | ||
_log.info(f"Saving metadata for job {job.job_id} to {metadata_path}") | ||
with metadata_path.open("w", encoding="utf-8") as f: | ||
json.dump(job_metadata, f, ensure_ascii=False) | ||
|
||
# Define download logic inline | ||
def download_task(): | ||
try: | ||
_log.info(f"Starting download for job {job.job_id} to directory {job_dir}") | ||
job.get_results().download_files(target=job_dir) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You use a batch job object; on its own it is not that complex, except for the |
||
_log.info(f"Successfully downloaded job {job.job_id} results to {job_dir}") | ||
except Exception as e: | ||
_log.error(f"Error downloading job {job.job_id}: {e}") | ||
|
||
# Start the download in a separate thread | ||
_log.info(f"Starting download thread for job {job.job_id}") | ||
downloader = Thread(target=download_task, daemon=True) | ||
downloader.start() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You should keep track of the Thread objects, to properly |
||
|
||
def on_job_error(self, job: BatchJob, row): | ||
""" | ||
Handles jobs that stopped with errors. Can be overridden to provide custom behaviour. | ||
|
@@ -722,8 +738,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = | |
|
||
if new_status == "finished": | ||
stats["job finished"] += 1 | ||
self.on_job_done(the_job, active.loc[i]) | ||
|
||
self.on_job_done(the_job, active.loc[i]) | ||
if previous_status != "error" and new_status == "error": | ||
stats["job failed"] += 1 | ||
self.on_job_error(the_job, active.loc[i]) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think threading should be opt-in and classic serial downloading should be the default (at least for now).
It's easy to get in trouble with threading (and other parallelism paradigms like async as used in notebooks).