Invoke validations from import executor #1141

Merged: 3 commits, Jan 20, 2025

Changes from all commits
12 changes: 9 additions & 3 deletions import-automation/executor/app/configs.py
@@ -115,6 +115,10 @@ class ExecutorConfig:
    user_script_args: List[str] = ()
    # Environment variables for the user script
    user_script_env: dict = None
    # Invoke validations before upload.
    invoke_import_validation: bool = False
    # Import validation config file.
    validation_config_file: str = 'tools/import_validation/validation_config.json'
    # Maximum time venv creation can take in seconds.
    venv_create_timeout: float = 3600
    # Maximum time downloading a file can take in seconds.
@@ -125,17 +129,19 @@ class ExecutorConfig:
    email_account: str = ''
    # The corresponding password, app password, or access token.
    email_token: str = ''
-    # Disbale email alert notifications.
+    # Disable email alert notifications.
    disable_email_notifications: bool = False
    # Skip uploading the data to GCS (for local testing).
    skip_gcs_upload: bool = False
    # Maximum time a blocking call to the importer to
    # perform an import can take in seconds.
    importer_import_timeout: float = 20 * 60
    # Maximum time a blocking call to the importer to
    # delete an import can take in seconds.
    importer_delete_timeout: float = 10 * 60
    # Executor type depends on where the executor runs
-    # Suppports one of: "GKE", "GAE"
-    executor_type: str = 'GAE'
+    # Supports one of: "GKE", "GAE", "CLOUD_RUN"
+    executor_type: str = 'CLOUD_RUN'

    def get_data_refresh_config(self):
        """Returns the config used for Cloud Scheduler data refresh jobs."""
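For context on what `validation_config_file` points at: a minimal sketch of writing such a config, assuming a JSON list of rules. The `validation` names mirror the `Validation` enum entries visible in `import_validation.py` further down; the `threshold` field is an assumption, not taken from this PR.

```python
# Hypothetical sketch of the file referenced by validation_config_file.
# Only the 'validation' names come from import_validation.py below;
# the 'threshold' field is an assumed knob.
import json

rules = [
    {"validation": "DELETED_COUNT", "threshold": 100},
    {"validation": "UNMODIFIED_COUNT", "threshold": 0},
]

with open("validation_config.json", "w", encoding="utf-8") as fd:
    json.dump(rules, fd, indent=2)
```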
157 changes: 128 additions & 29 deletions import-automation/executor/app/executor/import_executor.py
@@ -17,15 +17,26 @@
"""

import dataclasses
import glob
import json
import logging
import os
import sys
import subprocess
import tempfile
import time
import traceback
from typing import Callable, Dict, Iterable, List, Optional, Tuple

REPO_DIR = os.path.dirname(
    os.path.dirname(
        os.path.dirname(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
sys.path.append(os.path.join(REPO_DIR, 'tools', 'import_differ'))
sys.path.append(os.path.join(REPO_DIR, 'tools', 'import_validation'))

from import_differ import ImportDiffer
from import_validation import ImportValidation
from app import configs
from app import utils
from app.executor import cloud_run_simple_import
@@ -34,6 +45,7 @@
from app.service import file_uploader
from app.service import github_api
from app.service import import_service
from google.cloud import storage

# Email address for status messages.
_DEBUG_EMAIL_ADDR = '[email protected]'
@@ -317,6 +329,97 @@ def _import_one(
)
raise exc

    def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
                                  absolute_import_dir: str,
                                  import_spec: dict) -> None:
        """
        Performs validations on import data.
        """
        import_inputs = import_spec.get('import_inputs', [])
        for import_input in import_inputs:
            mcf_path = import_input['node_mcf']
            if not mcf_path:
                # TODO: Generate node mcf using dc-import tool
                logging.error('Empty node_mcf in manifest, skipping validation.')
                continue
            current_data_path = os.path.join(absolute_import_dir, mcf_path)
            previous_data_path = os.path.join(absolute_import_dir,
                                              'previous_data.mcf')
            summary_stats = os.path.join(absolute_import_dir,
                                         'summary_report.csv')
            validation_output_path = os.path.join(absolute_import_dir,
                                                  'validation')
            config_file = import_spec.get('validation_config_file', '')
            if not config_file:
                config_file = self.config.validation_config_file
            config_file_path = os.path.join(REPO_DIR, config_file)
            logging.info(f'Validation config file: {config_file_path}')

            # Download previous import data.
            bucket = storage.Client(self.config.gcs_project_id).bucket(
                self.config.storage_prod_bucket_name)
            folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
            blob = bucket.blob(folder + 'latest_version.txt')
            if not blob.exists():
                logging.error(
                    f'Not able to download latest_version.txt from {folder}, skipping validation.'
                )
                return
            latest_version = blob.download_as_text()
            blob = bucket.blob(folder + latest_version + '/' + mcf_path)
            if not blob.exists():
                logging.error(
                    f'Not able to download previous import from {latest_version}, skipping validation.'
                )
                return
            blob.download_to_filename(previous_data_path)

            # Invoke differ script.
            differ = ImportDiffer(current_data_path, previous_data_path,
                                  validation_output_path)
            differ.run_differ()

            # Invoke validation script.
            validation_output = os.path.join(validation_output_path,
                                             'validation_output.csv')
            differ_output = os.path.join(validation_output_path,
                                         'point_analysis_summary.csv')
            validation = ImportValidation(config_file_path, differ_output,
                                          summary_stats, validation_output)
            validation.run_validations()

    def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
                           version: str, interpreter_path: str,
                           process: subprocess.CompletedProcess) -> None:
        script_paths = import_spec.get('scripts')
        for path in script_paths:
            script_path = os.path.join(absolute_import_dir, path)
            simple_job = cloud_run_simple_import.get_simple_import_job_id(
                import_spec, script_path)
            if simple_job:
                # Running simple import as cloud run job.
                cloud_run_simple_import.cloud_run_simple_import_job(
                    import_spec=import_spec,
                    config_file=script_path,
                    env=self.config.user_script_env,
                    version=version,
                    image=import_spec.get('image'),
                )
            else:
                # Run import script locally.
                script_interpreter = _get_script_interpreter(
                    script_path, interpreter_path)
                process = _run_user_script(
                    interpreter_path=script_interpreter,
                    script_path=script_path,
                    timeout=self.config.user_script_timeout,
                    args=self.config.user_script_args,
                    cwd=absolute_import_dir,
                    env=self.config.user_script_env,
                )
                _log_process(process=process)
                process.check_returncode()

    def _import_one_helper(
            self,
            repo_dir: str,
@@ -350,35 +453,23 @@ def _import_one_helper(
        _log_process(process=process)
        process.check_returncode()

-        script_paths = import_spec.get('scripts')
-        for path in script_paths:
-            script_path = os.path.join(absolute_import_dir, path)
-            simple_job = cloud_run_simple_import.get_simple_import_job_id(
-                import_spec, script_path)
-            if simple_job:
-                # Running simple import as cloud run job.
-                cloud_run_simple_import.cloud_run_simple_import_job(
-                    import_spec=import_spec,
-                    config_file=script_path,
-                    env=self.config.user_script_env,
-                    version=version,
-                    image=import_spec.get('image'),
-                )
-            else:
-                # Run import script locally.
-                script_interpreter = _get_script_interpreter(
-                    script_path, interpreter_path)
-                process = _run_user_script(
-                    interpreter_path=script_interpreter,
-                    script_path=script_path,
-                    timeout=self.config.user_script_timeout,
-                    args=self.config.user_script_args,
-                    cwd=absolute_import_dir,
-                    env=self.config.user_script_env,
-                    name=import_name,
-                )
-                _log_process(process=process)
-                process.check_returncode()
+        self._invoke_import_job(absolute_import_dir=absolute_import_dir,
+                                import_spec=import_spec,
+                                version=version,
+                                interpreter_path=interpreter_path,
+                                process=process)

        if self.config.invoke_import_validation:
            logging.info("Invoking import validations")
            self._invoke_import_validation(
                repo_dir=repo_dir,
                relative_import_dir=relative_import_dir,
                absolute_import_dir=absolute_import_dir,
                import_spec=import_spec)

        if self.config.skip_gcs_upload:
            logging.info("Skipping GCS upload")
            return

        inputs = self._upload_import_inputs(
            import_dir=absolute_import_dir,
@@ -387,6 +478,14 @@ def _import_one_helper(
            import_spec=import_spec,
        )

        validation_output_path = os.path.join(absolute_import_dir, 'validation')
        for filepath in glob.iglob(f'{validation_output_path}/*.csv'):
            dest = f'{relative_import_dir}/{import_name}/{version}/validation/{os.path.basename(filepath)}'
            self.uploader.upload_file(
                src=filepath,
                dest=dest,
            )

        if self.importer:
            self.importer.delete_previous_output(relative_import_dir,
                                                 import_spec)
20 changes: 19 additions & 1 deletion import-automation/executor/main.py
@@ -1,3 +1,19 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Import executor entrypoint.
"""
import logging
import json

@@ -16,10 +32,12 @@


def scheduled_updates(absolute_import_name: str, import_config: str):
    """
    Invokes import update workflow.
    """
    logging.info(absolute_import_name)
    cfg = json.loads(import_config)
    config = configs.ExecutorConfig(**cfg)
    logging.info(config)
    executor = import_executor.ImportExecutor(
        uploader=file_uploader.GCSFileUploader(
            project_id=config.gcs_project_id,
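A minimal sketch of driving this entrypoint locally, assuming the config fields shown in `configs.py` above; the import name and all values are placeholders, not real settings:

```python
# Hypothetical local invocation of the entrypoint; all values below are
# placeholders and the import name format (<path>:<name>) is an assumption.
import json

import_config = json.dumps({
    "gcs_project_id": "my-gcp-project",
    "storage_prod_bucket_name": "my-import-bucket",
    "invoke_import_validation": True,   # run the differ + validations
    "skip_gcs_upload": True,            # local testing: skip the upload step
})

scheduled_updates("scripts/my_source:my_import", import_config)
```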
1 change: 1 addition & 0 deletions import-automation/executor/requirements.txt
@@ -12,3 +12,4 @@ gunicorn
pytz
absl-py
croniter
pandas
27 changes: 16 additions & 11 deletions tools/import_differ/import_differ.py
@@ -23,6 +23,10 @@

import differ_utils

SAMPLE_COUNT = 3
GROUPBY_COLUMNS = 'variableMeasured,observationAbout,observationDate,measurementMethod,unit,observationPeriod'
VALUE_COLUMNS = 'value,scalingFactor'

FLAGS = flags.FLAGS
flags.DEFINE_string(
    'current_data', '', 'Path to the current MCF data \
@@ -34,15 +38,12 @@
    'Path to the output data folder.')

flags.DEFINE_string(
-    'groupby_columns',
-    'variableMeasured,observationAbout,observationDate,measurementMethod,unit',
+    'groupby_columns', GROUPBY_COLUMNS,
    'Columns to group data for diff analysis in the order (var,place,time etc.).'
)
-flags.DEFINE_string('value_columns', 'value,scalingFactor',
+flags.DEFINE_string('value_columns', VALUE_COLUMNS,
                    'Columns with statvar value for diff analysis.')

-SAMPLE_COUNT = 3


class ImportDiffer:
"""
@@ -69,8 +70,12 @@ class ImportDiffer:

"""

-    def __init__(self, current_data, previous_data, output_location,
-                 groupby_columns, value_columns):
+    def __init__(self,
+                 current_data,
+                 previous_data,
+                 output_location,
+                 groupby_columns=GROUPBY_COLUMNS,
+                 value_columns=VALUE_COLUMNS):
        self.current_data = current_data
        self.previous_data = previous_data
        self.output_location = output_location
@@ -89,8 +94,8 @@ def _cleanup_data(self, df: pd.DataFrame):
    def _get_samples(self, row):
        years = sorted(row[self.time_column])
        if len(years) > SAMPLE_COUNT:
-            return years[0] + random.sample(years[1:-1],
-                                            SAMPLE_COUNT - 2) + years[-1]
+            return [years[0]] + random.sample(years[1:-1],
+                                              SAMPLE_COUNT - 2) + [years[-1]]
        else:
            return years

@@ -213,8 +218,8 @@ def series_analysis(self,
        return summary, result

    def run_differ(self):
-        if not os.path.exists(FLAGS.output_location):
-            os.makedirs(FLAGS.output_location)
+        if not os.path.exists(self.output_location):
+            os.makedirs(self.output_location)
        logging.info('Loading data...')
        current_df = differ_utils.load_data(self.current_data,
                                            self.output_location)
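The new keyword defaults are what let the executor construct the differ directly, without going through flags. A minimal sketch of that programmatic use (paths are placeholders):

```python
# Sketch of programmatic use enabled by the defaulted constructor arguments;
# groupby_columns/value_columns fall back to GROUPBY_COLUMNS/VALUE_COLUMNS.
from import_differ import ImportDiffer

differ = ImportDiffer(
    current_data='current.mcf',         # freshly generated import data
    previous_data='previous_data.mcf',  # downloaded previous version
    output_location='validation')       # point_analysis_summary.csv lands here
differ.run_differ()
```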
14 changes: 9 additions & 5 deletions tools/import_validation/import_validation.py
@@ -27,7 +27,9 @@
flags.DEFINE_string('differ_output_location', '.',
                    'Path to the differ output data folder.')
flags.DEFINE_string('stats_summary_location', '.',
-                    'Path to the stats summary report.')
+                    'Path to the stats summary report folder.')
+flags.DEFINE_string('validation_output_location', '.',
+                    'Path to the validation output folder.')

POINT_ANALAYSIS_FILE = 'point_analysis_summary.csv'
STATS_SUMMARY_FILE = 'summary_report.csv'
@@ -67,8 +69,8 @@ class ImportValidation:
    Sample config and output files can be found in test folder.
    """

-    def __init__(self, config_file: str, differ_output: str,
-                 stats_summary: str):
+    def __init__(self, config_file: str, differ_output: str, stats_summary: str,
+                 validation_output: str):
        logging.info('Reading config from %s', config_file)
        self.differ_results = pd.read_csv(differ_output)
        self.validation_map = {
@@ -77,6 +79,7 @@ def __init__(self, config_file: str, differ_output: str,
            Validation.DELETED_COUNT: self._deleted_count_validation,
            Validation.UNMODIFIED_COUNT: self._unmodified_count_validation
        }
+        self.validation_output = validation_output
        self.validation_result = []
        with open(config_file, encoding='utf-8') as fd:
            self.validation_config = json.load(fd)
@@ -114,7 +117,7 @@ def _run_validation(self, config) -> ValidationResult:
            return ValidationResult('FAILED', config['validation'], repr(exc))

    def run_validations(self):
-        output_file = open(VALIDATION_OUTPUT_FILE, mode='w', encoding='utf-8')
+        output_file = open(self.validation_output, mode='w', encoding='utf-8')
        output_file.write('test,status,message\n')
        for config in self.validation_config:
            result = self._run_validation(config)
@@ -128,7 +131,8 @@ def main(_):
    validation = ImportValidation(
        FLAGS.config_file,
        os.path.join(FLAGS.differ_output_location, POINT_ANALAYSIS_FILE),
-        os.path.join(FLAGS.stats_summary_location, STATS_SUMMARY_FILE))
+        os.path.join(FLAGS.stats_summary_location, STATS_SUMMARY_FILE),
+        os.path.join(FLAGS.validation_output_location, VALIDATION_OUTPUT_FILE))
    validation.run_validations()


Expand Down
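With the added `validation_output` argument, a standalone run mirrors what the executor now does. A minimal sketch with placeholder paths matching the constants above:

```python
# Sketch of a standalone validation run; all paths are placeholders.
from import_validation import ImportValidation

validation = ImportValidation(
    'validation_config.json',      # JSON list of validation rules
    'point_analysis_summary.csv',  # differ output
    'summary_report.csv',          # import stats summary
    'validation_output.csv')       # results written here as test,status,message
validation.run_validations()
```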