From 57861488c86d3679d704ddbdfb023a10dcf77be3 Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Thu, 12 Dec 2024 11:45:24 +0100 Subject: [PATCH 1/9] BUGFIX-144-BOX-CSVREADER Fix bug on csvreader for box integration. --- src/koheesio/integrations/box.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/koheesio/integrations/box.py b/src/koheesio/integrations/box.py index 149d9898..47f5b96b 100644 --- a/src/koheesio/integrations/box.py +++ b/src/koheesio/integrations/box.py @@ -419,7 +419,7 @@ def execute(self) -> BoxReaderBase.Output: data = file.content().decode(self.file_encoding) data_buffer = StringIO(data) - temp_df_pandas = pd.read_csv(data_buffer, header=0, dtype=str if not self.schema_ else None, **self.params) # type: ignore + temp_df_pandas = pd.read_csv(data_buffer, header=0, dtype=self._schema if self._schema is not None else str, **self.params) # type: ignore temp_df = self.spark.createDataFrame(temp_df_pandas, schema=self.schema_) # type: ignore From d8d83b72e920195c3639b6ecc0075fde7635e335 Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Thu, 12 Dec 2024 16:39:15 +0100 Subject: [PATCH 2/9] Update box.py --- src/koheesio/integrations/box.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/koheesio/integrations/box.py b/src/koheesio/integrations/box.py index 47f5b96b..43285831 100644 --- a/src/koheesio/integrations/box.py +++ b/src/koheesio/integrations/box.py @@ -419,7 +419,7 @@ def execute(self) -> BoxReaderBase.Output: data = file.content().decode(self.file_encoding) data_buffer = StringIO(data) - temp_df_pandas = pd.read_csv(data_buffer, header=0, dtype=self._schema if self._schema is not None else str, **self.params) # type: ignore + temp_df_pandas = pd.read_csv(data_buffer, header=0, dtype=self.schema_ if self.schema_ is not None else str, **self.params) # type: ignore temp_df = 
self.spark.createDataFrame(temp_df_pandas, schema=self.schema_) # type: ignore From 016763db73b8212400593c99e95f7a4d55c7f9fe Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Thu, 12 Dec 2024 16:50:21 +0100 Subject: [PATCH 3/9] Remove dtype Proposing to restore pandas' automatic dtype inference. The user can then specify their own dtype via the params section. --- src/koheesio/integrations/box.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/koheesio/integrations/box.py b/src/koheesio/integrations/box.py index 43285831..145e2c5a 100644 --- a/src/koheesio/integrations/box.py +++ b/src/koheesio/integrations/box.py @@ -419,7 +419,7 @@ def execute(self) -> BoxReaderBase.Output: data = file.content().decode(self.file_encoding) data_buffer = StringIO(data) - temp_df_pandas = pd.read_csv(data_buffer, header=0, dtype=self.schema_ if self.schema_ is not None else str, **self.params) # type: ignore + temp_df_pandas = pd.read_csv(data_buffer, header=0, **self.params) # type: ignore temp_df = self.spark.createDataFrame(temp_df_pandas, schema=self.schema_) # type: ignore From 6b604f2624b5566bc07c272ad41e52bdbbb1a90b Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Thu, 12 Dec 2024 17:50:19 +0100 Subject: [PATCH 4/9] Fix assert test_execute_wo_schema --- tests/spark/integrations/box/test_box.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/spark/integrations/box/test_box.py b/tests/spark/integrations/box/test_box.py index 42dfd7d7..aa2c879a 100644 --- a/tests/spark/integrations/box/test_box.py +++ b/tests/spark/integrations/box/test_box.py @@ -245,7 +245,7 @@ def test_execute_wo_schema(self, spark, dummy_box): assert bcr.df.count() == 1 assert bcr.df.dtypes == [ ("foo", "string"), - ("bar", "string"), + ("bar", "int"), ("meta_file_id", "string"), ("meta_file_name", "string"), ("meta_load_timestamp", "timestamp"), From 
2947ef438f2d35a1e7ea63b9bfce244ea351729a Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Thu, 12 Dec 2024 18:02:57 +0100 Subject: [PATCH 5/9] int to bigint --- tests/spark/integrations/box/test_box.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/spark/integrations/box/test_box.py b/tests/spark/integrations/box/test_box.py index aa2c879a..46b57116 100644 --- a/tests/spark/integrations/box/test_box.py +++ b/tests/spark/integrations/box/test_box.py @@ -262,7 +262,7 @@ def test_execute_w_files(self, spark, dummy_box): assert bcr.df.count() == 2 assert bcr.df.dtypes == [ ("foo", "string"), - ("bar", "int"), + ("bar", "bigint"), ("meta_file_id", "string"), ("meta_file_name", "string"), ("meta_load_timestamp", "timestamp"), From 3c1854603cbe79cd88270166016d20430a4302e7 Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:20:40 +0100 Subject: [PATCH 6/9] Update box.py --- src/koheesio/integrations/box.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/koheesio/integrations/box.py b/src/koheesio/integrations/box.py index 145e2c5a..9d24fc48 100644 --- a/src/koheesio/integrations/box.py +++ b/src/koheesio/integrations/box.py @@ -35,6 +35,7 @@ model_validator, ) from koheesio.spark.readers import Reader +from koheesio.spark.readers.memory import InMemoryReader from koheesio.utils import utc_now @@ -418,10 +419,8 @@ def execute(self) -> BoxReaderBase.Output: file = self.client.file(file_id=f) data = file.content().decode(self.file_encoding) - data_buffer = StringIO(data) - temp_df_pandas = pd.read_csv(data_buffer, header=0, **self.params) # type: ignore - temp_df = self.spark.createDataFrame(temp_df_pandas, schema=self.schema_) - + temp_df = InMemoryReader(data=data, schema=self.schema_, params=self.params, format="csv").read() + # type: ignore # noinspection PyUnresolvedReferences temp_df = ( From 
810f1ab0d51bd1f65b22369d03e93c910f382530 Mon Sep 17 00:00:00 2001 From: louis-paulvlx <90868690+louis-paulvlx@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:27:25 +0100 Subject: [PATCH 7/9] Update box.py --- src/koheesio/integrations/box.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/koheesio/integrations/box.py b/src/koheesio/integrations/box.py index 9d24fc48..30a804e5 100644 --- a/src/koheesio/integrations/box.py +++ b/src/koheesio/integrations/box.py @@ -35,7 +35,7 @@ model_validator, ) from koheesio.spark.readers import Reader -from koheesio.spark.readers.memory import InMemoryReader +from koheesio.spark.readers.memory import InMemoryDataReader from koheesio.utils import utc_now @@ -419,7 +419,7 @@ def execute(self) -> BoxReaderBase.Output: file = self.client.file(file_id=f) data = file.content().decode(self.file_encoding) - temp_df = InMemoryReader(data=data, schema=self.schema_, params=self.params, format="csv").read() + temp_df = InMemoryDataReader(data=data, schema=self.schema_, params=self.params, format="csv").read() # type: ignore # noinspection PyUnresolvedReferences From 6a85ffbbade84a51af6e0131ff43aba9d34cb321 Mon Sep 17 00:00:00 2001 From: Danny Meijer <10511979+dannymeijer@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:46:39 +0100 Subject: [PATCH 8/9] Change field type from integer to bigint in test_box.py --- tests/spark/integrations/box/test_box.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/spark/integrations/box/test_box.py b/tests/spark/integrations/box/test_box.py index 46b57116..27f1068e 100644 --- a/tests/spark/integrations/box/test_box.py +++ b/tests/spark/integrations/box/test_box.py @@ -39,7 +39,7 @@ { "fields": [ {"metadata": {}, "name": "foo", "nullable": True, "type": "string"}, - {"metadata": {}, "name": "bar", "nullable": True, "type": "integer"}, + {"metadata": {}, "name": "bar", "nullable": True, "type": "bigint"}, ], "type": "struct", } From 
e6b8a84f2217ab5614c0b521d5f2a1576373ce3c Mon Sep 17 00:00:00 2001 From: Danny Meijer <10511979+dannymeijer@users.noreply.github.com> Date: Tue, 17 Dec 2024 12:58:56 +0100 Subject: [PATCH 9/9] Change field type of 'bar' from bigint to integer in test_box.py --- tests/spark/integrations/box/test_box.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/spark/integrations/box/test_box.py b/tests/spark/integrations/box/test_box.py index 27f1068e..e366a0d5 100644 --- a/tests/spark/integrations/box/test_box.py +++ b/tests/spark/integrations/box/test_box.py @@ -39,7 +39,7 @@ { "fields": [ {"metadata": {}, "name": "foo", "nullable": True, "type": "string"}, - {"metadata": {}, "name": "bar", "nullable": True, "type": "bigint"}, + {"metadata": {}, "name": "bar", "nullable": True, "type": "integer"}, ], "type": "struct", } @@ -245,7 +245,7 @@ def test_execute_wo_schema(self, spark, dummy_box): assert bcr.df.count() == 1 assert bcr.df.dtypes == [ ("foo", "string"), - ("bar", "int"), + ("bar", "bigint"), ("meta_file_id", "string"), ("meta_file_name", "string"), ("meta_load_timestamp", "timestamp"), @@ -262,7 +262,7 @@ def test_execute_w_files(self, spark, dummy_box): assert bcr.df.count() == 2 assert bcr.df.dtypes == [ ("foo", "string"), - ("bar", "bigint"), + ("bar", "int"), ("meta_file_id", "string"), ("meta_file_name", "string"), ("meta_load_timestamp", "timestamp"),