
Commit

Merge branch 'datacommonsorg:master' into NCES_HBCU_Enrollment
Bipnabraham authored Jan 28, 2025
2 parents 77c97ac + d7d7302 commit ec1784e
Showing 54 changed files with 3,933 additions and 735 deletions.
import-automation/executor/Dockerfile (13 changes: 4 additions & 9 deletions)
@@ -23,13 +23,8 @@ ENV JAVA_HOME=/usr/local/openjdk-17
 COPY --from=openjdk:17-slim $JAVA_HOME $JAVA_HOME
 ENV PATH="${JAVA_HOME}/bin:${PATH}"

-WORKDIR /workspace
-
-ADD requirements.txt /workspace/requirements.txt
-RUN pip install -r /workspace/requirements.txt
-
-RUN git clone https://github.com/datacommonsorg/data.git
-RUN wget https://github.com/datacommonsorg/import/releases/download/0.1-alpha.1k/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar
-COPY app/. /workspace/app/
-
-CMD gunicorn --timeout 0 --workers 5 -b :$PORT app.main:FLASK_APP
+WORKDIR /data/import-automation/executor
+RUN wget https://storage.googleapis.com/datacommons_public/import_tools/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar
+RUN pip install -r requirements.txt
+ENTRYPOINT ["python", "main.py"]
import-automation/executor/app/configs.py (16 changes: 13 additions & 3 deletions)
@@ -84,6 +84,8 @@ class ExecutorConfig:
     dashboard_oauth_client_id: str = ''
     # Oauth Client ID used to authenticate with the proxy.
     importer_oauth_client_id: str = ''
+    # URL for the import executor container image.
+    importer_docker_image: str = 'gcr.io/datcom-ci/dc-import-executor:stable'
     # Access token of the account used to authenticate with GitHub. This is not
     # the account password. See
     # https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token.
@@ -105,12 +107,18 @@ class ExecutorConfig:
     requirements_filename: str = 'requirements.txt'
     # ID of the location where Cloud Scheduler is hosted.
     scheduler_location: str = 'us-central1'
+    # Location of the local git data repo.
+    local_repo_dir: str = '/data'
     # Maximum time a user script can run for in seconds.
     user_script_timeout: float = 3600
     # Arguments for the user script
     user_script_args: List[str] = ()
     # Environment variables for the user script
     user_script_env: dict = None
+    # Invoke validations before upload.
+    invoke_import_validation: bool = False
+    # Import validation config file.
+    validation_config_file: str = 'tools/import_validation/validation_config.json'
     # Maximum time venv creation can take in seconds.
     venv_create_timeout: float = 3600
     # Maximum time downloading a file can take in seconds.
@@ -121,17 +129,19 @@
     email_account: str = ''
     # The corresponding password, app password, or access token.
     email_token: str = ''
-    # Disbale email alert notifications.
+    # Disable email alert notifications.
     disable_email_notifications: bool = False
+    # Skip uploading the data to GCS (for local testing).
+    skip_gcs_upload: bool = False
     # Maximum time a blocking call to the importer to
     # perform an import can take in seconds.
     importer_import_timeout: float = 20 * 60
     # Maximum time a blocking call to the importer to
     # delete an import can take in seconds.
     importer_delete_timeout: float = 10 * 60
     # Executor type depends on where the executor runs
-    # Suppports one of: "GKE", "GAE"
-    executor_type: str = 'GAE'
+    # Suppports one of: "GKE", "GAE", "CLOUD_RUN"
+    executor_type: str = 'CLOUD_RUN'

     def get_data_refresh_config(self):
         """Returns the config used for Cloud Scheduler data refresh jobs."""
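For context, a minimal sketch of how the new config fields introduced above might be set for a local Cloud Run test run. This is not part of the commit; it assumes ExecutorConfig is a dataclass that accepts these fields as keyword arguments, and every value shown is illustrative.

# Hypothetical usage sketch, not from this commit: override the new
# Cloud Run-related fields when building an ExecutorConfig.
from app import configs  # module path assumed from the file layout above

config = configs.ExecutorConfig(
    executor_type='CLOUD_RUN',
    importer_docker_image='gcr.io/datcom-ci/dc-import-executor:stable',
    local_repo_dir='/data',
    invoke_import_validation=True,
    validation_config_file='tools/import_validation/validation_config.json',
    skip_gcs_upload=True,  # keep data local while testing
)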
import-automation/executor/app/executor/cloud_run.py (34 changes: 20 additions & 14 deletions)
@@ -24,15 +24,12 @@
 from absl import logging
 from google.api_core.exceptions import NotFound
 from google.cloud import run_v2
+from google.protobuf import duration_pb2


-def create_or_update_cloud_run_job(
-    project_id: str,
-    location: str,
-    job_id: str,
-    image: str,
-    env_vars: dict,
-) -> run_v2.Job:
+def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str,
+                                   image: str, env_vars: dict, args: list,
+                                   resources: dict, timeout: int) -> run_v2.Job:
     """Creates a new cloud run job or updates an existing one.
     If the jobs exists, the container is updated with new image and environment
@@ -45,6 +42,9 @@ def create_or_update_cloud_run_job(
       job_id: Name of the job
       image: Container image URL such as 'gcr.io/your-project/your-image:latest'
       env_vars: dict of environment variables as {'VAR': '<value>'}
+      args: list of command line arguments
+      resources: cpu/memory resources
+      timeout: duration in seconds

     Returns:
       Job created as a dict.
@@ -59,17 +59,23 @@
     for var, value in env_vars.items():
         env.append(run_v2.EnvVar(name=var, value=value))

-    container = run_v2.Container(image=image, env=env)
-    exe_template = run_v2.ExecutionTemplate(template=run_v2.TaskTemplate(
-        containers=[container]))
+    res = run_v2.types.ResourceRequirements(limits=resources)
+    container = run_v2.Container(image=image, env=env, resources=res, args=args)
+    # Labels allow filtering of automated import cloud run jobs, used in log-based metrics.
+    exe_template = run_v2.ExecutionTemplate(
+        labels={"datacommons_cloud_run_job_type": "auto_import_job"},
+        template=run_v2.TaskTemplate(
+            containers=[container],
+            max_retries=2,
+            timeout=duration_pb2.Duration(seconds=timeout)))
     new_job = run_v2.Job(template=exe_template)
-    logging.info(f"Creating job {job_name}: {new_job}")
+    logging.info(f"Creating job: {job_name}")

     # Look for existing job to update
     job = None
     try:
         job = client.get_job(request=run_v2.GetJobRequest(name=job_name))
-        logging.info(f"Found existing job {job_name}: {job}")
+        logging.info(f"Found existing job: {job_name}")
     except NotFound:
         logging.info(f"No existing job, creating new job: {job_name}")

@@ -85,11 +91,11 @@
     # Update existing Cloud Run job
     # Overrides container settings including image, env
     job.template.template.containers = new_job.template.template.containers
-    logging.info(f"Updating job {job_name}: {job}")
+    logging.info(f"Updating job: {job_name}")
     update_request = run_v2.UpdateJobRequest(job=job)
     update_operation = client.update_job(request=update_request)
     result = update_operation.result() # Blocks until update completes
-    logging.info(f"Job updated {job_name}: {result}")
+    logging.info(f"Job updated: {job_name}")
     return result


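A hedged sketch of calling the updated function with its new signature. The project, job name, environment, and resource values below are made-up placeholders for illustration, not values taken from this commit.

# Illustrative call against the new signature shown above; all values are
# assumptions for the sketch, not defaults from the repository.
from app.executor import cloud_run

job = cloud_run.create_or_update_cloud_run_job(
    project_id='my-gcp-project',              # hypothetical project
    location='us-central1',
    job_id='auto-import-example',
    image='gcr.io/datcom-ci/dc-import-executor:stable',
    env_vars={'IMPORT_NAME': 'example_import'},
    args=['--import_name=example_import'],    # forwarded to the container
    resources={'cpu': '2', 'memory': '4Gi'},  # Cloud Run resource limits
    timeout=3600,                             # seconds, wrapped in a protobuf Duration
)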
(additional changed file; name not shown in this view)
@@ -188,8 +188,11 @@ def cloud_run_simple_import_job(
     logging.info(
         f'Setting up simple import cloud run {project_id}:{job_id} for'
         f' {config_file} with output: {gcs_output_dir}, env: {env_vars}')
+    resources = {}
+    args = []
     job = cloud_run.create_or_update_cloud_run_job(project_id, location, job_id,
-                                                   image, env_vars)
+                                                   image, env_vars, args,
+                                                   resources)
     if not job:
         logging.error(
             f'Failed to setup cloud run job {job_id} for {config_file}')
import-automation/executor/app/executor/cloud_scheduler.py (36 changes: 30 additions & 6 deletions)
@@ -26,6 +26,7 @@
 from google.protobuf import json_format
 from google.api_core.exceptions import AlreadyExists, NotFound

+CLOUD_RUN_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
 GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN',
                                'importautomation.datacommons.org')
 GKE_CALLER_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
@@ -50,15 +51,38 @@ def _base_job_request(absolute_import_name, schedule: str):
             # 30m is the max allowed deadline
             'seconds': 60 * 30
         }
-        # <'http_request'|'appengine_job_request'>: {...}
+        # <'gke_job_request'|'appengine_job_request'|'cloud_run_job_request'>: {...}
     }


-def http_job_request(absolute_import_name,
-                     schedule,
-                     json_encoded_job_body: str,
-                     gke_caller_service_account: str = "",
-                     gke_oauth_audience: str = "") -> Dict:
+def cloud_run_job_request(absolute_import_name, schedule,
+                          cloud_run_job_url: str,
+                          cloud_run_service_account: str) -> Dict:
+    """Cloud Scheduler request that targets jobs in CLOUD_RUN."""
+    json_encoded_job_body = json.dumps({}).encode("utf-8")
+    job = _base_job_request(absolute_import_name, schedule)
+    job_name = absolute_import_name.split(':')[1]
+    job['name'] = f'{job_name}'
+    job['http_target'] = {
+        'uri': f'https://{cloud_run_job_url}',
+        'http_method': 'POST',
+        'headers': {
+            'Content-Type': 'application/json',
+        },
+        'body': json_encoded_job_body,
+        'oauth_token': {
+            'service_account_email': f'{cloud_run_service_account}',
+            'scope': 'https://www.googleapis.com/auth/cloud-platform'
+        }
+    }
+    return job
+
+
+def gke_job_request(absolute_import_name,
+                    schedule,
+                    json_encoded_job_body: str,
+                    gke_caller_service_account: str = "",
+                    gke_oauth_audience: str = "") -> Dict:
     """Cloud Scheduler request that targets executors launched in GKE."""

     # If the service account and oauth audience are provided as
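A hedged sketch of how the new cloud_run_job_request helper might be used to build a Cloud Scheduler job body. The import name, job URL, schedule, and service account below are invented placeholders, not values from this commit.

# Illustrative only; the absolute import name must contain ':' because the
# helper derives the scheduler job name from the part after the colon.
request_body = cloud_run_job_request(
    absolute_import_name='scripts/us_nces/demographics:NCES_HBCU_Enrollment',
    schedule='0 5 * * 1',  # every Monday at 05:00
    cloud_run_job_url=('run.googleapis.com/v2/projects/my-project/'
                       'locations/us-central1/jobs/nces-hbcu-enrollment:run'),
    cloud_run_service_account='scheduler-caller@my-project.iam.gserviceaccount.com',
)
# request_body can then be passed to the Cloud Scheduler API to create the job.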
(Remaining changed files in this commit are not shown.)

0 comments on commit ec1784e
