Import validation updates #1227

Open · wants to merge 2 commits into base: master

2 changes: 1 addition & 1 deletion import-automation/executor/Dockerfile
@@ -25,6 +25,6 @@ ENV PATH="${JAVA_HOME}/bin:${PATH}"
 
 RUN git clone https://github.com/datacommonsorg/data.git
 WORKDIR /data/import-automation/executor
-RUN wget https://storage.googleapis.com/datacommons_public/import_tools/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar
+RUN wget https://storage.googleapis.com/datacommons_public/import_tools/import-tool.jar
 RUN pip install -r requirements.txt
 ENTRYPOINT ["python", "main.py"]
6 changes: 5 additions & 1 deletion import-automation/executor/app/configs.py
@@ -109,6 +109,8 @@ class ExecutorConfig:
     scheduler_location: str = 'us-central1'
     # Location of the local git data repo.
     local_repo_dir: str = '/data'
+    # Location of the import tool jar.
+    import_tool_path: str = '/data/import-automation/executor/import-tool.jar'
     # Maximum time a user script can run for in seconds.
     user_script_timeout: float = 3600
     # Arguments for the user script
@@ -117,7 +119,9 @@ class ExecutorConfig:
     user_script_env: dict = None
     # Invoke validations before upload.
     invoke_import_validation: bool = False
-    # Import validation config file.
+    # Ignore validation status during import.
+    ignore_validation_status: bool = True
+    # Import validation config file path (relative to data repo).
     validation_config_file: str = 'tools/import_validation/validation_config.json'
     # Maximum time venv creation can take in seconds.
     venv_create_timeout: float = 3600
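To make the new knobs concrete, here is a minimal sketch of how an executor run might be configured with them; it assumes ExecutorConfig can be instantiated directly and that the module is importable as app.configs (only the field names and defaults above are taken from this diff):

    from app.configs import ExecutorConfig  # hypothetical import path

    config = ExecutorConfig(
        invoke_import_validation=True,  # run validations before upload
        # Keep imports flowing even when a validation fails; set to False to
        # block the latest_version update on validation failure.
        ignore_validation_status=True,
        import_tool_path='/data/import-automation/executor/import-tool.jar',
    )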
187 changes: 114 additions & 73 deletions import-automation/executor/app/executor/import_executor.py
@@ -332,67 +332,113 @@ def _import_one(
)
raise exc

+    def _get_latest_version(self, import_dir: str) -> str:
+        """
+        Finds previous import data in GCS.
+        Returns:
+            GCS path for the latest import data, or '' if none is found.
+        """
+        bucket = storage.Client(self.config.gcs_project_id).bucket(
+            self.config.storage_prod_bucket_name)
+        blob = bucket.get_blob(
+            f'{import_dir}/{self.config.storage_version_filename}')
+        if not blob or not blob.download_as_text():
+            logging.error(
+                f'Not able to find {self.config.storage_version_filename} in {import_dir}, skipping validation.'
+            )
+            return ''
+        latest_version = blob.download_as_text()
+        blob = bucket.get_blob(f'{import_dir}/{latest_version}')
+        if not blob:
+            logging.error(
+                f'Not able to find previous import in {latest_version}, skipping validation.'
+            )
+            return ''
+        return f'gs://{bucket.name}/{blob.name}'
+
     def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
-                                  absolute_import_dir: str,
-                                  import_spec: dict) -> None:
+                                  absolute_import_dir: str, import_spec: dict,
+                                  version: str) -> bool:
         """
         Performs validations on import data.
         """
+        config_file = import_spec.get('validation_config_file', '')
+        if config_file:
+            config_file_path = os.path.join(absolute_import_dir, config_file)
+        else:
+            config_file_path = os.path.join(repo_dir,
+                                            self.config.validation_config_file)
+        logging.info(f'Validation config file: {config_file_path}')
+
+        import_dir = f'{relative_import_dir}/{import_spec["import_name"]}'
+        latest_version = self._get_latest_version(import_dir)
+        logging.info(f'Latest version: {latest_version}')
+        # Overall validation status; stays True if validation is skipped.
+        status = True
+
         # Trigger validations for each tmcf/csv under import_inputs.
         import_inputs = import_spec.get('import_inputs', [])
         for import_input in import_inputs:
-            mcf_path = import_input['node_mcf']
-            if not mcf_path:
-                # TODO: Generate node mcf using dc-import tool
-                logging.error(
-                    'Empty node_mcf in manifest, skipping validation.')
-            current_data_path = os.path.join(absolute_import_dir, mcf_path)
-            previous_data_path = os.path.join(absolute_import_dir,
-                                              'previous_data.mcf')
-            summary_stats = os.path.join(absolute_import_dir,
-                                         'summary_report.csv')
-            validation_output_path = os.path.join(absolute_import_dir,
-                                                  'validation')
-            config_file = import_spec.get('validation_config_file', '')
-            if config_file:
-                config_file_path = os.path.join(absolute_import_dir,
-                                                config_file)
-            else:
-                config_file_path = os.path.join(
-                    repo_dir, self.config.validation_config_file)
-            logging.info(f'Validation config file: {config_file_path}')
+            import_prefix = import_input['template_mcf'].split('.')[0]
+            validation_output_path = os.path.join(absolute_import_dir,
+                                                  import_prefix, 'validation')
+            current_data_path = os.path.join(validation_output_path, '*.mcf')
+            previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf'

Review comment from Contributor:
    Previous data path might not contain mcfs (for already onboarded auto-imports). Should we skip if mcfs are missing?

Reply from vish-cs (author):
    We are uploading mcfs as part of the import output upload step now. In that case, why would we not find the mcfs in GCS?

+            summary_stats = os.path.join(validation_output_path,
+                                         'summary_report.csv')
+            validation_output_file = os.path.join(validation_output_path,
+                                                  'validation_output.csv')
+            differ_output = os.path.join(validation_output_path,
+                                         'point_analysis_summary.csv')
+
+            # Run dc import tool to generate resolved mcf.
+            logging.info('Generating resolved mcf...')
+            import_tool_args = [
+                f'-o={validation_output_path}', 'genmcf',
+                import_input['template_mcf'], import_input['cleaned_csv']
+            ]
+            process = _run_user_script(
+                interpreter_path='java',
+                script_path='-jar ' + self.config.import_tool_path,
+                timeout=self.config.user_script_timeout,
+                args=import_tool_args,
+                cwd=absolute_import_dir,
+                env={},
+            )
+            _log_process(process=process)
+            process.check_returncode()
+            logging.info('Generated resolved mcf in %s', validation_output_path)
-
-            # Download previous import data.
-            bucket = storage.Client(self.config.gcs_project_id).bucket(
-                self.config.storage_prod_bucket_name)
-            folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
-            blob = bucket.blob(folder + 'latest_version.txt')
-            if not blob:
-                logging.error(
-                    f'Not able to download latest_version.txt from {folder}, skipping validation.'
-                )
-                return
-            latest_version = blob.download_as_text()
-            blob = bucket.blob(folder + latest_version + '/' + mcf_path)
-            if not blob:
-                logging.error(
-                    f'Not able to download previous import from {latest_version}, skipping validation.'
-                )
-                return
-            # blob.download_to_filename(previous_data_path)
+
+            if latest_version:
+                # Invoke differ and validation scripts.
+                logging.info('Invoking differ tool...')
+                differ = ImportDiffer(current_data_path, previous_data_path,
+                                      validation_output_path)
+                differ.run_differ()
+
+                logging.info('Invoking validation script...')
+                validation = ImportValidation(config_file_path, differ_output,
+                                              summary_stats,
+                                              validation_output_file)
+                status = validation.run_validations()
+            else:
+                logging.error(
+                    'Skipping validation due to missing latest version.')

Review comment from Contributor:
    Can the summary_report.csv be validated without the differ for the first run?

Reply from vish-cs (author):
    That will require some logic to only run certain validations (and skip the differ ones). We can do that as a follow-up as needed.

-            # Invoke differ script.
-            differ = ImportDiffer(current_data_path, previous_data_path,
-                                  validation_output_path)
-            differ.run_differ()
-
-            # Invoke validation script.
-            validation_output = os.path.join(validation_output_path,
-                                             'validation_output.csv')
-            differ_output = os.path.join(validation_output_path,
-                                         'point_analysis_summary.csv')
-            validation = ImportValidation(config_file_path, differ_output,
-                                          summary_stats, validation_output)
-            validation.run_validations()
+
+            if not self.config.skip_gcs_upload:
+                # Upload output to GCS.
+                gcs_output = f'{import_dir}/{version}/{import_prefix}/validation'
+                logging.info(
+                    f'Uploading validation output to GCS path: {gcs_output}')
+                for filename in os.listdir(validation_output_path):
+                    filepath = os.path.join(validation_output_path, filename)
+                    if os.path.isfile(filepath):
+                        dest = f'{gcs_output}/{filename}'
+                        self.uploader.upload_file(
+                            src=filepath,
+                            dest=dest,
+                        )
+
+        return status

def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
version: str, interpreter_path: str,
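For orientation, the genmcf step assembled above boils down to roughly the following command; a minimal sketch with hypothetical file names (the real template_mcf and cleaned_csv values come from the import manifest, and _run_user_script wraps the subprocess handling):

    import subprocess

    # Hypothetical paths; the executor derives these from import_spec.
    subprocess.run(
        [
            'java', '-jar', '/data/import-automation/executor/import-tool.jar',
            '-o=table1/validation',  # resolved mcf lands here
            'genmcf', 'table1.tmcf', 'table1_cleaned.csv',
        ],
        cwd='/abs/path/to/import_dir',
        check=True,  # equivalent of process.check_returncode()
    )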
@@ -468,11 +514,12 @@ def _import_one_helper(

         if self.config.invoke_import_validation:
             logging.info("Invoking import validations")
-            self._invoke_import_validation(
+            validation_status = self._invoke_import_validation(
                 repo_dir=repo_dir,
                 relative_import_dir=relative_import_dir,
                 absolute_import_dir=absolute_import_dir,
-                import_spec=import_spec)
+                import_spec=import_spec,
+                version=version)
 
         if self.config.skip_gcs_upload:
             logging.info("Skipping GCS upload")
@@ -483,15 +530,7 @@ def _import_one_helper(
                 output_dir=f'{relative_import_dir}/{import_name}',
                 version=version,
                 import_spec=import_spec,
-            )
-
-        validation_output_path = os.path.join(absolute_import_dir, 'validation')
-        for filepath in glob.iglob(f'{validation_output_path}/*.csv'):
-            dest = f'{relative_import_dir}/{import_name}/{version}/validation/{os.path.basename(filepath)}'
-            self.uploader.upload_file(
-                src=filepath,
-                dest=dest,
-            )
+                validation_status=validation_status)

if self.importer:
self.importer.delete_previous_output(relative_import_dir,
@@ -517,12 +556,9 @@ def _import_one_helper(
)

     def _upload_import_inputs(
-        self,
-        import_dir: str,
-        output_dir: str,
-        version: str,
-        import_spec: dict,
-    ) -> import_service.ImportInputs:
+            self, import_dir: str, output_dir: str, version: str,
+            import_spec: dict,
+            validation_status: bool) -> import_service.ImportInputs:
"""Uploads the generated import data files.

Data files are uploaded to <output_dir>/<version>/, where <version> is a
@@ -569,12 +605,17 @@ def _upload_import_inputs(
                 dest=dest,
             )
 
-        self.uploader.upload_string(
-            version,
-            os.path.join(output_dir, self.config.storage_version_filename))
-        self.uploader.upload_string(
-            self._import_metadata_mcf_helper(import_spec),
-            os.path.join(output_dir, self.config.import_metadata_mcf_filename))
+        if self.config.ignore_validation_status or validation_status:
+            self.uploader.upload_string(
+                version,
+                os.path.join(output_dir, self.config.storage_version_filename))
+            self.uploader.upload_string(
+                self._import_metadata_mcf_helper(import_spec),
+                os.path.join(output_dir,
+                             self.config.import_metadata_mcf_filename))
+        else:
+            logging.error(
+                "Skipping latest version update due to validation failure.")

Review comment from Contributor:
    It may be better to upload output to GCS even if validation fails, to help with debugging. We can skip updating latest_version.txt if validation fails so the next diff is with the previous valid output.

Reply from vish-cs (author):
    That is the case actually. We are uploading validation output files as the last step in _invoke_import_validation but not updating latest_version in case of failure.

return uploaded

def _upload_file_helper(self, src: str, dest: str) -> None:
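For context, the latest_version.txt pointer that the code above reads and conditionally updates works roughly as follows; a minimal sketch with a hypothetical project, bucket, and import path, using the same google.cloud.storage calls as _get_latest_version:

    from google.cloud import storage

    client = storage.Client('my-gcp-project')  # hypothetical project
    bucket = client.bucket('my-prod-bucket')   # hypothetical bucket
    import_dir = 'scripts/my_import/MyImport'  # hypothetical import dir

    # Read the pointer: the file holds the name of the newest version folder.
    blob = bucket.get_blob(f'{import_dir}/latest_version.txt')
    latest_version = blob.download_as_text() if blob else ''

    # Update the pointer only on a good (or ignored) validation status, so the
    # next differ run compares against the last known-good output.
    bucket.blob(f'{import_dir}/latest_version.txt').upload_from_string(
        '2025_01_01T00_00_00')  # hypothetical version name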
40 changes: 23 additions & 17 deletions tools/import_validation/import_validation.py
@@ -21,7 +21,7 @@
import os
import json

-FLAGS = flags.FLAGS
+_FLAGS = flags.FLAGS
flags.DEFINE_string('validation_config_file', 'validation_config.json',
'Path to the validation config file.')
flags.DEFINE_string('differ_output_location', '.',
@@ -31,9 +31,9 @@
flags.DEFINE_string('validation_output_location', '.',
'Path to the validation output folder.')

-POINT_ANALAYSIS_FILE = 'point_analysis_summary.csv'
-STATS_SUMMARY_FILE = 'summary_report.csv'
-VALIDATION_OUTPUT_FILE = 'validation_output.csv'
+_POINT_ANALAYSIS_FILE = 'point_analysis_summary.csv'
+_STATS_SUMMARY_FILE = 'summary_report.csv'
+_VALIDATION_OUTPUT_FILE = 'validation_output.csv'

Validation = Enum('Validation', [
('MODIFIED_COUNT', 1),
@@ -116,23 +116,29 @@ def _run_validation(self, config) -> ValidationResult:
logging.error(repr(exc))
return ValidationResult('FAILED', config['validation'], repr(exc))

-    def run_validations(self):
-        output_file = open(self.validation_output, mode='w', encoding='utf-8')
-        output_file.write('test,status,message\n')
-        for config in self.validation_config:
-            result = self._run_validation(config)
-            output_file.write(
-                f'{result.name},{result.status},{result.message}\n')
-            self.validation_result.append(result)
-        output_file.close()
+    def run_validations(self) -> bool:
+        # Returns false if any validation fails.
+        status = True
+        with open(self.validation_output, mode='w',
+                  encoding='utf-8') as output_file:
+            output_file.write('test,status,message\n')
+            for config in self.validation_config:
+                result = self._run_validation(config)
+                output_file.write(
+                    f'{result.name},{result.status},{result.message}\n')
+                self.validation_result.append(result)
+                if result.status == 'FAILED':
+                    status = False
+        return status


def main(_):
validation = ImportValidation(
-        FLAGS.validation_config_file,
-        os.path.join(FLAGS.differ_output_location, POINT_ANALAYSIS_FILE),
-        os.path.join(FLAGS.stats_summary_location, STATS_SUMMARY_FILE),
-        os.paht.join(FLAGS.validation_output_location, VALIDATION_OUTPUT_FILE))
+        _FLAGS.validation_config_file,
+        os.path.join(_FLAGS.differ_output_location, _POINT_ANALAYSIS_FILE),
+        os.path.join(_FLAGS.stats_summary_location, _STATS_SUMMARY_FILE),
+        os.path.join(_FLAGS.validation_output_location,
+                     _VALIDATION_OUTPUT_FILE))
validation.run_validations()


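As a usage note, the validator can also be driven directly from Python; a minimal sketch assuming the differ and stats outputs already sit in the working directory (the constructor arguments mirror main() above):

    from import_validation import ImportValidation

    validation = ImportValidation(
        'validation_config.json',      # validation config
        'point_analysis_summary.csv',  # differ output
        'summary_report.csv',          # import tool stats summary
        'validation_output.csv')       # results written here as csv
    ok = validation.run_validations()  # False if any validation FAILED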