
Commit

Import validation updates
- Invoke dc import tool to generate resolved mcf
- Upload validation output files to GCS
- Validation status check for upload
- Default executor type: CLOUD_RUN
vish-cs committed Feb 4, 2025
1 parent f056d77 commit 63babc5
Showing 3 changed files with 119 additions and 74 deletions.
4 changes: 4 additions & 0 deletions import-automation/executor/app/configs.py
@@ -109,6 +109,8 @@ class ExecutorConfig:
     scheduler_location: str = 'us-central1'
     # Location of the local git data repo.
     local_repo_dir: str = '/data'
+    # Location of the import tool jar.
+    import_tool_path: str = '/data/import-automation/executor/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar'
     # Maximum time a user script can run for in seconds.
     user_script_timeout: float = 3600
     # Arguments for the user script
@@ -117,6 +119,8 @@ class ExecutorConfig:
     user_script_env: dict = None
     # Invoke validations before upload.
     invoke_import_validation: bool = False
+    # Ignore validation status during import.
+    ignore_validation_status: bool = True
     # Import validation config file.
     validation_config_file: str = 'tools/import_validation/validation_config.json'
     # Maximum time venv creation can take in seconds.
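For context, a minimal sketch of how the new knobs combine, assuming ExecutorConfig is a plain dataclass whose defaults can be overridden with dataclasses.replace (the executor's real config loading may differ):

from dataclasses import replace

from app.configs import ExecutorConfig  # import path assumed from this repo layout

# Opt in to validation and make a failed validation block the
# latest-version update (failures are ignored by default).
config = replace(
    ExecutorConfig(),
    invoke_import_validation=True,
    ignore_validation_status=False,
    # Hypothetical override of the bundled jar location.
    import_tool_path='/opt/dc/datacommons-import-tool.jar',
)
assert config.invoke_import_validation and not config.ignore_validation_status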
182 changes: 109 additions & 73 deletions import-automation/executor/app/executor/import_executor.py
@@ -332,67 +332,108 @@ def _import_one(
         )
         raise exc
 
+    def _get_latest_version(self, import_dir: str) -> str:
+        # Download previous import data.
+        bucket = storage.Client(self.config.gcs_project_id).bucket(
+            self.config.storage_prod_bucket_name)
+        blob = bucket.blob(
+            f'{import_dir}/{self.config.storage_version_filename}')
+        if not blob.exists():
+            logging.error(
+                f'Not able to download latest_version.txt from {import_dir}, skipping validation.'
+            )
+            return ''
+        latest_version = blob.download_as_text()
+        # A GCS "folder" is only a prefix, so probe for any object under it.
+        prefix = f'{import_dir}/{latest_version}'
+        if not any(bucket.list_blobs(prefix=prefix, max_results=1)):
+            logging.error(
+                f'Not able to download previous import from {latest_version}, skipping validation.'
+            )
+            return ''
+        return f'gs://{bucket.name}/{prefix}'
+
     def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
-                                  absolute_import_dir: str,
-                                  import_spec: dict) -> None:
+                                  absolute_import_dir: str, import_spec: dict,
+                                  version: str) -> bool:
         """
         Performs validations on import data.
         """
+        config_file = import_spec.get('validation_config_file', '')
+        if config_file:
+            config_file_path = os.path.join(absolute_import_dir, config_file)
+        else:
+            config_file_path = os.path.join(repo_dir,
+                                            self.config.validation_config_file)
+        logging.info(f'Validation config file: {config_file_path}')
+
+        import_dir = f'{relative_import_dir}/{import_spec["import_name"]}'
+        latest_version = self._get_latest_version(import_dir)
+        logging.info(f'Latest version: {latest_version}')
+
         # Trigger validations for each tmcf/csv under import_inputs.
         import_inputs = import_spec.get('import_inputs', [])
+        status = True
         for import_input in import_inputs:
-            mcf_path = import_input['node_mcf']
-            if not mcf_path:
-                # TODO: Generate node mcf using dc-import tool
-                logging.error(
-                    'Empty node_mcf in manifest, skipping validation.')
-            current_data_path = os.path.join(absolute_import_dir, mcf_path)
-            previous_data_path = os.path.join(absolute_import_dir,
-                                              'previous_data.mcf')
-            summary_stats = os.path.join(absolute_import_dir,
-                                         'summary_report.csv')
-            validation_output_path = os.path.join(absolute_import_dir,
-                                                  'validation')
-            config_file = import_spec.get('validation_config_file', '')
-            if config_file:
-                config_file_path = os.path.join(absolute_import_dir,
-                                                config_file)
-            else:
-                config_file_path = os.path.join(
-                    repo_dir, self.config.validation_config_file)
-            logging.info(f'Validation config file: {config_file_path}')
-
-            # Download previous import data.
-            bucket = storage.Client(self.config.gcs_project_id).bucket(
-                self.config.storage_prod_bucket_name)
-            folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
-            blob = bucket.blob(folder + 'latest_version.txt')
-            if not blob:
-                logging.error(
-                    f'Not able to download latest_version.txt from {folder}, skipping validation.'
-                )
-                return
-            latest_version = blob.download_as_text()
-            blob = bucket.blob(folder + latest_version + '/' + mcf_path)
-            if not blob:
-                logging.error(
-                    f'Not able to download previous import from {latest_version}, skipping validation.'
-                )
-                return
-            # blob.download_to_filename(previous_data_path)
-
-            # Invoke differ script.
-            differ = ImportDiffer(current_data_path, previous_data_path,
-                                  validation_output_path)
-            differ.run_differ()
-
-            # Invoke validation script.
-            validation_output = os.path.join(validation_output_path,
-                                             'validation_output.csv')
-            differ_output = os.path.join(validation_output_path,
-                                         'point_analysis_summary.csv')
-            validation = ImportValidation(config_file_path, differ_output,
-                                          summary_stats, validation_output)
-            validation.run_validations()
+            import_prefix = import_input['template_mcf'].split('.')[0]
+            validation_output_path = os.path.join(absolute_import_dir,
+                                                  import_prefix, 'validation')
+            current_data_path = os.path.join(validation_output_path, '*.mcf')
+            previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf'
+            summary_stats = os.path.join(validation_output_path,
+                                         'summary_report.csv')
+            validation_output_file = os.path.join(validation_output_path,
+                                                  'validation_output.csv')
+            differ_output = os.path.join(validation_output_path,
+                                         'point_analysis_summary.csv')
+
+            # Run dc import tool to generate resolved mcf.
+            logging.info('Generating resolved mcf...')
+            import_tool_args = [
+                f'-o={validation_output_path}', 'genmcf',
+                import_input['template_mcf'], import_input['cleaned_csv']
+            ]
+            process = _run_user_script(
+                interpreter_path='java',
+                script_path='-jar ' + self.config.import_tool_path,
+                timeout=self.config.user_script_timeout,
+                args=import_tool_args,
+                cwd=absolute_import_dir,
+                env={},
+            )
+            _log_process(process=process)
+            process.check_returncode()
+            logging.info('Generated resolved mcf')
+
+            if latest_version:
+                # Invoke differ and validation scripts.
+                logging.info('Invoking differ tool...')
+                differ = ImportDiffer(current_data_path, previous_data_path,
+                                      validation_output_path)
+                differ.run_differ()
+
+                logging.info('Invoking validation script...')
+                validation = ImportValidation(config_file_path, differ_output,
+                                              summary_stats,
+                                              validation_output_file)
+                status = validation.run_validations()
+            else:
+                logging.error(
+                    'Skipping validation due to missing latest version.')
+
+            if not self.config.skip_gcs_upload:
+                # Upload output to GCS.
+                gcs_output = f'{import_dir}/{version}/{import_prefix}/validation'
+                logging.info(
+                    f'Uploading validation output to GCS path: {gcs_output}')
+                for filename in os.listdir(validation_output_path):
+                    filepath = os.path.join(validation_output_path, filename)
+                    if os.path.isfile(filepath):
+                        dest = f'{gcs_output}/{filename}'
+                        self.uploader.upload_file(
+                            src=filepath,
+                            dest=dest,
+                        )
+        return status
 
     def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
                            version: str, interpreter_path: str,
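The genmcf step above runs through _run_user_script; as a standalone sketch, the assembled command looks roughly like the following, with hypothetical input names and the default jar path from configs.py (the exact flags are whatever the bundled jar accepts):

import subprocess

# Default jar location from ExecutorConfig.import_tool_path.
JAR = ('/data/import-automation/executor/'
       'datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar')

# One template MCF + cleaned CSV pair from a hypothetical manifest.
cmd = [
    'java', '-jar', JAR,
    '-o=my_import/validation',  # resolved *.mcf and reports land here
    'genmcf',
    'my_import.tmcf',
    'my_import.csv',
]
# check=True mirrors process.check_returncode(); 3600 is user_script_timeout.
subprocess.run(cmd, check=True, timeout=3600)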
@@ -468,11 +509,12 @@ def _import_one_helper(

         if self.config.invoke_import_validation:
             logging.info("Invoking import validations")
-            self._invoke_import_validation(
+            validation_status = self._invoke_import_validation(
                 repo_dir=repo_dir,
                 relative_import_dir=relative_import_dir,
                 absolute_import_dir=absolute_import_dir,
-                import_spec=import_spec)
+                import_spec=import_spec,
+                version=version)
 
         if self.config.skip_gcs_upload:
             logging.info("Skipping GCS upload")
@@ -483,15 +525,7 @@ def _import_one_helper(
                 output_dir=f'{relative_import_dir}/{import_name}',
                 version=version,
                 import_spec=import_spec,
-            )
-
-        validation_output_path = os.path.join(absolute_import_dir, 'validation')
-        for filepath in glob.iglob(f'{validation_output_path}/*.csv'):
-            dest = f'{relative_import_dir}/{import_name}/{version}/validation/{os.path.basename(filepath)}'
-            self.uploader.upload_file(
-                src=filepath,
-                dest=dest,
-            )
+                validation_status=validation_status)
 
         if self.importer:
             self.importer.delete_previous_output(relative_import_dir,
@@ -517,12 +551,9 @@ def _import_one_helper(
             )
 
     def _upload_import_inputs(
-        self,
-        import_dir: str,
-        output_dir: str,
-        version: str,
-        import_spec: dict,
-    ) -> import_service.ImportInputs:
+            self, import_dir: str, output_dir: str, version: str,
+            import_spec: dict,
+            validation_status: bool) -> import_service.ImportInputs:
         """Uploads the generated import data files.
 
         Data files are uploaded to <output_dir>/<version>/, where <version> is a
@@ -569,12 +600,17 @@ def _upload_import_inputs(
                     dest=dest,
                 )
 
-        self.uploader.upload_string(
-            version,
-            os.path.join(output_dir, self.config.storage_version_filename))
-        self.uploader.upload_string(
-            self._import_metadata_mcf_helper(import_spec),
-            os.path.join(output_dir, self.config.import_metadata_mcf_filename))
+        if self.config.ignore_validation_status or validation_status:
+            self.uploader.upload_string(
+                version,
+                os.path.join(output_dir, self.config.storage_version_filename))
+            self.uploader.upload_string(
+                self._import_metadata_mcf_helper(import_spec),
+                os.path.join(output_dir,
+                             self.config.import_metadata_mcf_filename))
+        else:
+            logging.error(
+                "Skipping latest version update due to validation failure.")
         return uploaded
 
     def _upload_file_helper(self, src: str, dest: str) -> None:
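Net effect: latest_version.txt, the pointer _get_latest_version reads on the next run, only advances when validation passes or ignore_validation_status is set. A condensed sketch of that gate, using a hypothetical helper name:

def maybe_publish(uploader, version: str, output_dir: str,
                  ignore_validation_status: bool,
                  validation_status: bool) -> bool:
    # Returns True when the version pointer was advanced.
    if ignore_validation_status or validation_status:
        uploader.upload_string(version, f'{output_dir}/latest_version.txt')
        return True
    # Data files were still uploaded above; only the pointer update is
    # skipped, so the next run keeps diffing against the last good version.
    return False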
7 changes: 6 additions & 1 deletion tools/import_validation/import_validation.py
@@ -116,15 +116,20 @@ def _run_validation(self, config) -> ValidationResult:
             logging.error(repr(exc))
             return ValidationResult('FAILED', config['validation'], repr(exc))
 
-    def run_validations(self):
+    def run_validations(self) -> bool:
+        # Returns false if any validation fails.
         output_file = open(self.validation_output, mode='w', encoding='utf-8')
         output_file.write('test,status,message\n')
+        status = True
         for config in self.validation_config:
             result = self._run_validation(config)
             output_file.write(
                 f'{result.name},{result.status},{result.message}\n')
             self.validation_result.append(result)
+            if result.status == 'FAILED':
+                status = False
         output_file.close()
+        return status
 
 
 def main(_):
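A usage sketch of the new boolean contract, wired the way _invoke_import_validation calls it above (the constructor order of config file, differ output, summary stats, and output file is taken from that call site; import path and file paths are hypothetical):

from tools.import_validation.import_validation import ImportValidation

validation = ImportValidation(
    'validation_config.json',                  # config_file_path
    'validation/point_analysis_summary.csv',   # differ_output
    'validation/summary_report.csv',           # summary_stats
    'validation/validation_output.csv',        # validation_output_file
)
if not validation.run_validations():
    # With ignore_validation_status=False, the executor would now skip
    # the latest_version.txt update for this import.
    raise RuntimeError('Validation failed; see validation_output.csv')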
