Skip to content

Commit

Permalink
Invoke validations from import executor
Browse files Browse the repository at this point in the history
  • Loading branch information
vish-cs committed Dec 27, 2024
1 parent 0df1d01 commit a586501
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 30 deletions.
6 changes: 6 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ class ExecutorConfig:
# Command-line arguments passed to the user script.
user_script_args: List[str] = ()
# Environment variables for the user script
user_script_env: dict = None
# Skip uploading the data to GCS (for local testing).
# NOTE(review): defaults to True, which disables the GCS upload on every
# run, not only local testing — confirm this is intentional.
skip_gcs_upload: bool = True
# Invoke validations before upload.
invoke_import_validation: bool = True
# Import validation config file (path relative to the import directory).
validation_config_file: str = 'tools/validation/config.json'
# Maximum time venv creation can take in seconds.
venv_create_timeout: float = 3600
# Maximum time downloading a file can take in seconds.
Expand Down
142 changes: 112 additions & 30 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from app.service import file_uploader
from app.service import github_api
from app.service import import_service
from google.cloud import storage

# Email address for status messages.
_DEBUG_EMAIL_ADDR = '[email protected]'
Expand Down Expand Up @@ -318,6 +319,100 @@ def _import_one(
)
raise exc

def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
                              absolute_import_dir: str, import_spec: dict,
                              interpreter_path: str,
                              process: subprocess.CompletedProcess) -> None:
    """Performs validations on import data.

    For each import input, downloads the previously imported data from
    GCS, runs the differ script against the newly generated data, then
    runs the validation script on the differ output.

    Args:
        repo_dir: Absolute path to the root of the repository.
        relative_import_dir: Import directory path relative to the
            repository root.
        absolute_import_dir: Absolute path to the import directory.
        import_spec: Import specification dict (uses 'import_inputs'
            and 'import_name').
        interpreter_path: Python interpreter used to run the differ and
            validation scripts.
        process: Last completed subprocess; overwritten here as each
            script runs.

    Raises:
        subprocess.CalledProcessError: If the differ or validation
            script exits with a non-zero status.
    """
    # The client and bucket are identical for every import input, so
    # create them once instead of once per loop iteration.
    bucket = storage.Client(self.config.gcs_project_id).bucket(
        self.config.storage_prod_bucket_name)
    import_inputs = import_spec.get('import_inputs', [])
    for import_input in import_inputs:
        mcf_path = import_input['node_mcf']
        current_data_path = os.path.join(absolute_import_dir, mcf_path)
        previous_data_path = os.path.join(absolute_import_dir,
                                          mcf_path) + '.old'
        # NOTE(review): the differ output path is the same for every
        # input, so later inputs overwrite earlier results — confirm
        # multi-input imports are handled as intended.
        differ_results_path = os.path.join(absolute_import_dir, 'results')
        config_file_path = os.path.join(absolute_import_dir,
                                        self.config.validation_config_file)

        # Download previous import data. 'latest_version.txt' contains
        # the name of the most recent version folder; strip surrounding
        # whitespace so a trailing newline cannot corrupt the blob path.
        folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
        latest_version = bucket.blob(
            folder + 'latest_version.txt').download_as_text().strip()
        blob = bucket.blob(folder + latest_version + '/' + mcf_path)
        blob.download_to_filename(previous_data_path)

        # Invoke data differ script.
        differ_script_path = os.path.join(repo_dir, 'tools', 'differ',
                                          'differ.py')
        differ_script_args: List[str] = [
            '--current_data=' + current_data_path,
            '--previous_data=' + previous_data_path,
            '--output_location=' + differ_results_path,
        ]
        process = _run_user_script(
            interpreter_path=interpreter_path,
            script_path=differ_script_path,
            timeout=self.config.user_script_timeout,
            args=differ_script_args,
            cwd=absolute_import_dir,
            env=self.config.user_script_env,
        )
        _log_process(process=process)
        process.check_returncode()

        # Invoke data validation script on the differ output.
        validation_script_path = os.path.join(repo_dir, 'tools',
                                              'validation', 'validation.py')
        validation_script_args: List[str] = [
            '--differ_output_location=' + differ_results_path,
            '--config_file=' + config_file_path,
        ]
        process = _run_user_script(
            interpreter_path=interpreter_path,
            script_path=validation_script_path,
            timeout=self.config.user_script_timeout,
            args=validation_script_args,
            cwd=absolute_import_dir,
            env=self.config.user_script_env,
        )
        _log_process(process=process)
        process.check_returncode()

