From 6d3ccdb065e6c59bbb0006cf0221164292acee6f Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Wed, 24 Jan 2024 18:55:11 +0100
Subject: [PATCH 1/8] update requirements

---
 requirements.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index c200ba5..1a9c19b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
 numpy<2.0.0,>=1.19.2
-pandas<2.0.0,>=1.1.0
+pandas<3.0.0,>=1.1.0
 psutil<6.0.0,>=5.7.0
 psycopg2-binary<3.0.0,>=2.9.0
 scikit-learn<2.0.0,>=0.23.1
 SQLAlchemy<2.0.0,>=1.4.46
 fastparquet==2023.2.0
-locopy==0.5.1
+locopy==0.5.7

From 801a214b5b0ac34a1e043ccde47b7d521bcd9134 Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Wed, 24 Jan 2024 18:55:32 +0100
Subject: [PATCH 2/8] bump version to 0.16.1

---
 README.md | 2 +-
 setup.py  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index eed0f86..a186626 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ Add `--upgrade` option to update existing package to a new version
 Specify package link in your `requirements.txt`:

 ```txt
-git+https://github.com/gismart/bi-utils@0.16.0#egg=bi-utils-gismart
+git+https://github.com/gismart/bi-utils@0.16.1#egg=bi-utils-gismart
 ```

 ### Usage

diff --git a/setup.py b/setup.py
index d3a3f50..55fdecb 100644
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@

 setuptools.setup(
     name="bi-utils-gismart",
-    version="0.16.0",
+    version="0.16.1",
     author="gismart",
     author_email="info@gismart.com",
     description="Utils for BI team",

From 12b82d9ce1fd8730f5758fdbb52ab58a030fd33c Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Thu, 25 Jan 2024 09:41:46 +0100
Subject: [PATCH 3/8] fix tests

---
 tests/aws/test_db.py                            | 1 +
 tests/transformers/test_hierarchical_encoder.py | 2 +-
 tests/transformers/test_quantile_clipper.py     | 2 +-
 tests/transformers/test_target_encoder.py       | 2 +-
 4 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/aws/test_db.py b/tests/aws/test_db.py
index d0f0477..a634b04 100644
--- a/tests/aws/test_db.py
+++ b/tests/aws/test_db.py
@@ -31,6 +31,7 @@ def test_upload_download_delete(file_format):
         }
     )
     data.predict_dt = pd.to_datetime(data.predict_dt)
+    data.load_dttm = pd.to_datetime(data.load_dttm)
     db.upload_data(data, f"/tmp/data.{file_format}", schema=schema, table=table)
     query = f"""
         SELECT text, predict_dt, version, load_dttm

diff --git a/tests/transformers/test_hierarchical_encoder.py b/tests/transformers/test_hierarchical_encoder.py
index 0829831..084f0ed 100644
--- a/tests/transformers/test_hierarchical_encoder.py
+++ b/tests/transformers/test_hierarchical_encoder.py
@@ -11,7 +11,7 @@ def test_hierarchical_encoder(cols, C, data):
     data = data.dropna()
     target_data = pd.read_csv(utils.data_path("hierarchical_encoder.csv"))
-    target_data = target_data[(target_data.cols == str(cols)) & (target_data.C == C)]
+    target_data = target_data[(target_data.cols.fillna("None") == str(cols)) & (target_data.C == C)]
     clipper = transformers.HierarchicalEncoder(cols=cols, C=C)
     X = data.drop(["conversion", "conversion_predict"], axis=1)
     y = data["conversion"]

diff --git a/tests/transformers/test_quantile_clipper.py b/tests/transformers/test_quantile_clipper.py
index 8adef7c..5797a0a 100644
--- a/tests/transformers/test_quantile_clipper.py
+++ b/tests/transformers/test_quantile_clipper.py
@@ -11,7 +11,7 @@ def test_quantile_clipper(cols, q, data):
     data = data.dropna()
     target_data = pd.read_csv(utils.data_path("quantile_clipper.csv"))
-    target_data = target_data[(target_data.cols == str(cols)) & (target_data.q == q)]
+    target_data = target_data[(target_data.cols.fillna("None") == str(cols)) & (target_data.q == q)]
     clipper = transformers.QuantileClipper(cols=cols, q=q)
     X = data.drop(["conversion", "conversion_predict"], axis=1)
     y = data["conversion"]

diff --git a/tests/transformers/test_target_encoder.py b/tests/transformers/test_target_encoder.py
index ee1a194..c2b844a 100644
--- a/tests/transformers/test_target_encoder.py
+++ b/tests/transformers/test_target_encoder.py
@@ -11,7 +11,7 @@ def test_target_encoder(cols, C, data):
     data = data.dropna()
     target_data = pd.read_csv(utils.data_path("target_encoder.csv"))
-    target_data = target_data[(target_data.cols == str(cols)) & (target_data.C == C)]
+    target_data = target_data[(target_data.cols.fillna("None") == str(cols)) & (target_data.C == C)]
     clipper = transformers.TargetEncoder(cols=cols, C=C)
     X = data.drop(["conversion", "conversion_predict"], axis=1)
     y = data["conversion"]

From 220757e73047a4db7fd88157357ca9637125b8e0 Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Thu, 25 Jan 2024 10:05:18 +0100
Subject: [PATCH 4/8] fix timestamp units

---
 tests/aws/test_db.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/aws/test_db.py b/tests/aws/test_db.py
index a634b04..34931da 100644
--- a/tests/aws/test_db.py
+++ b/tests/aws/test_db.py
@@ -21,7 +21,7 @@ def test_delete_wo_conditions():
 def test_upload_download_delete(file_format):
     version = 1
     db.delete(table, schema=schema, version=version)
-    timestamp = pd.Timestamp.now()
+    timestamp = pd.Timestamp.now().as_unit("ns")
     data = pd.DataFrame(
         {
             "text": ["hello", "bye"],
@@ -45,6 +45,7 @@ def test_upload_download_delete(file_format):
         dtype={"version": "int"},
         remove_files=False,
     ).sort_values("predict_dt", ignore_index=True)
+    breakpoint()
     assert downloaded_data.equals(data)
     db.delete(table, schema=schema, version=version)
     downloaded_data = db.download_data(query, parse_dates=["predict_dt"])
@@ -60,7 +61,7 @@ def test_upload_update_download(file_format):
     new_version = 2
     db.delete(table, schema=schema, version=version)
     db.delete(table, schema=schema, version=new_version)
-    timestamp = pd.Timestamp.now()
+    timestamp = pd.Timestamp.now().as_unit("ns")
     data = pd.DataFrame(
         {
             "text": ["hello", "bye"],

From d1206135631b1405840dd5a336d75347ff9e8ca5 Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Thu, 25 Jan 2024 10:06:27 +0100
Subject: [PATCH 5/8] remove debug breakpoint

---
 tests/aws/test_db.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/aws/test_db.py b/tests/aws/test_db.py
index 34931da..d5d5c06 100644
--- a/tests/aws/test_db.py
+++ b/tests/aws/test_db.py
@@ -45,7 +45,6 @@ def test_upload_download_delete(file_format):
         dtype={"version": "int"},
         remove_files=False,
     ).sort_values("predict_dt", ignore_index=True)
-    breakpoint()
     assert downloaded_data.equals(data)
     db.delete(table, schema=schema, version=version)
     downloaded_data = db.download_data(query, parse_dates=["predict_dt"])

From b859b7ad4de4d3dd57a0299f4e476763ff3ac9da Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Thu, 25 Jan 2024 10:17:06 +0100
Subject: [PATCH 6/8] update fastparquet

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 1a9c19b..a7b1f8c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,5 +4,5 @@ psutil<6.0.0,>=5.7.0
 psycopg2-binary<3.0.0,>=2.9.0
 scikit-learn<2.0.0,>=0.23.1
 SQLAlchemy<2.0.0,>=1.4.46
-fastparquet==2023.2.0
+fastparquet>=2023.10.1
 locopy==0.5.7

From 40c74a83a8b1ae41f4c378c4b87b60f08539bb43 Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Thu, 25 Jan 2024 12:00:03 +0100
Subject: [PATCH 7/8] remove times parquet export arg

---
 bi_utils/aws/db.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bi_utils/aws/db.py b/bi_utils/aws/db.py
index d8f30c9..fb8bfa9 100644
--- a/bi_utils/aws/db.py
+++ b/bi_utils/aws/db.py
@@ -175,7 +175,7 @@ def upload_data(
             logger.warning(f"Partitions are not supported for csv files: {filename}")
         data.to_csv(file_path, index=False, sep=separator)
     elif file_path.lower().endswith(".parquet"):
-        data.to_parquet(file_path, partition_cols=partition_cols, times="int96", index=False)
+        data.to_parquet(file_path, partition_cols=partition_cols, index=False)
     else:
         raise ValueError(f"{filename} file extension is not supported")
     logger.info(f"Data is saved to {filename} ({len(data)} rows)")

From 79f93c5ed81d16752d456660caa86e4ab7b3370a Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Thu, 25 Jan 2024 13:33:36 +0100
Subject: [PATCH 8/8] use pyarrow instead of fastparquet

---
 bi_utils/aws/db.py         | 11 ++++++++---
 bi_utils/queue_exporter.py |  7 ++++++-
 requirements.txt           |  2 +-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/bi_utils/aws/db.py b/bi_utils/aws/db.py
index fb8bfa9..6fcadb0 100644
--- a/bi_utils/aws/db.py
+++ b/bi_utils/aws/db.py
@@ -6,8 +6,8 @@
 import posixpath
 import pandas as pd
 import datetime as dt
-import fastparquet as fp
 from typing import Any, Iterable, Iterator, Sequence, Optional, Union
+import pyarrow.parquet as pp

 from .. import files, sql
 from . import connection
@@ -44,7 +44,7 @@
         copy_options.append("PARQUET")
         separator = None
         if not columns:
-            columns = fp.ParquetFile(file_path).columns
+            columns = pp.ParquetFile(file_path).schema.names
     else:
         raise ValueError(f"{os.path.basename(file_path)} file extension is not supported")
     table_name = f"{schema}.{table}"
@@ -175,7 +175,12 @@ def upload_data(
             logger.warning(f"Partitions are not supported for csv files: {filename}")
         data.to_csv(file_path, index=False, sep=separator)
     elif file_path.lower().endswith(".parquet"):
-        data.to_parquet(file_path, partition_cols=partition_cols, index=False)
+        data.to_parquet(
+            file_path,
+            partition_cols=partition_cols,
+            coerce_timestamps="us",
+            index=False,
+        )
     else:
         raise ValueError(f"{filename} file extension is not supported")
     logger.info(f"Data is saved to {filename} ({len(data)} rows)")

diff --git a/bi_utils/queue_exporter.py b/bi_utils/queue_exporter.py
index ba42b9a..f4917d1 100644
--- a/bi_utils/queue_exporter.py
+++ b/bi_utils/queue_exporter.py
@@ -172,7 +172,12 @@ def _export_df(
     elif ".parquet" in file_path.lower():
         if partition_cols:
             logger.warning(f"Partitions are not supported for csv files: {filename}")
-        df.to_parquet(file_path, partition_cols=partition_cols, times="int96", index=False)
+        df.to_parquet(
+            file_path,
+            partition_cols=partition_cols,
+            coerce_timestamps="us",
+            index=False,
+        )
     else:
         df.to_pickle(file_path)
     logger.info(f"Saved df to {filename} ({len(df)} rows)")

diff --git a/requirements.txt b/requirements.txt
index a7b1f8c..674cc7f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,5 +4,5 @@ psutil<6.0.0,>=5.7.0
 psycopg2-binary<3.0.0,>=2.9.0
 scikit-learn<2.0.0,>=0.23.1
 SQLAlchemy<2.0.0,>=1.4.46
-fastparquet>=2023.10.1
+pyarrow>=15.0.0
 locopy==0.5.7
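
Two follow-up notes on the series, with small sketches.

The `fillna("None")` comparisons in patch 3 work around a CSV round trip: `pd.read_csv` loads empty cells as NaN, and NaN never compares equal to the string `"None"`, so the test cases parametrized with `cols=None` filtered the expected data down to zero rows. A minimal sketch of the failure mode, with toy data standing in for the real CSV fixtures:

```python
import pandas as pd

# pd.read_csv loads empty cells as NaN, and NaN never equals "None",
# so cases parametrized with cols=None used to match zero target rows.
target_data = pd.DataFrame({"cols": [float("nan"), "['country']"], "q": [0.5, 0.5]})
cols = None

old_mask = target_data.cols == str(cols)                 # all False when cols is None
new_mask = target_data.cols.fillna("None") == str(cols)  # NaN rows now match "None"
print(old_mask.sum(), new_mask.sum())  # 0 1
```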
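
Patches 4, 7, and 8 together change how timestamps survive the parquet round trip: pandas 2.x allows non-nanosecond timestamp units, and pyarrow's `coerce_timestamps="us"` takes over from fastparquet's `times="int96"`. A minimal sketch of the new write path, assuming pandas>=2.0 and pyarrow installed (pandas' default parquet engine when present); the file path and columns here are illustrative:

```python
import pandas as pd
import pyarrow.parquet as pp

# Under pandas 2.x, Timestamp.now() can carry a microsecond unit;
# pinning it to "ns" keeps the frame at datetime64[ns], matching the
# dtype the download path parses the column back into.
timestamp = pd.Timestamp.now().as_unit("ns")
data = pd.DataFrame({"predict_dt": [timestamp, timestamp], "version": [1, 2]})

# coerce_timestamps="us" is forwarded to pyarrow.parquet.write_table and
# stores timestamps as microsecond INT64 values in place of the
# fastparquet-only times="int96" encoding; a coercion that would lose
# precision would additionally need allow_truncated_timestamps=True.
data.to_parquet("/tmp/data.parquet", coerce_timestamps="us", index=False)

# Column names now come from the pyarrow schema rather than
# fastparquet's ParquetFile(...).columns.
print(pp.ParquetFile("/tmp/data.parquet").schema.names)
```

Microsecond INT64 timestamps keep the files readable by Redshift's parquet COPY (the `copy_options.append("PARQUET")` path above), which is presumably why the int96 encoding was dropped only once `coerce_timestamps` replaced it.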