Add workflow to calculate source data schema diff #528

Merged 31 commits on Nov 6, 2024.

Commits:
- 9d5d5d7 Add workflow to calculate source data schema diff (amishas157, Oct 30, 2024)
- ee6b61a Fix branch name (amishas157, Oct 30, 2024)
- 79121d1 Try cloning repo (amishas157, Oct 30, 2024)
- b1d124b typo (amishas157, Oct 30, 2024)
- c3c1ab8 write regardless (amishas157, Oct 30, 2024)
- 0021160 append (amishas157, Oct 30, 2024)
- 60aeb0a update script to also display date (amishas157, Oct 30, 2024)
- 66767d1 Reformat the md file (amishas157, Oct 30, 2024)
- 9053a28 More formatting and testng edge cases (amishas157, Oct 30, 2024)
- f099229 update script (amishas157, Oct 30, 2024)
- 77a1e1c revert back all schema changes (amishas157, Oct 30, 2024)
- 262c108 field addition and deletion (amishas157, Oct 30, 2024)
- a53552f lint (amishas157, Oct 30, 2024)
- 21b7229 cleanup (amishas157, Oct 30, 2024)
- 446e43e Modularize python script (amishas157, Nov 4, 2024)
- 89aaf08 update script with black (amishas157, Nov 4, 2024)
- 5236dbf lint (amishas157, Nov 4, 2024)
- 67d1d62 test changelog build (amishas157, Nov 4, 2024)
- 100f039 push to branch (amishas157, Nov 4, 2024)
- d4fbe56 push to branch (amishas157, Nov 4, 2024)
- 6e95960 Update changelog for Source data (actions-user, Nov 4, 2024)
- a96ae3c revert schema changes (amishas157, Nov 4, 2024)
- ca0961d revert schema changes (amishas157, Nov 4, 2024)
- 23ac7ed Update .github/workflows/update_dbt_marts_schema_changelog.yml (amishas157, Nov 4, 2024)
- c0027f6 Add docstring (amishas157, Nov 4, 2024)
- 1b2b332 Minor refactor (amishas157, Nov 4, 2024)
- 4ae7e61 Update scripts/update_source_data_schema_changelog.py (amishas157, Nov 6, 2024)
- 443de32 Update scripts/update_source_data_schema_changelog.py (amishas157, Nov 6, 2024)
- e2b1f53 Update scripts/update_source_data_schema_changelog.py (amishas157, Nov 6, 2024)
- d891230 Update scripts/update_source_data_schema_changelog.py (amishas157, Nov 6, 2024)
- 0d59fd5 clear contents of changelog (amishas157, Nov 6, 2024)
48 changes: 48 additions & 0 deletions .github/workflows/update_source_data_schema_changelog.yml
@@ -0,0 +1,48 @@
name: Update changelog for Source Data

on:
  push:
Comment on lines +3 to +4
amishas157 (Contributor, Author): If we trigger on pull_request, git push was not working because the job lost the branch context; even providing the branch name explicitly did not help.

Reviewer (Contributor): Oh interesting. I think leaving this as push is fine. I can't think of any issues, other than it being odd that the workflow reruns on every push before a PR is merged.


permissions:
  contents: write

concurrency:
  group: ${{ github.workflow }}-${{ github.ref_protected == 'true' && github.sha || github.ref }}-${{ github.event_name }}
  cancel-in-progress: true

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: 3.12

      - name: Run Bash Script
        run: |
          set -x
          cd $GITHUB_WORKSPACE
          OLD_SCHEMAS_DIR=original_schemas
          git clone --branch master https://github.com/stellar/stellar-etl-airflow.git repo_master_copy
          mkdir -p $OLD_SCHEMAS_DIR
          cp -r repo_master_copy/schemas/. $OLD_SCHEMAS_DIR/
          export OLD_SCHEMAS_DIR
          python3 scripts/update_source_data_schema_changelog.py
          rm -rf $OLD_SCHEMAS_DIR
          rm -rf repo_master_copy

      - name: Commit and Push Changes
        run: |
          git config --local user.email "[email protected]"
          git config --local user.name "GitHub Action"
          git add changelog/source_data.md
          if git commit -m "Update changelog for Source data"; then
            echo "Changes committed."
            git push
          else
            echo "No changes to commit."
          fi
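The coupling between the bash step and the Python script is a single environment variable: the step exports OLD_SCHEMAS_DIR and the script reads it via os.environ. A minimal sketch of that handoff (the inline child script here is a stand-in for scripts/update_source_data_schema_changelog.py):

```python
import os
import subprocess
import sys
import textwrap

# Stand-in for the real script: it only echoes the env var it received.
child = textwrap.dedent(
    """
    import os
    print(os.environ.get("OLD_SCHEMAS_DIR"))
    """
)

# Mirror the workflow: export OLD_SCHEMAS_DIR, then invoke the script.
env = {**os.environ, "OLD_SCHEMAS_DIR": "original_schemas"}
out = subprocess.run(
    [sys.executable, "-c", child], env=env, capture_output=True, text=True
)
print(out.stdout.strip())  # original_schemas
```

Because the export happens inside the `run: |` block, the variable only exists for commands in that step, which is why the script reads it at import time rather than taking a CLI argument.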
23 changes: 23 additions & 0 deletions changelog/source_data.md
@@ -0,0 +1,23 @@
## 2024-11-04:
amishas157 (Contributor, Author): File to be deleted before merging.

amishas157 (Contributor, Author): Done. Deleted contents.


## Tables Added:
- account_signers
## Tables Deleted:
- account_signers_schema1.json
## Schema Changes:
| Table Name | Operation | Columns |
|---------------------------------|---------------|--------------------------|
| - dimOffers | column_added | horizon_offer_id |



## 2024-11-04:

## Tables Added:
- account_signers
## Tables Deleted:
- account_signers_schema1.json
## Schema Changes:
| Table Name | Operation | Columns |
|---------------------------------|---------------|--------------------------|
| - dimOffers | column_added | horizon_offer_id |
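The script keeps the newest dated section at the top by reading the existing changelog, truncating the file with the new section, then appending the old contents back. A minimal sketch of that prepend pattern, using a temporary file rather than changelog/source_data.md:

```python
import os
import tempfile

# Temporary stand-in for changelog/source_data.md with one older entry.
path = os.path.join(tempfile.mkdtemp(), "source_data.md")
with open(path, "w") as f:
    f.write("## 2024-11-04:\nolder entry\n")

# Read existing contents first, so they survive the truncating write below.
with open(path, "r") as f:
    existing = f.read()

new_section = "## 2024-11-06:\nnewer entry\n\n"
with open(path, "w") as f:  # mode "w" truncates: newest section goes first
    f.write(new_section)
with open(path, "a") as f:  # mode "a" appends the previous changelog after it
    f.write(existing)

with open(path) as f:
    print(f.read().splitlines()[0])  # ## 2024-11-06:
```

This read-then-rewrite approach is simple but not atomic; a crash between the two writes would lose the old contents, which is acceptable here since the file lives in git.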
198 changes: 198 additions & 0 deletions scripts/update_source_data_schema_changelog.py
@@ -0,0 +1,198 @@
import json
import os
import sys
from datetime import datetime

OLD_SCHEMAS_DIR = os.environ.get("OLD_SCHEMAS_DIR")
NEW_SCHEMAS_DIR = "schemas"
CHANGELOG_FILEPATH = "changelog/source_data.md"


def read_json_file(filepath: str) -> dict:
    with open(filepath, "r") as rfp:
        try:
            return json.load(rfp)
        except json.JSONDecodeError:
            sys.exit(f"Not a valid JSON at filepath {filepath}.")


def read_file(filepath: str) -> str:
    with open(filepath, "r") as rfp:
        return rfp.read()


def write_file(filepath: str, content: str, mode="a") -> None:
    with open(filepath, mode) as wfp:
        wfp.write(content)


def sort_schema_changes(schema_changes: dict) -> dict:
    sorted_data = {}

    for table_name, op_types in sorted(schema_changes.items()):
        sorted_operations = {
            op_type: sorted(columns) for op_type, columns in sorted(op_types.items())
        }
        sorted_data[table_name] = sorted_operations
    return sorted_data


def list_files_in_dir(directory: str) -> list:
    if not os.path.exists(directory):
        sys.exit(f"Directory {directory} does not exist.")
    return os.listdir(directory)


def compare_lists(old_list=(), new_list=()):
    old_set = set(old_list)
    new_set = set(new_list)

    common = old_set & new_set
    added = new_set - old_set
    deleted = old_set - new_set

    return list(common), list(added), list(deleted)


def get_mapped_schema_json(directory: str, schema_name: str) -> dict:
    """
    Returns schema object indexed by field name.
    Example:
        {
            "field_name_1": {
                "name": "field_name_1",
                "type": "STRING"
            },
            "field_name_2": {
                "name": "field_name_2",
                "type": "STRING"
            },
        }
    """
    schema_json = read_json_file(os.path.join(directory, schema_name))
    schema_json_by_col_name = {column["name"]: column for column in schema_json}
    return schema_json_by_col_name


def compare_schemas(schemas=()) -> dict:
    schema_changes = {}

    for schema in schemas:
        old_schema_json_by_col_name = get_mapped_schema_json(
            directory=OLD_SCHEMAS_DIR, schema_name=schema
        )
        new_schema_json_by_col_name = get_mapped_schema_json(
            directory=NEW_SCHEMAS_DIR, schema_name=schema
        )

        common, added, deleted = compare_lists(
            old_list=old_schema_json_by_col_name.keys(),
            new_list=new_schema_json_by_col_name.keys(),
        )

        type_changed = [
            (
                col,
                old_schema_json_by_col_name[col]["type"],
                new_schema_json_by_col_name[col]["type"],
            )
            for col in common
            if old_schema_json_by_col_name[col]["type"]
            != new_schema_json_by_col_name[col]["type"]
        ]

        if added or deleted or type_changed:
            schema_changes[schema] = {
                "column_added": added,
                "column_removed": deleted,
                "type_changed": type_changed,
            }

    return schema_changes


def get_print_label_string(label: str) -> str:
    return f"\n## {label}:\n" if label else label


def get_print_schemas_string(label="", schemas=()) -> str:
    print_string = ""
    if not schemas:
        return print_string

    print_string += get_print_label_string(label)
    print_string += "\n".join(
        f"- {schema.replace('_schema.json', '')}" for schema in schemas
    )
    return print_string


def get_print_schema_changes_string(label="", schema_changes=None) -> str:
    print_string = ""
    if not schema_changes:
        return print_string

    print_string += get_print_label_string(label)

    markdown_table = "| Table Name | Operation | Columns |\n"
    markdown_table += "|---------------------------------|---------------|--------------------------|\n"

    for schema_name, operations in schema_changes.items():
        for operation, columns in operations.items():
            if not columns:
                continue
            if operation == "type_changed":
                columns_str = ", ".join(
                    f"{column[0]} ({column[1]} -> {column[2]})" for column in columns
                )
            else:  # column_added / column_removed
                columns_str = ", ".join(columns)
            markdown_table += f"| {get_print_schemas_string(schemas=[schema_name]):<33} | {operation:<15} | {columns_str:<50} |\n"
    print_string += markdown_table
    return print_string


def generate_changelog(
    schemas_added=(), schemas_deleted=(), schemas_changes=None
) -> str:
    new_changelog = ""
    if schemas_added or schemas_deleted or schemas_changes:
        current_date = datetime.now().strftime("%Y-%m-%d")
        new_changelog += get_print_label_string(current_date)

        new_changelog += get_print_schemas_string(
            label="Tables Added", schemas=schemas_added
        )
        new_changelog += get_print_schemas_string(
            label="Tables Deleted", schemas=schemas_deleted
        )

        sorted_schema_changes = sort_schema_changes(schemas_changes or {})
        new_changelog += get_print_schema_changes_string(
            label="Schema Changes", schema_changes=sorted_schema_changes
        )
    return new_changelog


def main():
    existing_changelog = read_file(filepath=CHANGELOG_FILEPATH)
    old_schema_filepaths = list_files_in_dir(directory=OLD_SCHEMAS_DIR)
    new_schema_filepaths = list_files_in_dir(directory=NEW_SCHEMAS_DIR)

    common, added, deleted = compare_lists(
        old_list=old_schema_filepaths, new_list=new_schema_filepaths
    )
    schema_changes = compare_schemas(common)
    new_changelog = generate_changelog(
        schemas_changes=schema_changes, schemas_added=added, schemas_deleted=deleted
    )

    if new_changelog:
        # TODO: Append to the same date section if multiple changelogs are committed on the same day
        write_file(
            filepath=CHANGELOG_FILEPATH, mode="w", content=new_changelog + "\n\n"
        )
        write_file(filepath=CHANGELOG_FILEPATH, mode="a", content=existing_changelog)


if __name__ == "__main__":
    main()
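The core of the script is plain set arithmetic over column names plus a type comparison on the intersection. It can be exercised standalone on toy in-memory schemas (the field names below are hypothetical, not from the repo's schemas):

```python
# Toy schemas in the same shape get_mapped_schema_json produces:
# {column_name: {"name": ..., "type": ...}}.
old = {
    "id": {"name": "id", "type": "STRING"},
    "balance": {"name": "balance", "type": "STRING"},
}
new = {
    "id": {"name": "id", "type": "INT64"},
    "horizon_offer_id": {"name": "horizon_offer_id", "type": "STRING"},
}

# Set arithmetic over column names, as in compare_lists.
common = old.keys() & new.keys()
added = sorted(new.keys() - old.keys())
removed = sorted(old.keys() - new.keys())

# Type comparison over the intersection, as in compare_schemas.
type_changed = [
    (col, old[col]["type"], new[col]["type"])
    for col in sorted(common)
    if old[col]["type"] != new[col]["type"]
]

print(added)         # ['horizon_offer_id']
print(removed)       # ['balance']
print(type_changed)  # [('id', 'STRING', 'INT64')]
```

Note that a renamed column cannot be distinguished from a delete plus an add with this approach, which is why the changelog reports the two operations separately.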