def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
                       version: str, interpreter_path: str,
                       process: subprocess.CompletedProcess) -> None:
    """Runs each script listed in the import spec.

    A script recognized as a simple import runs as a Cloud Run job;
    otherwise it is executed locally with the appropriate interpreter.

    Args:
        absolute_import_dir: Absolute path to the import directory.
        import_spec: Import specification dict (uses 'scripts' and
            'image').
        version: Version string passed to the Cloud Run job.
        interpreter_path: Default interpreter for locally run scripts.
        process: Last completed subprocess; overwritten here as each
            script runs.

    Raises:
        subprocess.CalledProcessError: If a locally run script exits
            with a non-zero status.
    """
    # Default to an empty list so a spec without 'scripts' is a no-op
    # instead of raising TypeError on iteration.
    script_paths = import_spec.get('scripts', [])
    for path in script_paths:
        script_path = os.path.join(absolute_import_dir, path)
        simple_job = cloud_run_simple_import.get_simple_import_job_id(
            import_spec, script_path)
        if simple_job:
            # Running simple import as cloud run job.
            cloud_run_simple_import.cloud_run_simple_import_job(
                import_spec=import_spec,
                config_file=script_path,
                env=self.config.user_script_env,
                version=version,
                image=import_spec.get('image'),
            )
        else:
            # Run import script locally. Select the interpreter per
            # script, as the pre-refactor code did via
            # _get_script_interpreter (the refactor had dropped this).
            script_interpreter = _get_script_interpreter(
                script_path, interpreter_path)
            process = _run_user_script(
                interpreter_path=script_interpreter,
                script_path=script_path,
                timeout=self.config.user_script_timeout,
                args=self.config.user_script_args,
                cwd=absolute_import_dir,
                env=self.config.user_script_env,
            )
            _log_process(process=process)
            process.check_returncode()

def _import_one_helper(
self,
repo_dir: str,
Expand Down Expand Up @@ -351,36 +446,23 @@ def _import_one_helper(
_log_process(process=process)
process.check_returncode()

script_paths = import_spec.get('scripts')
for path in script_paths:
script_path = os.path.join(absolute_import_dir, path)
simple_job = cloud_run_simple_import.get_simple_import_job_id(
import_spec, script_path)
if simple_job:
# Running simple import as cloud run job.
cloud_run_simple_import.cloud_run_simple_import_job(
import_spec=import_spec,
config_file=script_path,
env=self.config.user_script_env,
version=version,
image=import_spec.get('image'),
)
else:
# Run import script locally.
script_interpreter = _get_script_interpreter(
script_path, interpreter_path)
process = _run_user_script(
interpreter_path=script_interpreter,
script_path=script_path,
timeout=self.config.user_script_timeout,
args=self.config.user_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
name=import_name,
)
_log_process(process=process)
process.check_returncode()

self._invoke_import_job(absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
version=version,
interpreter_path=interpreter_path,
process=process)

if self.config.invoke_import_validation:
self._invoke_import_validation(
repo_dir=repo_dir,
relative_import_dir=relative_import_dir,
absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
interpreter_path=interpreter_path,
process=process)

if self.config.skip_gcs_upload:
return
inputs = self._upload_import_inputs(
import_dir=absolute_import_dir,
output_dir=f'{relative_import_dir}/{import_name}',
Expand Down

0 comments on commit a586501

Please sign in to comment.