Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enforce types when exploding columns #51

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion gluestick/pandas_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
import pandas as pd
from pandas.io.json._normalize import nested_to_record
from gluestick.reader import Reader


def json_tuple_to_cols(
Expand Down Expand Up @@ -110,8 +111,67 @@ def rename(df, target_columns):
return df[target_column_names].rename(columns=target_columns)
return df

def enforce_exploded_col_types(df, column_name, stream=None):
"""Enforce types for columns created by exploded fields for better consistency.

def explode_json_to_rows(df, column_name, drop=True, **kwargs):
Notes
-----
Enforce types for columns created by exploded fields using catalog if defined there
or enforce nullable booleans for consistency

Parameters
----------
df: pd.DataFrame
The input pandas data frame.
column_name: str
The name of the column that should be exploded.
stream: str
Stream name to enforce types using catalog typing
"""

# enforce types for booleans and integers
field_schema = None
exploded_columns = [col for col in df.columns if col.startswith(f"{column_name}.")]

if stream:
input = Reader()
catalog = input.read_catalog()
stream_schema = [s for s in catalog["streams"] if s["tap_stream_id"] == stream]
if stream_schema:
field_schema = stream_schema[0].get("schema", {}).get("properties", {}).get(column_name)

if field_schema and "properties" in field_schema:
for col in exploded_columns:
col_name = col.split(".")[-1]
col_type = field_schema.get("properties").get(col_name, {}).get("type")
if isinstance(col_type, list) and col_type:
col_type = next(iter([t for t in col_type if t != "null"]), None)
if col_type:
if col_type in ["bool", "boolean"]:
df[col] = df[col].astype("boolean")
elif col_type in ["int", "integer"]:
df[col] = df[col].astype("Int64")

else:
for col in exploded_columns:
# if all column values are false let pandas infere type
if df[col].dropna().empty:
continue

first_non_null_value = df[col].dropna().iloc[0]
hsyyid marked this conversation as resolved.
Show resolved Hide resolved
if type(first_non_null_value) in [list, dict, str]:
continue
# if all not null fields are bool type column as boolean
are_all_boolean = df[col].dropna().apply(lambda x: isinstance(x, bool)).all()
if are_all_boolean:
df[col] = df[col].astype("boolean")
continue
# Enforcing only boolean types if "field_schema" is not present,
# as pandas automatically converts integers with NaN values (e.g., 2 to 2.0),
return df


def explode_json_to_rows(df, column_name, drop=True, stream=None, **kwargs):
"""Explodes a column with an array of objects into multiple rows.

Notes
Expand All @@ -128,6 +188,8 @@ def explode_json_to_rows(df, column_name, drop=True, **kwargs):
The name of the column that should be exploded.
drop: boolean
To drop or not the exploded column.
stream: str
Stream name to enforce types using catalog typing
**kwargs:
Additional arguments.

Expand Down Expand Up @@ -227,6 +289,8 @@ def flatten(y):
if drop:
df.drop(column_name, axis=1, inplace=True)

# enforce types
df = enforce_exploded_col_types(df, column_name, stream)
return df


Expand Down
Loading