Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix singer parsing for columns with dtype object #27

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions gluestick/etl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,15 @@ def parse_objs(x):
# if it's not a string, we just return the input
if type(x) != str:
return x

# if the string is a plain number, return it unchanged so it isn't parsed by accident
if type(x) == str:
try:
float(x)
return x
except:
# it failed so it's not a float or int, we can proceed
pass

try:
return ast.literal_eval(x)
Expand Down Expand Up @@ -550,19 +559,24 @@ def __str__(self):
def __repr__(self):
    """Return the string form of the list of stream names in ``input_files``."""
    stream_names = list(self.input_files.keys())
    return str(stream_names)

def get(
    self,
    stream,
    default=None,
    catalog_types=False,
    parse_objects=False,
    **kwargs,
):
    """Read the file registered for ``stream`` into a DataFrame.

    Args:
        stream: Key looked up in ``self.input_files``.
        default: Value returned when no file path is registered for
            ``stream``.
        catalog_types: For non-parquet files, when true and
            ``self.read_catalog()`` returns a truthy catalog, the result of
            ``self.get_types_from_catalog(catalog, stream)`` is merged into
            ``kwargs`` (overriding caller-supplied keys of the same name).
        parse_objects: When true and the loaded frame is not empty, the
            frame is passed through ``parse_object_cols`` before returning.
        **kwargs: Forwarded to ``pd.read_csv``; ignored for parquet files.

    Returns:
        The loaded DataFrame, or ``default`` when the stream has no file.
    """
    filepath = self.input_files.get(stream)
    if not filepath:
        return default

    if filepath.endswith(".parquet"):
        # Imported lazily so pyarrow is only needed when a parquet file is read.
        import pyarrow.parquet as pq

        df = pq.read_table(filepath).to_pandas(safe=False)
    else:
        catalog = self.read_catalog()
        if catalog and catalog_types:
            types_params = self.get_types_from_catalog(catalog, stream)
            kwargs.update(types_params)
        df = pd.read_csv(filepath, **kwargs)

    # Object parsing is opt-in and applies to both parquet and CSV inputs.
    if parse_objects and df is not None and not df.empty:
        df = parse_object_cols(df)
    return df

def get_metadata(self, stream):
"""Get metadata from parquet file."""
Expand Down Expand Up @@ -694,6 +708,16 @@ def localize_datetime(df, column_name):

return df[column_name]


def parse_object_cols(df):
    """Apply ``parse_objs`` to every value of each object-dtype column.

    Columns are rewritten on the frame that was passed in, and the same
    frame is returned. Parsing is best effort: if applying ``parse_objs`` to
    a column raises, that column is left untouched and processing moves on
    to the next one.

    Args:
        df: DataFrame whose ``object`` dtype columns should be parsed.

    Returns:
        The same DataFrame, with successfully parsed columns replaced.
    """
    object_columns = [col for col in df.columns if df[col].dtype == "object"]
    for col in object_columns:
        try:
            df[col] = df[col].apply(parse_objs)
        except Exception:
            # Only ordinary errors are swallowed; KeyboardInterrupt and
            # SystemExit must still propagate to the caller.
            continue
    return df

def exception(exception, root_dir, error_message=None):
"""
Stores an exception and a message into a file errors.txt,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name="gluestick",
version="2.1.21",
version="2.1.23",
description="ETL utility functions built on Pandas",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
Loading