Skip to content

Commit

Permalink
Merge pull request #48 from gismart/fix_exporter_partitioning
Browse files Browse the repository at this point in the history
fix partitioning in exporter, update locopy
  • Loading branch information
daniil-maltsev-gismart authored Nov 12, 2024
2 parents c51bb95 + 3b28224 commit e9b1341
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9']
python-version: ['3.9', '3.10']

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add `--upgrade` option to update existing package to a new version
Specify package link in your `requirements.txt`:

```txt
git+https://github.com/gismart/[email protected].0#egg=bi-utils-gismart
git+https://github.com/gismart/[email protected].1#egg=bi-utils-gismart
```

### Usage
Expand Down
4 changes: 2 additions & 2 deletions bi_utils/aws/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def unload_data(
database: Optional[str] = None,
host: Optional[str] = None,
max_chunk_size_mb: int = 6000,
partition_by: list[str] | None = None,
partition_by: Optional[list[str]] = None,
) -> Sequence[str]:
"""Unload data from RedShift to S3 into csv or parquet files up to `max_chunk_size_mb`"""
unload_options = _get_unload_options(
Expand Down Expand Up @@ -405,7 +405,7 @@ def _get_unload_options(
file_format: str = "csv",
delete_s3_before: bool = False,
max_chunk_size_mb: int = 6000,
partition_by: list[str] | None = None,
partition_by: Optional[list[str]] = None,
) -> list[str]:
max_chunk_size_opt = f"MAXFILESIZE {max_chunk_size_mb} MB"
if file_format.lower() == "csv":
Expand Down
2 changes: 1 addition & 1 deletion bi_utils/queue_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ def _export_df(
if s3_bucket or s3_bucket_dir or not delete_file_after:
if ".csv" in file_path.lower():
df.to_csv(file_path, index=False)
elif ".parquet" in file_path.lower():
if partition_cols:
logger.warning(f"Partitions are not supported for csv files: {filename}")
elif ".parquet" in file_path.lower() or partition_cols:
df.to_parquet(
file_path,
partition_cols=partition_cols,
Expand Down
20 changes: 16 additions & 4 deletions bi_utils/transformers/quantile_clipper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@
class QuantileClipper(BaseEstimator, TransformerMixin):
"""Clip grouped features by certain quantile"""

def __init__(self, *, cols: Optional[Sequence[str]] = None, q: float = 0.001) -> None:
def __init__(
self,
*,
cols: Optional[Sequence[str]] = None,
q: float = 0.001,
interpolation: str = "linear",
) -> None:
self.uq = 1 - q
self.lq = q
self.cols = cols
self.q = q
self.quantile_params = {
"interpolation": interpolation,
"numeric_only": True,
}

def _check_params(self, X: pd.DataFrame) -> None:
if not 0 < self.lq <= 0.5:
Expand All @@ -39,9 +49,11 @@ def fit(self, X: pd.DataFrame, y: Union[pd.Series, np.ndarray]) -> QuantileClipp
self._check_params(X)

X["target"] = y
groups = X.groupby(self.cols)
self.groups_l_ = groups["target"].quantile(self.lq).fillna(y.min()).rename("target_l")
self.groups_u_ = groups["target"].quantile(self.uq).fillna(y.max()).rename("target_u")
groups = X.groupby(self.cols, observed=True)
self.groups_u_ = groups["target"].quantile(self.uq, **self.quantile_params)
self.groups_u_ = self.groups_u_.fillna(y.max()).rename("target_u")
self.groups_l_ = groups["target"].quantile(self.lq, **self.quantile_params)
self.groups_l_ = self.groups_l_.fillna(y.min()).rename("target_l")
self.n_groups_ = len(groups)
return self

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ psycopg2-binary<3.0.0,>=2.9.0
scikit-learn<2.0.0,>=0.23.1
SQLAlchemy<2.0.0,>=1.4.46
pyarrow>=15.0.0
locopy==0.5.8
locopy==0.5.9
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@

setuptools.setup(
name="bi-utils-gismart",
version="0.17.0",
version="0.17.1",
author="gismart",
author_email="[email protected]",
description="Utils for BI team",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/gismart/bi-utils",
packages=setuptools.find_packages(),
python_requires=">=3.8",
python_requires=">=3.9",
install_requires=install_requires,
classifiers=[
"Programming Language :: Python :: 3",
Expand Down
37 changes: 27 additions & 10 deletions tests/transformers/test_quantile_clipper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,34 @@
from bi_utils import transformers
from .. import utils

df = pd.DataFrame({
"conversion": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.6, 0.6, 0.6, 0.6],
"media_source": ["google", "google", "google", "google", "google", "fb", "fb", "fb", "fb", "fb"],
"campaign_id": ["1", "1", "1", "1", "1", "1", "1", "1", "1", "1"],
})

@pytest.mark.parametrize("cols", [["media_source", "campaign_id"], ["media_source"], None])
@pytest.mark.parametrize("q", [0.01, 0.2, 0.5])
def test_quantile_clipper(cols, q, data):
data = data.dropna()
target_data = pd.read_csv(utils.data_path("quantile_clipper.csv"))
target_data = target_data[(target_data.cols.fillna("None") == str(cols)) & (target_data.q == q)]
@pytest.mark.parametrize(
"cols, q, expected",
[
(["media_source"], 0.5, [0.3] * 5 + [0.6] * 5,), # 0.3 is the 0.5 quantile of google, 0.6 is the 0.5 quantile of fb
(["media_source", "campaign_id"], 0.5, [0.3] * 5 + [0.6] * 5,), # Same as above, because campaign_id is uniform
(None, 0.5, [0.3] * 5 + [0.6] * 5,), # Same as above, because both columns are used if cols is None
(["campaign_id"], 0.5, [0.55] * 10,), # 0.55 is the 0.5 quantile of the whole dataset
# In an array with n=10 values, the position of the 0.2 quantile is calculated as
# (n - 1) * 0.2 = 1.8
# Since the quantile falls between two values (0.2 and 0.3 in this case),
# numpy linearly interpolates between these values:
# 0.2 + 0.8 * (0.3 - 0.2) = 0.28
# The upper quantile is 0.6 since the upper part of the array is filled with 0.6
# Values between 0.28 and 0.6 remain unchanged, so:
(["campaign_id"], 0.2, [0.28, 0.28, 0.3, 0.4, 0.5] + [0.6] * 5,),
],
)
def test_quantile_clipper(cols, q, expected, *kwargs):
clipper = transformers.QuantileClipper(cols=cols, q=q)
X = data.drop(["conversion", "conversion_predict"], axis=1)
y = data["conversion"]
X = df.drop(["conversion"], axis=1)
y = df["conversion"]
clipper.fit(X, y)
result = clipper.transform(X, y)
expected_result = target_data["conversion"].values
assert np.allclose(result, expected_result)
expected_result = np.array(expected)
assert np.allclose(result, expected_result, atol=1e-8, rtol=1e-5)

0 comments on commit e9b1341

Please sign in to comment.