Merge branch 'datacommonsorg:master' into EPA_Emission_Inventory_Level1
Rohit231998 authored Jan 23, 2025
2 parents 41955c4 + 08052bf commit b88a035
Showing 10 changed files with 232 additions and 62 deletions.
12 changes: 9 additions & 3 deletions import-automation/executor/app/configs.py
@@ -115,6 +115,10 @@ class ExecutorConfig:
user_script_args: List[str] = ()
# Environment variables for the user script
user_script_env: dict = None
# Invoke validations before upload.
invoke_import_validation: bool = False
# Import validation config file.
validation_config_file: str = 'tools/import_validation/validation_config.json'
# Maximum time venv creation can take in seconds.
venv_create_timeout: float = 3600
# Maximum time downloading a file can take in seconds.
@@ -125,17 +129,19 @@ class ExecutorConfig:
email_account: str = ''
# The corresponding password, app password, or access token.
email_token: str = ''
# Disbale email alert notifications.
# Disable email alert notifications.
disable_email_notifications: bool = False
# Skip uploading the data to GCS (for local testing).
skip_gcs_upload: bool = False
# Maximum time a blocking call to the importer to
# perform an import can take in seconds.
importer_import_timeout: float = 20 * 60
# Maximum time a blocking call to the importer to
# delete an import can take in seconds.
importer_delete_timeout: float = 10 * 60
# Executor type depends on where the executor runs
# Suppports one of: "GKE", "GAE"
executor_type: str = 'GAE'
# Suppports one of: "GKE", "GAE", "CLOUD_RUN"
executor_type: str = 'CLOUD_RUN'

def get_data_refresh_config(self):
"""Returns the config used for Cloud Scheduler data refresh jobs."""
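The new ExecutorConfig fields are consumed from the JSON import config that main.py deserializes. A minimal sketch of enabling the validation path through that config, assuming the import-automation/executor directory is on PYTHONPATH; the project and bucket values below are placeholders, not values from this commit:

import json
from app import configs  # executor package, as used elsewhere in this diff

import_config = json.dumps({
    "gcs_project_id": "my-project",                # placeholder
    "storage_prod_bucket_name": "my-prod-bucket",  # placeholder
    "invoke_import_validation": True,
    "validation_config_file": "tools/import_validation/validation_config.json",
    "skip_gcs_upload": False,
    "executor_type": "CLOUD_RUN",
})

config = configs.ExecutorConfig(**json.loads(import_config))
print(config.invoke_import_validation, config.validation_config_file)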
10 changes: 6 additions & 4 deletions import-automation/executor/app/executor/cloud_run.py
@@ -61,11 +61,13 @@ def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str,

res = run_v2.types.ResourceRequirements(limits=resources)
container = run_v2.Container(image=image, env=env, resources=res, args=args)
# Labels allow filtering of automated import cloud run jobs, used in log-based metrics.
exe_template = run_v2.ExecutionTemplate(
template=run_v2.TaskTemplate(containers=[container],
max_retries=2,
timeout=duration_pb2.Duration(
seconds=timeout)))
labels={"datacommons_cloud_run_job_type": "auto_import_job"},
template=run_v2.TaskTemplate(
containers=[container],
max_retries=2,
timeout=duration_pb2.Duration(seconds=timeout)))
new_job = run_v2.Job(template=exe_template)
logging.info(f"Creating job: {job_name}")

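The datacommons_cloud_run_job_type label exists so automated import jobs can be filtered in log-based metrics. A hedged sketch of querying matching entries with the google-cloud-logging client; the exact filter key under which Cloud Run propagates job labels to log entries is an assumption here, not something this commit establishes:

from google.cloud import logging as cloud_logging

client = cloud_logging.Client(project="my-project")  # placeholder project
log_filter = (
    'resource.type="cloud_run_job" AND '
    'labels."datacommons_cloud_run_job_type"="auto_import_job"'
)
for entry in client.list_entries(filter_=log_filter, page_size=10):
    # Print when each labeled auto-import job logged and what it said.
    print(entry.timestamp, entry.payload)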
157 changes: 128 additions & 29 deletions import-automation/executor/app/executor/import_executor.py
@@ -17,15 +17,26 @@
"""

import dataclasses
import glob
import json
import logging
import os
import sys
import subprocess
import tempfile
import time
import traceback
from typing import Callable, Dict, Iterable, List, Optional, Tuple

REPO_DIR = os.path.dirname(
os.path.dirname(
os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
sys.path.append(os.path.join(REPO_DIR, 'tools', 'import_differ'))
sys.path.append(os.path.join(REPO_DIR, 'tools', 'import_validation'))

from import_differ import ImportDiffer
from import_validation import ImportValidation
from app import configs
from app import utils
from app.executor import cloud_run_simple_import
Expand All @@ -34,6 +45,7 @@
from app.service import file_uploader
from app.service import github_api
from app.service import import_service
from google.cloud import storage

# Email address for status messages.
_DEBUG_EMAIL_ADDR = '[email protected]'
@@ -317,6 +329,97 @@ def _import_one(
)
raise exc

def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
absolute_import_dir: str,
import_spec: dict) -> None:
"""
Performs validations on import data.
"""
import_inputs = import_spec.get('import_inputs', [])
for import_input in import_inputs:
mcf_path = import_input['node_mcf']
if not mcf_path:
# TODO: Generate node mcf using dc-import tool
logging.error(
'Empty node_mcf in manifest, skipping validation.')
current_data_path = os.path.join(absolute_import_dir, mcf_path)
previous_data_path = os.path.join(absolute_import_dir,
'previous_data.mcf')
summary_stats = os.path.join(absolute_import_dir,
'summary_report.csv')
validation_output_path = os.path.join(absolute_import_dir,
'validation')
config_file = import_spec.get('validation_config_file', '')
if not config_file:
config_file = self.config.validation_config_file
config_file_path = os.path.join(REPO_DIR, config_file)
logging.info(f'Validation config file: {config_file_path}')

# Download previous import data.
bucket = storage.Client(self.config.gcs_project_id).bucket(
self.config.storage_prod_bucket_name)
folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
blob = bucket.blob(folder + 'latest_version.txt')
if not blob:
logging.error(
f'Not able to download latest_version.txt from {folder}, skipping validation.'
)
return
latest_version = blob.download_as_text()
blob = bucket.blob(folder + latest_version + '/' + mcf_path)
if not blob:
logging.error(
f'Not able to download previous import from {latest_version}, skipping validation.'
)
return
# blob.download_to_filename(previous_data_path)

# Invoke differ script.
differ = ImportDiffer(current_data_path, previous_data_path,
validation_output_path)
differ.run_differ()

# Invoke validation script.
validation_output = os.path.join(validation_output_path,
'validation_output.csv')
differ_output = os.path.join(validation_output_path,
'point_analysis_summary.csv')
validation = ImportValidation(config_file_path, differ_output,
summary_stats, validation_output)
validation.run_validations()

def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
version: str, interpreter_path: str,
process: subprocess.CompletedProcess) -> None:
script_paths = import_spec.get('scripts')
for path in script_paths:
script_path = os.path.join(absolute_import_dir, path)
simple_job = cloud_run_simple_import.get_simple_import_job_id(
import_spec, script_path)
if simple_job:
# Running simple import as cloud run job.
cloud_run_simple_import.cloud_run_simple_import_job(
import_spec=import_spec,
config_file=script_path,
env=self.config.user_script_env,
version=version,
image=import_spec.get('image'),
)
else:
# Run import script locally.
script_interpreter = _get_script_interpreter(
script_path, interpreter_path)
process = _run_user_script(
interpreter_path=script_interpreter,
script_path=script_path,
timeout=self.config.user_script_timeout,
args=self.config.user_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
)
_log_process(process=process)
process.check_returncode()

def _import_one_helper(
self,
repo_dir: str,
Expand Down Expand Up @@ -350,35 +453,23 @@ def _import_one_helper(
_log_process(process=process)
process.check_returncode()

script_paths = import_spec.get('scripts')
for path in script_paths:
script_path = os.path.join(absolute_import_dir, path)
simple_job = cloud_run_simple_import.get_simple_import_job_id(
import_spec, script_path)
if simple_job:
# Running simple import as cloud run job.
cloud_run_simple_import.cloud_run_simple_import_job(
import_spec=import_spec,
config_file=script_path,
env=self.config.user_script_env,
version=version,
image=import_spec.get('image'),
)
else:
# Run import script locally.
script_interpreter = _get_script_interpreter(
script_path, interpreter_path)
process = _run_user_script(
interpreter_path=script_interpreter,
script_path=script_path,
timeout=self.config.user_script_timeout,
args=self.config.user_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
name=import_name,
)
_log_process(process=process)
process.check_returncode()
self._invoke_import_job(absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
version=version,
interpreter_path=interpreter_path,
process=process)

if self.config.invoke_import_validation:
logging.info("Invoking import validations")
self._invoke_import_validation(
repo_dir=repo_dir,
relative_import_dir=relative_import_dir,
absolute_import_dir=absolute_import_dir,
import_spec=import_spec)

if self.config.skip_gcs_upload:
logging.info("Skipping GCS upload")
return

inputs = self._upload_import_inputs(
import_dir=absolute_import_dir,
Expand All @@ -387,6 +478,14 @@ def _import_one_helper(
import_spec=import_spec,
)

validation_output_path = os.path.join(absolute_import_dir, 'validation')
for filepath in glob.iglob(f'{validation_output_path}/*.csv'):
dest = f'{relative_import_dir}/{import_name}/{version}/validation/{os.path.basename(filepath)}'
self.uploader.upload_file(
src=filepath,
dest=dest,
)

if self.importer:
self.importer.delete_previous_output(relative_import_dir,
import_spec)
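The new _invoke_import_validation method downloads the previous version of the node MCF from GCS, runs the differ against the freshly generated data, and then applies the validation rules to the differ output and the import's summary stats. A rough standalone sketch of that flow, using the same constructor shapes as the diff; the paths are illustrative, and it assumes tools/import_differ and tools/import_validation are importable, as the sys.path.append calls above arrange:

import os
import sys

REPO_DIR = "/path/to/repo"  # placeholder checkout of the data repo
sys.path.append(os.path.join(REPO_DIR, "tools", "import_differ"))
sys.path.append(os.path.join(REPO_DIR, "tools", "import_validation"))

from import_differ import ImportDiffer
from import_validation import ImportValidation

current_data = "current_data.mcf"    # newly generated node MCF
previous_data = "previous_data.mcf"  # downloaded from the latest GCS version
output_dir = "validation"

# Diff current data against the previous import.
differ = ImportDiffer(current_data, previous_data, output_dir)
differ.run_differ()

# Validate the differ output and summary stats against the rules config.
validation = ImportValidation(
    os.path.join(REPO_DIR, "tools/import_validation/validation_config.json"),
    os.path.join(output_dir, "point_analysis_summary.csv"),  # differ output
    "summary_report.csv",                                    # import summary stats
    os.path.join(output_dir, "validation_output.csv"))       # results file
validation.run_validations()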
8 changes: 4 additions & 4 deletions import-automation/executor/cloudbuild.yaml
@@ -7,12 +7,12 @@
steps:
# Docker Build
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '${_DOCKER_IMAGE}:latest', '.']
args: ['build', '-t', '${_DOCKER_IMAGE}:${COMMIT_SHA}', '-t', '${_DOCKER_IMAGE}:latest', '.']
dir: 'import-automation/executor'

# Docker push to Google Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_DOCKER_IMAGE}:latest']
args: ['push', '${_DOCKER_IMAGE}', '--all-tags']

# Install dependencies
- name: python:3.11.11
@@ -32,5 +32,5 @@ steps:
args:
- '-c'
- |
docker tag ${_DOCKER_IMAGE}:latest ${_DOCKER_IMAGE}:stable \
&& docker push ${_DOCKER_IMAGE}:stable
docker tag ${_DOCKER_IMAGE}:${COMMIT_SHA} ${_DOCKER_IMAGE}:stable \
&& docker push ${_DOCKER_IMAGE}:stable
45 changes: 44 additions & 1 deletion import-automation/executor/main.py
@@ -1,5 +1,23 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Import executor entrypoint.
"""
import logging
import json
import os
import time

from absl import flags
from absl import app
@@ -14,12 +14,23 @@
flags.DEFINE_string('import_name', '', 'Absoluate import name.')
flags.DEFINE_string('import_config', '', 'Import executor configuration.')

CLOUD_RUN_JOB_NAME = os.getenv("CLOUD_RUN_JOB")
# The `log_type` label helps filter log lines, which is useful for creating
# log-based metrics. Each log type has a similar set of fields for easier parsing.
LOG_TYPE_LABEL = "log_type"
# log_type for capturing status of auto import cloud run jobs.
# Required fields - log_type, message, status, latency_secs.
AUTO_IMPORT_JOB_STATUS_LOG_TYPE = "auto-import-job-status"


def scheduled_updates(absolute_import_name: str, import_config: str):
"""
Invokes import update workflow.
"""
start_time = time.time()
logging.info(absolute_import_name)
cfg = json.loads(import_config)
config = configs.ExecutorConfig(**cfg)
logging.info(config)
executor = import_executor.ImportExecutor(
uploader=file_uploader.GCSFileUploader(
project_id=config.gcs_project_id,
Expand All @@ -35,6 +64,20 @@ def scheduled_updates(absolute_import_name: str, import_config: str):
local_repo_dir=config.local_repo_dir)
result = executor.execute_imports_on_update(absolute_import_name)
logging.info(result)
elapsed_time_secs = int(time.time() - start_time)
message = (f"Cloud Run Job [{CLOUD_RUN_JOB_NAME}] completed with status= "
f"[{result.status}] in [{elapsed_time_secs}] seconds.)")
# With Python logging lib, json is interpreted as text (populates textPayload field).
# Using print to populate json as structured logs (populate jsonPayload field).
# Ref: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
print(
json.dumps({
LOG_TYPE_LABEL: AUTO_IMPORT_JOB_STATUS_LOG_TYPE,
"message": message,
"severity": "INFO" if result.status == 'succeeded' else "ERROR",
"status": result.status,
"latency_secs": elapsed_time_secs,
}))
if result.status == 'failed':
return 1
return 0
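Because Python's logging library would put JSON into textPayload, scheduled_updates() emits the job-status record with print(json.dumps(...)) so Cloud Logging parses it into jsonPayload. An illustrative example of the line a successful run would produce; the job name and latency are placeholders, not output captured from a real job:

import json

print(json.dumps({
    "log_type": "auto-import-job-status",
    "message": "Cloud Run Job [dc-import-job] completed with status= [succeeded] in [842] seconds.)",
    "severity": "INFO",
    "status": "succeeded",
    "latency_secs": 842,
}))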
1 change: 1 addition & 0 deletions import-automation/executor/requirements.txt
@@ -12,3 +12,4 @@ gunicorn
pytz
absl-py
croniter
pandas