diff --git a/import-automation/executor/Dockerfile b/import-automation/executor/Dockerfile index c62d6e043..251fc610f 100644 --- a/import-automation/executor/Dockerfile +++ b/import-automation/executor/Dockerfile @@ -25,6 +25,6 @@ ENV PATH="${JAVA_HOME}/bin:${PATH}" RUN git clone https://github.com/datacommonsorg/data.git WORKDIR /data/import-automation/executor -RUN wget https://storage.googleapis.com/datacommons_public/import_tools/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar +RUN wget https://storage.googleapis.com/datacommons_public/import_tools/import-tool.jar RUN pip install -r requirements.txt ENTRYPOINT ["python", "main.py"] diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py index 96973f6e5..135414d6a 100644 --- a/import-automation/executor/app/configs.py +++ b/import-automation/executor/app/configs.py @@ -110,7 +110,7 @@ class ExecutorConfig: # Location of the local git data repo. local_repo_dir: str = '/data' # Location of the import tool jar. - import_tool_path: str = '/data/import-automation/executor/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar' + import_tool_path: str = '/data/import-automation/executor/import-tool.jar' # Maximum time a user script can run for in seconds. user_script_timeout: float = 3600 # Arguments for the user script @@ -121,7 +121,7 @@ class ExecutorConfig: invoke_import_validation: bool = False # Ignore validation status during import. ignore_validation_status: bool = True - # Import validation config file. + # Import validation config file path (relative to data repo). validation_config_file: str = 'tools/import_validation/validation_config.json' # Maximum time venv creation can take in seconds. 
venv_create_timeout: float = 3600 diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 37f163dcd..12fdd13ed 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -333,21 +333,26 @@ def _import_one( raise exc def _get_latest_version(self, import_dir: str) -> str: - # Download previous import data. + """ + Find previous import data in GCS. + Returns: + GCS path for the latest import data. + + """ bucket = storage.Client(self.config.gcs_project_id).bucket( self.config.storage_prod_bucket_name) - blob = bucket.blob( + blob = bucket.get_blob( f'{import_dir}/{self.config.storage_version_filename}') - if not blob: + if not blob or not blob.download_as_text(): logging.error( - f'Not able to download latest_version.txt from {folder}, skipping validation.' + f'Not able to find latest_version.txt in {import_dir}, skipping validation.' ) return '' latest_version = blob.download_as_text() - blob = bucket.blob(f'{import_dir}/{latest_version}') + blob = bucket.get_blob(f'{import_dir}/{latest_version}') if not blob: logging.error( - f'Not able to download previous import from {latest_version}, skipping validation.' + f'Not able to find previous import in {latest_version}, skipping validation.' ) return '' return f'gs://{bucket.name}/{blob.name}' @@ -401,7 +406,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, ) _log_process(process=process) process.check_returncode() - logging.info('Generated resolved mcf') + logging.info('Generated resolved mcf in %s', validation_output_path) if latest_version: # Invoke differ and validation scripts. 
diff --git a/tools/import_validation/import_validation.py b/tools/import_validation/import_validation.py index ff1a7136b..8d5a4cde0 100644 --- a/tools/import_validation/import_validation.py +++ b/tools/import_validation/import_validation.py @@ -21,7 +21,7 @@ import os import json -FLAGS = flags.FLAGS +_FLAGS = flags.FLAGS flags.DEFINE_string('validation_config_file', 'validation_config.json', 'Path to the validation config file.') flags.DEFINE_string('differ_output_location', '.', @@ -118,26 +118,26 @@ def _run_validation(self, config) -> ValidationResult: def run_validations(self) -> bool: # Returns false if any validation fails. - output_file = open(self.validation_output, mode='w', encoding='utf-8') - output_file.write('test,status,message\n') status = True - for config in self.validation_config: - result = self._run_validation(config) - output_file.write( - f'{result.name},{result.status},{result.message}\n') - self.validation_result.append(result) - if result.status == 'FAILED': - status = False - output_file.close() + with open(self.validation_output, mode='w', + encoding='utf-8') as output_file: + output_file.write('test,status,message\n') + for config in self.validation_config: + result = self._run_validation(config) + output_file.write( + f'{result.name},{result.status},{result.message}\n') + self.validation_result.append(result) + if result.status == 'FAILED': + status = False return status def main(_): validation = ImportValidation( - FLAGS.validation_config_file, - os.path.join(FLAGS.differ_output_location, POINT_ANALAYSIS_FILE), - os.path.join(FLAGS.stats_summary_location, STATS_SUMMARY_FILE), - os.paht.join(FLAGS.validation_output_location, VALIDATION_OUTPUT_FILE)) + _FLAGS.validation_config_file, + os.path.join(_FLAGS.differ_output_location, POINT_ANALAYSIS_FILE), + os.path.join(_FLAGS.stats_summary_location, STATS_SUMMARY_FILE), + os.path.join(_FLAGS.validation_output_location, VALIDATION_OUTPUT_FILE)) validation.run_validations()