Skip to content

Commit

Permalink
Extract deploy logic into seperate function
Browse files Browse the repository at this point in the history
Both deploy_command and recalculate_checksum use very similar body. Only
differences are:

1) Deploy deals with V, R, A migrations, while recalculation deals only with
R migrations.
2) Deploy also applies the script, while recalculation on updates
schemachangetables.
  • Loading branch information
riiwo committed Feb 5, 2024
1 parent a5267d2 commit 90395c8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 104 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Jinja2==3.0.0
pandas==1.3.0
PyYAML==5.4
snowflake-connector-python==2.8.0
PyYAML==6.0
snowflake-connector-python==3.6.0
139 changes: 39 additions & 100 deletions schemachange/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,7 @@ def record_change_script(self, script, script_content, change_history_table, exe
query = self._q_ch_log.format(**frmt_args)
self.execute_snowflake_query(query)


def deploy_command(config):
def setup_session(config):
req_args = set(['snowflake_account','snowflake_user','snowflake_role','snowflake_warehouse'])
validate_auth_config(config, req_args)

Expand All @@ -481,11 +480,9 @@ def deploy_command(config):
print(_log_config_details.format(**config))

#connect to snowflake and maintain connection
session = SnowflakeSchemachangeSession(config)

scripts_skipped = 0
scripts_applied = 0
return SnowflakeSchemachangeSession(config)

def calculate_repeatable_migration_checksum(config, session):
# Deal with the change history table (create if specified)
change_history_table = get_change_history_table_details(config['change_history_table'])
change_history_metadata = session.fetch_change_history_metadata(change_history_table)
Expand Down Expand Up @@ -515,15 +512,20 @@ def deploy_command(config):
max_published_version_display = 'None'
print(_log_ch_max_version.format(max_published_version_display=max_published_version_display))

# Find all scripts in the root folder (recursively) and sort them correctly
all_scripts = get_all_scripts_recursively(config['root_folder'], config['verbose'])
all_script_names = list(all_scripts.keys())
# Sort scripts such that versioned scripts get applied first and then the repeatable ones.
all_script_names_sorted = sorted_alphanumeric([script for script in all_script_names if script[0] == 'V']) \
+ sorted_alphanumeric([script for script in all_script_names if script[0] == 'R']) \
+ sorted_alphanumeric([script for script in all_script_names if script[0] == 'A'])
return [change_history_table, r_scripts_checksum, max_published_version]

def apply_scripts(config, all_scripts, all_script_names_sorted, apply = True):
session = setup_session(config)

scripts_skipped = 0
scripts_applied = 0

[
change_history_table,
r_scripts_checksum,
max_published_version
] = calculate_repeatable_migration_checksum(config, session)

# Loop through each script in order and apply any required changes
for script_name in all_script_names_sorted:
script = all_scripts[script_name]

Expand Down Expand Up @@ -557,26 +559,36 @@ def deploy_command(config):
scripts_skipped += 1
continue

print(_log_apply.format(**script))
if apply:
print(_log_apply.format(**script))
else:
print(_log_recalculate.format(**script))

if not config['dry_run']:
execution_time = session.apply_change_script(script, content, change_history_table)
execution_time = 0
if apply:
execution_time = session.apply_change_script(script, content, change_history_table)
session.record_change_script(script, content, change_history_table, execution_time)
scripts_applied += 1

print(_log_apply_set_complete.format(scripts_applied=scripts_applied, scripts_skipped=scripts_skipped))
return [scripts_skipped, scripts_applied]

def undo_command(config):
req_args = set(['snowflake_account','snowflake_user','snowflake_role','snowflake_warehouse', 'step'])
validate_auth_config(config, req_args)
def deploy_command(config):
# Find all scripts in the root folder (recursively) and sort them correctly
all_scripts = get_all_scripts_recursively(config['root_folder'], config['verbose'])
all_script_names = list(all_scripts.keys())
# Sort scripts such that versioned scripts get applied first and then the repeatable ones.
all_script_names_sorted = sorted_alphanumeric([script for script in all_script_names if script[0] == 'V']) \
+ sorted_alphanumeric([script for script in all_script_names if script[0] == 'R']) \
+ sorted_alphanumeric([script for script in all_script_names if script[0] == 'A'])

# Log some additional details
if config['dry_run']:
print("Running in dry-run mode")
print(_log_config_details.format(**config))
# Loop through each script in order and apply any required changes
[scripts_skipped, scripts_applied] = apply_scripts(config, all_scripts, all_script_names_sorted, True)

#connect to snowflake and maintain connection
session = SnowflakeSchemachangeSession(config)
print(_log_apply_set_complete.format(scripts_applied=scripts_applied, scripts_skipped=scripts_skipped))

def undo_command(config):
session = setup_session(config)

# Deal with the change history table (raise if not provided)
change_history_table = get_change_history_table_details(config['change_history_table'])
Expand Down Expand Up @@ -615,86 +627,13 @@ def undo_command(config):
print(_log_undo_set_complete.format(scripts_applied=scripts_applied))

def recalculate_checksum_command(config):
req_args = set(['snowflake_account','snowflake_user','snowflake_role','snowflake_warehouse'])
validate_auth_config(config, req_args)

# Log some additional details
if config['dry_run']:
print("Running in dry-run mode")
print(_log_config_details.format(**config))

#connect to snowflake and maintain connection
session = SnowflakeSchemachangeSession(config)

scripts_skipped = 0
scripts_applied = 0

# Deal with the change history table (create if specified)
change_history_table = get_change_history_table_details(config['change_history_table'])
change_history_metadata = session.fetch_change_history_metadata(change_history_table)
if change_history_metadata:
print(_log_ch_use.format(last_altered=change_history_metadata['last_altered'], **change_history_table))
elif config['create_change_history_table']:
# Create the change history table (and containing objects) if it don't exist.
if not config['dry_run']:
session.create_change_history_table_if_missing(change_history_table)
print(_log_ch_create.format(**change_history_table))
else:
raise ValueError(_err_ch_missing.format(**change_history_table))

# Find the max published version
max_published_version = ''

change_history = None
r_scripts_checksum = None
if (config['dry_run'] and change_history_metadata) or not config['dry_run']:
change_history = session.fetch_change_history(change_history_table)
r_scripts_checksum = session.fetch_r_scripts_checksum(change_history_table)

if change_history:
max_published_version = change_history[0]
max_published_version_display = max_published_version
if max_published_version_display == '':
max_published_version_display = 'None'
print(_log_ch_max_version.format(max_published_version_display=max_published_version_display))

# Find all scripts in the root folder (recursively) and sort them correctly
all_scripts = get_all_scripts_recursively(config['root_folder'], config['verbose'])
all_script_names = list(all_scripts.keys())
# Sort scripts such that versioned scripts get applied first and then the repeatable ones.
all_script_names_sorted = sorted_alphanumeric([script for script in all_script_names if script[0] == 'R'])

# Loop through each script in order and apply any required changes
for script_name in all_script_names_sorted:
script = all_scripts[script_name]

# Always process with jinja engine
jinja_processor = JinjaTemplateProcessor(project_root = config['root_folder'], modules_folder = config['modules_folder'])
content = jinja_processor.render(jinja_processor.relpath(script['script_full_path']), config['vars'], config['verbose'])

# Apply only R scripts where the checksum changed compared to the last execution of snowchange
if script_name[0] == 'R':
# Compute the checksum for the script
checksum_current = hashlib.sha224(content.encode('utf-8')).hexdigest()

# check if R file was already executed
if (r_scripts_checksum is not None) and script_name in list(r_scripts_checksum['script_name']):
checksum_last = list(r_scripts_checksum.loc[r_scripts_checksum['script_name'] == script_name, 'checksum'])[0]
else:
checksum_last = ''

# check if there is a change of the checksum in the script
if checksum_current == checksum_last:
if config['verbose']:
print(_log_skip_r.format(**script))
scripts_skipped += 1
continue

print(_log_recalculate.format(**script))

if not config['dry_run']:
session.record_change_script(script, content, change_history_table, 0)
scripts_applied += 1
[scripts_applied, scripts_skipped] = apply_scripts(config, all_scripts, all_script_names_sorted, False)

print(_log_apply_set_complete.format(scripts_applied=scripts_applied, scripts_skipped=scripts_skipped))

Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ classifiers =
packages = schemachange
python_requires = >=3.7
install_requires =
snowflake-connector-python~=2.8
snowflake-connector-python~=3.6
pandas~=1.3
pyyaml~=5.4
pyyaml~=6.0
jinja2~=3.0
include_package_data = True

Expand Down

0 comments on commit 90395c8

Please sign in to comment.