Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype of udp based job manager #1

Closed
wants to merge 1 commit into from
Closed

Conversation

jdries
Copy link
Contributor

@jdries jdries commented Jun 28, 2024

Proposal for a UDP based variant of the job manager, created as part of APEx upscaling service:
https://jdries-vito.quarto.pub/apex-design/upscaling.html

Related issue is to support output to geoparquet: Open-EO/openeo-gfmap#107

The currently used csv format is limited in the sense that complex parameter types fail to deserialize correctly, requiring custom handling in this class. GeoParquet might improve this:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#json

Copy link
Contributor

@soxofaan soxofaan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple of notes

If I understand correctly, this PR adds two separate features to the existing job manager:

  • producing jobs from a fixed (but parameterized) UDP and a user-provided dataframe of parameter
  • Running the job manager in a thread

These features seems to be totally unrelated, so I wonder if they can't be separated.

For example:

  • the producing of the jobs could be a factory for a standard job manager
  • the threaded running could a method on the standard job manager

if self.dataframe is None:
self.dataframe = jobs_dataframe
else:
raise ValueError("Jobs already added to the job manager.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this if else raise pattern looks like this could have been a constructor argument

p.get("schema", {}).get("subtype", "") == "geojson"]


output_file = Path("jobs.csv")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This static file reference should be an argument I guess


cube = connection.datacube_from_process(row.udp_id,row.udp_namespace, **parameters)

title = row.get("title", f"Subjob {row.udp_id} - {str(parameters)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use row index instead of str(parameters) in title to avoid extremely large titles?




import multiprocessing, time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these imports can be toplevel I think

@soxofaan
Copy link
Contributor

Because of various changes to the "official" MultiBackendJobManager from the client (e.g. built-in theaded run_jobs, and new job db initialization features), I think this PR is dead end, and better be closed.
However, it served as inspiration to implement UDP based job management in the python client itself:

@soxofaan soxofaan closed this Oct 14, 2024
@jdries jdries deleted the udp_job_manager branch October 16, 2024 08:57
@soxofaan
Copy link
Contributor

just merged Open-EO/openeo-python-client#644

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants