Rebasing Yuvi's setup_py branch with main #110

Draft · wants to merge 7 commits into base: main
2 changes: 1 addition & 1 deletion .github/workflows/flink.yaml

@@ -98,7 +98,7 @@ jobs:

       - name: Test with pytest
         run: |
-          pytest -vvv -s --cov=pangeo_forge_runner tests/integration/test_flink.py
+          pytest -vvv -s --cov=pangeo_forge_runner tests/integration/test_flink_integration.py
           kubectl get pod -A
           kubectl describe pod
141 changes: 83 additions & 58 deletions pangeo_forge_runner/commands/bake.py
@@ -222,63 +222,88 @@ def start(self):
             extra_options["requirements_file"] = str(requirements_path)

         for name, recipe in recipes.items():
-            pipeline_options = bakery.get_pipeline_options(
-                job_name=self.job_name,
-                # FIXME: Bring this in from meta.yaml?
-                container_image=self.container_image,
-                extra_options=extra_options,
-            )
-
-            # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
-            # for pipeline options - we have traitlets doing that for us.
-            pipeline = Pipeline(options=pipeline_options, argv=[])
-            # Chain our recipe to the pipeline. This mutates the `pipeline` object!
-            # We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
-            # method that returns a transform.
-            if isinstance(recipe, PTransform):
-                # This means we are in pangeo-forge-recipes >=0.9
-                pipeline | recipe
-            elif hasattr(recipe, "to_beam"):
-                # We are in pangeo-forge-recipes <=0.9
-                # The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9
-                # NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
-                # so those are handled conditionally if provided.
-                from pangeo_forge_recipes.storage import StorageConfig
-
-                recipe.storage_config = StorageConfig(
-                    target_storage.get_forge_target(job_name=self.job_name),
-                )
-                for attrname, optional_storage in zip(
-                    ("cache", "metadata"),
-                    (input_cache_storage, metadata_cache_storage),
-                ):
-                    # `.root_path` is an empty string by default, so if the user has not setup this
-                    # optional storage type in config, this block is skipped.
-                    if optional_storage.root_path:
-                        setattr(
-                            recipe.storage_config,
-                            attrname,
-                            optional_storage.get_forge_target(job_name=self.job_name),
-                        )
-                # with configured storage now attached, compile recipe to beam
-                pipeline | recipe.to_beam()
-
-            # Some bakeries are blocking - if Beam is configured to use them, calling
-            # pipeline.run() blocks. Some are not. We handle that here, and provide
-            # appropriate feedback to the user too.
-            extra = {"recipe": name, "job_name": self.job_name}
-            if bakery.blocking:
-                self.log.info(
-                    f"Running job for recipe {name}\n",
-                    extra=extra | {"status": "running"},
-                )
-                pipeline.run()
-            else:
-                result = pipeline.run()
-                job_id = result.job_id()
-                self.log.info(
-                    f"Submitted job {job_id} for recipe {name}",
-                    extra=extra | {"job_id": job_id, "status": "submitted"},
-                )
+            with feedstock.generate_setup_py() as setup_path:
+                extra_options["setup_file"] = setup_path
+                pipeline_options = bakery.get_pipeline_options(
+                    job_name=self.job_name,
+                    # FIXME: Bring this in from meta.yaml?
+                    container_image=self.container_image,
+                    extra_options=extra_options,
+                )
+
+                # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
+                # for pipeline options - we have traitlets doing that for us.
+                pipeline = Pipeline(options=pipeline_options, argv=[])
+                # Chain our recipe to the pipeline. This mutates the `pipeline` object!
+                # We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
+                # method that returns a transform.
+                if isinstance(recipe, PTransform):
+                    # This means we are in pangeo-forge-recipes >=0.9
+                    pipeline | recipe
+                elif hasattr(recipe, "to_beam"):
+                    # We are in pangeo-forge-recipes <=0.9
+                    # The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9
+                    # NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
+                    # so those are handled conditionally if provided.
+                    from pangeo_forge_recipes.storage import StorageConfig
+
+                    recipe.storage_config = StorageConfig(
+                        target_storage.get_forge_target(job_name=self.job_name),
+                    )
+                    for attrname, optional_storage in zip(
+                        ("cache", "metadata"),
+                        (input_cache_storage, metadata_cache_storage),
+                    ):
+                        # `.root_path` is an empty string by default, so if the user has not setup this
+                        # optional storage type in config, this block is skipped.
+                        if optional_storage.root_path:
+                            setattr(
+                                recipe.storage_config,
+                                attrname,
+                                optional_storage.get_forge_target(
+                                    job_name=self.job_name
+                                ),
+                            )
+                    # with configured storage now attached, compile recipe to beam
+                    pipeline | recipe.to_beam()
+
+                # Some bakeries are blocking - if Beam is configured to use them, calling
+                # pipeline.run() blocks. Some are not. We handle that here, and provide
+                # appropriate feedback to the user too.
+                extra = {"recipe": name, "job_name": self.job_name}
+                if bakery.blocking:
+                    self.log.info(
+                        f"Running job for recipe {name}\n",
+                        extra=extra | {"status": "running"},
+                    )
+                    pipeline.run()
+                else:
+                    result = pipeline.run()
+                    job_id = result.job_id()
+                    self.log.info(
+                        f"Submitted job {job_id} for recipe {name}",
+                        extra=extra | {"job_id": job_id, "status": "submitted"},
+                    )
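
For context on what the new "setup_file" entry does: Apache Beam reads it through SetupOptions, builds an sdist from the referenced setup.py, and stages that sdist onto each worker, where it is installed before user code runs. A minimal sketch of that wiring, assuming bakery.get_pipeline_options folds extra_options into Beam's PipelineOptions (the path below is hypothetical):

    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    # Build options the same way the runner does: no argv parsing, explicit values.
    options = PipelineOptions(flags=[])
    # Equivalent to passing --setup_file on the command line. Beam's stager runs
    # "python setup.py sdist" on this file and ships the resulting tarball to
    # the workers.
    options.view_as(SetupOptions).setup_file = "/path/to/feedstock/setup.py"  # hypothetical path

This is why the context manager added in feedstock.py below only needs the generated setup.py to exist for the duration of pipeline construction and submission.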
41 changes: 41 additions & 0 deletions pangeo_forge_runner/feedstock.py
@@ -1,6 +1,9 @@
 import ast
+import os
+from contextlib import contextmanager
 from copy import deepcopy
 from pathlib import Path
+from textwrap import dedent
 from typing import Optional

 from ruamel.yaml import YAML
@@ -86,6 +89,44 @@ def parse_recipes(self):

         return recipes

+    @contextmanager
+    def generate_setup_py(self):
+        """
+        Auto-generate a setup.py file for use with apache beam.
+
+        Beam sends all the user code we need to workers by creating an
+        sdist off a python package. However, our feedstocks only have a
+        few python files (at best) - mostly just one (recipe.py). We do not
+        want to impose creating a setup.py file manually for all our users,
+        so instead we autogenerate one here.
+        """
+        file = dedent(
+            """
+            import setuptools

Review comment (Collaborator):
    I would suggest moving this template out into its own file.
    This generation also needs to be watched closely for code injection based attacks.

+
+            setuptools.setup(
+                name='recipe',
+                version='0.1',
+                # FIXME: Support all the files we need to here!

Review comment (Collaborator):
    This fixme is basically the important part - it should contain all the files we wanna package and send into the runner.

+                py_modules=["recipe"]
+            )
+            """
+        )
+
+        setup_path = self.feedstock_dir / "setup.py"
+        with open(setup_path, "w") as f:
+            f.write(file)
+
+        readme_path = self.feedstock_dir / "readme.md"
+
+        with open(readme_path, "w") as f:
+            f.write("")
+
+        try:
+            yield str(setup_path)
+        finally:
+            os.remove(setup_path)
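
For context, this is roughly how bake.py consumes the context manager above; a minimal sketch, assuming a Feedstock constructed from a checked-out feedstock directory (the directory path is hypothetical):

    from pathlib import Path

    from pangeo_forge_runner.feedstock import Feedstock

    feedstock = Feedstock(Path("./my-feedstock/feedstock"))  # hypothetical checkout
    with feedstock.generate_setup_py() as setup_path:
        # setup.py exists on disk only inside this block, long enough for
        # Beam to build an sdist from it during pipeline submission.
        print(setup_path)
    # On exit, setup.py is removed again. Note that the readme.md written
    # alongside it is not cleaned up in the finally block.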

     def get_expanded_meta(self):
         """
         Return full meta.yaml file, expanding recipes if needed.
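
One possible way to address both review comments, sketched here as a suggestion rather than as part of this PR (the template filename, placeholder, and helper function are hypothetical): keep the setup.py template in its own packaged file, and enumerate every top-level module in the feedstock instead of hard-coding py_modules=["recipe"]:

    import json
    from importlib.resources import files
    from pathlib import Path


    def render_setup_py(feedstock_dir: Path) -> str:
        # Load the template shipped inside the package rather than inlining it.
        template = files("pangeo_forge_runner").joinpath("setup.py.in").read_text()
        # Package every top-level module in the feedstock, not just recipe.py.
        modules = sorted(p.stem for p in feedstock_dir.glob("*.py") if p.stem != "setup")
        # json.dumps emits a valid Python list literal from plain strings, which
        # avoids naive string interpolation of user-controlled module names.
        return template.replace("{{PY_MODULES}}", json.dumps(modules))

Using a fixed template plus structured substitution keeps user-controlled content out of the generated code, which speaks to the injection concern raised above.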