From 446e43ef39da037d5cccd6d1eccb16dc3278b087 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Mon, 4 Nov 2024 15:51:52 -0600 Subject: [PATCH] Modularize python script --- .../update_source_data_schema_changelog.yml | 26 +--- scripts/get_source_data_schema_changelog.py | 111 -------------- .../update_source_data_schema_changelog.py | 144 ++++++++++++++++++ 3 files changed, 151 insertions(+), 130 deletions(-) delete mode 100644 scripts/get_source_data_schema_changelog.py create mode 100644 scripts/update_source_data_schema_changelog.py diff --git a/.github/workflows/update_source_data_schema_changelog.yml b/.github/workflows/update_source_data_schema_changelog.yml index f49d5dfe..1418c41e 100644 --- a/.github/workflows/update_source_data_schema_changelog.yml +++ b/.github/workflows/update_source_data_schema_changelog.yml @@ -19,37 +19,25 @@ jobs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.12 - name: Run Bash Script run: | set -x cd $GITHUB_WORKSPACE + OLD_SCHEMAS_DIR=original_schemas cp -r schemas/ new_schemas/ git clone --branch master https://github.com/stellar/stellar-etl-airflow.git repo_master_copy - cp -r repo_master_copy/schemas/ original_schemas/ - output=$(python3 scripts/get_source_data_schema_changelog.py) - if [[ -n "$output" ]]; then - existing_changelog=$( changelog/source_data.md - echo "$existing_changelog" >> changelog/source_data.md - fi + cp -r repo_master_copy/schemas/ OLD_SCHEMAS_DIR + export OLD_SCHEMAS_DIR + python3 scripts/update_source_data_schema_changelog.py - - name: Commit changes - id: commit_changes + - name: Commit and Push Changes run: | git config --local user.email "action@github.com" git config --local user.name "GitHub Action" git add changelog/source_data.md if git commit -m "Update changelog for Source data"; then echo "Changes committed." - echo "changes_committed=true" >> $GITHUB_OUTPUT - else - echo "No changes to commit." - echo "changes_committed=false" >> $GITHUB_OUTPUT + git push fi - - - name: Push branch - if: steps.commit_changes.outputs.changes_committed == 'true' - run: | - git push diff --git a/scripts/get_source_data_schema_changelog.py b/scripts/get_source_data_schema_changelog.py deleted file mode 100644 index a2664753..00000000 --- a/scripts/get_source_data_schema_changelog.py +++ /dev/null @@ -1,111 +0,0 @@ -import json -import os -from datetime import datetime - -old_schemas_dir = "original_schemas" -new_schemas_dir = "new_schemas" - -if not os.path.exists(old_schemas_dir): - print(f"Directory {old_schemas_dir} does not exist.") - exit(1) - -if not os.path.exists(new_schemas_dir): - print(f"Directory {new_schemas_dir} does not exist.") - exit(1) - - -def read_json_file(filepath): - with open(filepath, "r") as f: - return json.load(f) - - -new_schemas = [file.replace("_schema.json", "") for file in os.listdir(new_schemas_dir)] -old_schemas = [file.replace("_schema.json", "") for file in os.listdir(old_schemas_dir)] - -tables_added = [model for model in new_schemas if model not in old_schemas] -tables_removed = [model for model in old_schemas if model not in new_schemas] -common_tables = [model for model in new_schemas if model in old_schemas] - -schema_changes = {} - -for schema in common_tables: - old_file_path = os.path.join(old_schemas_dir, schema + "_schema.json") - new_file_path = os.path.join(new_schemas_dir, schema + "_schema.json") - - new_data = read_json_file(new_file_path) - old_data = read_json_file(old_file_path) - - old_dict = {item["name"]: item for item in old_data} - new_dict = {item["name"]: item for item in new_data} - - added = [new_dict[name]["name"] for name in new_dict if name not in old_dict] - deleted = [old_dict[name]["name"] for name in old_dict if name not in new_dict] - type_changed = [ - (new_dict[name]["name"], new_dict[name]["type"], old_dict[name]["type"]) - for name in new_dict - if name in old_dict and new_dict[name]["type"] != old_dict[name]["type"] - ] - - if added: - if schema not in schema_changes: - schema_changes[schema] = {} - schema_changes[schema]["column_added"] = added - - if deleted: - if schema not in schema_changes: - schema_changes[schema] = {} - schema_changes[schema]["column_removed"] = deleted - - if type_changed: - if schema not in schema_changes: - schema_changes[schema] = {} - schema_changes[schema]["type_changed"] = type_changed - -if tables_added or tables_removed or schema_changes: - current_date = datetime.now().strftime("%Y-%m-%d") - print("") - print(f"## {current_date}") - -if tables_added: - print("") - print("## Tables Added:") - print([table for table in tables_added]) - -if tables_removed: - print("") - print("### Tables Removed:") - print([table for table in tables_removed]) - - -def sort_schema_changes(changes): - sorted_data = {} - - for table_name in sorted(changes.keys()): - sorted_operations = { - op_type: sorted(changes[table_name][op_type]) - for op_type in sorted(changes[table_name].keys()) - } - sorted_data[table_name] = sorted_operations - return sorted_data - - -if schema_changes: - sorted_schema_changes = sort_schema_changes(schema_changes) - print("") - print("### Schema Changes:") - - markdown_table = "| Table Name | Operation | Columns |\n" - markdown_table += "|---------------------------------|---------------|--------------------------|\n" - - for table_name, operations in sorted_schema_changes.items(): - for operation, columns in operations.items(): - if operation in ["column_added", "column_removed"]: - columns_str = ", ".join(columns) - if operation in ["type_changed"]: - columns_str = ", ".join( - [f"{column[0]} ({column[2]} -> {column[1]})" for column in columns] - ) - markdown_table += ( - f"| {table_name:<33} | {operation:<13} | {columns_str:<24} |\n" - ) - print(markdown_table) diff --git a/scripts/update_source_data_schema_changelog.py b/scripts/update_source_data_schema_changelog.py new file mode 100644 index 00000000..af28ced2 --- /dev/null +++ b/scripts/update_source_data_schema_changelog.py @@ -0,0 +1,144 @@ +import os +import sys +import json +from datetime import datetime + +OLD_SCHEMAS_DIR = os.environ.get(OLD_SCHEMAS_DIR) +NEW_SCHEMAS_DIR = "schemas" +CHANGELOG_FILEPATH = "changelog/source_data.md" + +def read_json_file(filepath: str) -> {}: + with open(filepath, "r") as rfp: + try: + return json.load(rfp) + except: + sys.exit(f"Not a valid JSON at filepath {filepath}.") + +def read_file(filepath: str) -> str: + with open(filepath, "r") as rfp: + return rfp.read() + +def write_file(filepath: str, content: str, mode="a") -> None: + with open(filepath, mode) as wfp: + wfp.write(content) + +def sort_schema_changes(schema_changes: {}) -> {}: + sorted_data = {} + + for table_name in sorted(schema_changes.keys()): + sorted_operations = { + op_type: sorted(schema_changes[table_name][op_type]) + for op_type in sorted(schema_changes[table_name].keys()) + } + sorted_data[table_name] = sorted_operations + return sorted_data + + +def get_filepaths(directory: str) -> []: + if not os.path.exists(directory): + sys.exit(f"Directory {directory} does not exist.") + return os.listdir(directory) + +def compare_lists(old_list=[], new_list=[]): + old_set = set(old_list) + new_set = set(new_list) + + common = old_set & new_set + added = new_set - old_set + deleted = old_set - new_set + + return list(common), list(added), list(deleted) + +def get_mapped_schema_json(directory: str, schema_name: str) -> {}: + schema_json = read_json_file(os.path.join(directory, schema_name)) + schema_json_by_col_name = {column["name"]: column for column in schema_json} + return schema_json_by_col_name + +def compare_schemas(schemas=[]) -> {}: + schema_changes = {} + + for schema in schemas: + old_schema_json_by_col_name = get_mapped_schema_json(directory=OLD_SCHEMAS_DIR, schema_name=schema) + new_schema_json_by_col_name = get_mapped_schema_json(directory=NEW_SCHEMAS_DIR, schema_name=schema) + + common, added, deleted = compare_lists(old_list=old_schema_json_by_col_name.keys(), new_list=new_schema_json_by_col_name.keys()) + + type_changed = [ + (col, old_schema_json_by_col_name[col]["type"], new_schema_json_by_col_name[col]["type"]) for col in common if old_schema_json_by_col_name[col]["type"] != new_schema_json_by_col_name[col]["type"] + ] + + if added or deleted or type_changed: + schema_changes[schema] = { + "column_added": added, + "column_removed": deleted, + "type_changed": type_changed, + } + + return schema_changes + +def print_label(label: str) -> str: + return f"\n## {label}:\n" + +def print_schemas(label="", schemas=[]) -> str: + print_string = "" + if not len(schemas): + return print_string + + print_string += print_label(label) + for schema in schemas: + print_string += f"- {schema}" + return print_string + +def print_schema_changes(label="", schema_changes={}) -> str: + print_string = "" + if not schema_changes: + return print_string + + print_string += print_label(label) + + markdown_table = "| Table Name | Operation | Columns |\n" + markdown_table += "|---------------------------------|---------------|--------------------------|\n" + + for table_name, operations in schema_changes.items(): + for operation, columns in operations.items(): + if len(columns): + if operation in ["column_added", "column_removed"]: + columns_str = ", ".join(columns) + if operation in ["type_changed"]: + columns_str = ", ".join( + [f"{column[0]} ({column[1]} -> {column[2]})" for column in columns] + ) + markdown_table += ( + f"| {table_name:<33} | {operation:<15} | {columns_str:<50} |\n" + ) + print_string += markdown_table + return print_string + +def generate_changelog(schemas_added=[], schemas_deleted=[], schemas_changes={}) -> str: + new_changelog = "" + if schemas_added or schemas_deleted or schemas_changes: + current_date = datetime.now().strftime("%Y-%m-%d") + new_changelog += print_label(current_date) + + new_changelog += print_schemas(label="Tables Added", schemas=schemas_added) + new_changelog += print_schemas(label="Tables Deleted", schemas=schemas_deleted) + + sorted_schema_changes = sort_schema_changes(schemas_changes) + new_changelog += print_schema_changes(label="Schema Changes", schema_changes=sorted_schema_changes) + return new_changelog + +def main(): + existing_changelog = read_file(filepath=CHANGELOG_FILEPATH) + old_schema_filepaths = get_filepaths(directory=OLD_SCHEMAS_DIR) + new_schema_filepaths = get_filepaths(directory=NEW_SCHEMAS_DIR) + + common, added, deleted = compare_lists(old_list=old_schema_filepaths, new_list=new_schema_filepaths) + schema_changes = compare_schemas(common) + new_changelog = generate_changelog(schemas_changes=schema_changes, schemas_added=added, schemas_deleted=deleted) + + if len(new_changelog): + write_file(filepath=CHANGELOG_FILEPATH, mode="w", content=new_changelog + '\n\n') + write_file(filepath=CHANGELOG_FILEPATH, mode="a", content=existing_changelog) + +if __name__ == "__main__": + main()