[ETL-644] Update `drop_table_duplicates` function to use new method #113
Conversation
Excellent work!
```python
window_ordered = window_unordered.orderBy(
    col("export_end_date").desc()
)
table_no_duplicates = (
```
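For readers without the full diff, here is a minimal, self-contained sketch of the window + rank + filter pattern this excerpt is drawn from. The partition key (`record_id`) and the sample rows are assumptions for illustration, not the job's actual schema:

```python
# Minimal sketch of the window + rank + filter de-duplication pattern.
# The "record_id" partition key and the sample rows are assumptions.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.getOrCreate()
table = spark.createDataFrame(
    [("a", "2023-01-01"), ("a", "2023-02-01"), ("b", "2023-01-01")],
    ["record_id", "export_end_date"],
)

# Rank rows within each record so the most recent export is rank 1,
# then keep only the rank-1 row per record.
window_unordered = Window.partitionBy("record_id")
window_ordered = window_unordered.orderBy(col("export_end_date").desc())
table_no_duplicates = (
    table
    .withColumn("rank", row_number().over(window_ordered))
    .filter(col("rank") == 1)
    .drop("rank")
)
table_no_duplicates.show()
```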
Fun fact: this is exactly what I do to create de-duplicated tables within the synapse data warehouse: https://github.com/Sage-Bionetworks/snowflake/blob/b4755ad226abe29d30308708539c99d632dd2337/synapse_data_warehouse/synapse/tables/V1.11.0__initial_creation_of_latest_tables.sql#L3-L17
I think this way may be more common in databases, versus sorting and dropping as you would with an in-memory dataframe. Perhaps it has something to do with the distributed (across partitions) nature of the data.
🔥 LGTM!
LGTM!
As a sanity check, I compared the Fitbit Intraday Combined Parquet data produced by the main branch (which uses the older method of dropping duplicates) to this feature branch. Both were produced from the same set of JSON pilot data in the dev environment. While each contained the same number of unique records, I saw differences in which exports the records were sourced from, confirming that these two methods were dropping duplicates differently.
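One way to run that kind of comparison (the S3 paths and column names here are hypothetical, not the actual bucket layout):

```python
# Hypothetical comparison of the two branches' Parquet output; the
# paths and the "record_id"/"export_end_date" columns are assumptions.
import pandas as pd

main_df = pd.read_parquet("s3://dev-bucket/main/fitbit_intraday_combined/")
feature_df = pd.read_parquet("s3://dev-bucket/feature/fitbit_intraday_combined/")

# Same number of unique records in both...
assert main_df["record_id"].nunique() == feature_df["record_id"].nunique()

# ...but a different distribution of source exports.
print(main_df["export_end_date"].value_counts())
print(feature_df["export_end_date"].value_counts())
```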
I then read the JSON data as a pandas dataframe and dropped duplicates there. I understand and trust how duplicates are dropped in an in-memory dataframe more so than in a Spark dataframe, so this is just a way to independently verify that we are dropping duplicates correctly. I saw the same distribution of records as in the feature branch.
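A sketch of that pandas cross-check, assuming a hypothetical path, newline-delimited JSON, and a `record_id` de-duplication key:

```python
# Hypothetical pandas cross-check; the path, the "record_id" key, and
# the newline-delimited JSON format are assumptions.
import pandas as pd

df = pd.read_json("pilot_data.ndjson", lines=True)
deduped = (
    df.sort_values("export_end_date", ascending=False)
      .drop_duplicates(subset="record_id", keep="first")
)
# Distribution of source exports among the surviving records:
print(deduped["export_end_date"].value_counts())
```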
Primary changes are on L192-208, where we swap the old (and faulty) sort + drop-duplicates method of removing duplicates for the new window + sort + rank + filter method.
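For context on why the old pattern was faulty: in Spark, an `orderBy` followed by `dropDuplicates` gives no guarantee about which row per key survives, because rows are shuffled across partitions before duplicates are removed. A hypothetical reconstruction (not the exact removed code, reusing the `table` DataFrame from the sketch above):

```python
# Hypothetical reconstruction of the old, faulty pattern; column names
# are assumptions. The sort order is not preserved through the shuffle
# that dropDuplicates performs, so an arbitrary row per key may be kept.
from pyspark.sql.functions import col

deduped = (
    table
    .orderBy(col("export_end_date").desc())
    .dropDuplicates(["record_id"])
)
```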
I also did some refactoring to make things a little more intuitive:
- `drop_table_duplicates` and `drop_deleted_healthkit_data` now return Spark DataFrames rather than Glue DynamicFrames. These functions already produced DataFrames internally, so I removed the final cast to a DynamicFrame to simplify things. We cast back to a DynamicFrame on L662, since both branches of the conditional (relationalize+write, else write) take a DynamicFrame as input (see the sketch after this list).
- Rather than derive the data type separately within each function, I derive it once in `main` and pass it as an argument to the functions which reference it. I think this consolidates the number of places where a data type is derived from its table name (or from the job arguments, where the table name is derived), making things more consistent generally.
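A minimal sketch of that return-a-DataFrame pattern in a Glue job; the function signature and names here are illustrative assumptions, not the job's actual code:

```python
# Sketch only: a helper returns a Spark DataFrame, and main() casts
# back to a Glue DynamicFrame once, right before relationalize/write.
# The signature and variable names are assumptions.
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

def drop_table_duplicates(dynamic_frame, data_type):
    df = dynamic_frame.toDF()
    # ... window + rank + filter de-duplication as sketched above ...
    return df  # Spark DataFrame, not a DynamicFrame

# In main():
# df = drop_table_duplicates(dynamic_frame, data_type)
# dynamic_frame = DynamicFrame.fromDF(df, glue_context, "table_no_duplicates")
# ...then relationalize and/or write the DynamicFrame as before.
```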