From 55b24d342efbca69d9cafccd5436a19b4b0ca5f6 Mon Sep 17 00:00:00 2001 From: Danny Meijer <10511979+dannymeijer@users.noreply.github.com> Date: Mon, 2 Dec 2024 10:20:33 +0100 Subject: [PATCH] work in progress --- .../spark/transformations/download_files.py | 23 ++ src/koheesio/steps/download_file.py | 42 ++-- tests/steps/test_download_file.py | 236 ++++++++---------- 3 files changed, 146 insertions(+), 155 deletions(-) create mode 100644 src/koheesio/spark/transformations/download_files.py diff --git a/src/koheesio/spark/transformations/download_files.py b/src/koheesio/spark/transformations/download_files.py new file mode 100644 index 0000000..fe3945a --- /dev/null +++ b/src/koheesio/spark/transformations/download_files.py @@ -0,0 +1,23 @@ +from pydantic import DirectoryPath + +from koheesio.spark.transformations import ColumnsTransformationWithTarget + + +class DownloadFileFromUrlTransformation(ColumnsTransformationWithTarget): + """ + Downloads content from URLs in the specified column and stores the downloaded file paths in a new column. + + Parameters + ---------- + columns : ListOfColumns + The column (or list of columns) containing the URLs to download. + target_column : Optional[str], optional, default=None + The name of the column to store the downloaded file paths. If not provided, the result will be stored in the source column. + download_path : DirectoryPath + The local directory path where the files will be downloaded. + """ + + download_path: DirectoryPath + + def func(self, column): + pass diff --git a/src/koheesio/steps/download_file.py b/src/koheesio/steps/download_file.py index c559252..d4cb01b 100644 --- a/src/koheesio/steps/download_file.py +++ b/src/koheesio/steps/download_file.py @@ -61,6 +61,10 @@ class DownloadFileStep(HttpGetStep): """ Downloads a file from the given URL and saves it to the specified download path. + Examples + -------- + # TODO: add examples + Parameters ---------- url : str @@ -93,29 +97,14 @@ class Output(HttpGetStep.Output): def handle_file_write_modes(self, _filepath: Path, _filename: str) -> Optional[str]: """Handle different write modes for the file and return the appropriate write mode.""" mode = FileWriteMode.from_string(self.mode) # Convert string to FileWriteMode - write_mode = mode.write_mode # Determine the write mode + write_mode = str(mode.write_mode) # Determine the write mode # FIXME: logging is not working in the unit tests - import logging - - local_logger = logging.getLogger("koheesio.DownloadFileStep") - local_logger.setLevel(logging.DEBUG) - handler = logging.StreamHandler() - handler.setLevel(logging.DEBUG) - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - handler.setFormatter(formatter) - local_logger.addHandler(handler) - # OVERWRITE and APPEND modes will write the file irrespective of whether it exists or not if _filepath.exists() and mode not in {FileWriteMode.OVERWRITE, FileWriteMode.APPEND}: if mode == FileWriteMode.IGNORE: # If the file exists in IGNORE mode, return without writing - print(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.") self.log.info(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.") - local_logger.info(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.") - self.log.error(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.") - self.log.debug(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.") - self.log.warning(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.") self.output.download_file_path = _filepath return None @@ -136,6 +125,9 @@ def handle_file_write_modes(self, _filepath: Path, _filename: str) -> Optional[s return write_mode def execute(self) -> Output: + """ + Executes the file download process, handling different write modes and saving the file to the specified path. + """ _filename = Path(self.url).name _filepath = self.download_path / _filename @@ -147,14 +139,10 @@ def execute(self) -> Output: self.output.download_file_path = _filepath self.output.download_file_path.touch(exist_ok=True) - # download the file - with self.request() as response: - # response = self.output.response_raw - - # write the downloaded content to the file - with self.output.download_file_path.open(mode=mode) as f: - for chunk in response.iter_content(chunk_size=self.chunk_size): - self.log.debug(f"Downloading chunk of size {len(chunk)}") - self.log.debug(f"Writing to file {self.output.download_file_path}") - self.log.debug(f"Downloaded {f.tell()} bytes") - f.write(chunk) + # download the file content and write the downloaded content to the file + with self.request() as response, self.output.download_file_path.open(mode=mode) as f: + for chunk in response.iter_content(chunk_size=self.chunk_size): + self.log.debug(f"Downloading chunk of size {len(chunk)}") + self.log.debug(f"Writing to file {self.output.download_file_path}") + self.log.debug(f"Downloaded {f.tell()} bytes") + f.write(chunk) diff --git a/tests/steps/test_download_file.py b/tests/steps/test_download_file.py index cfb49d9..6d6269e 100644 --- a/tests/steps/test_download_file.py +++ b/tests/steps/test_download_file.py @@ -11,153 +11,133 @@ @pytest.fixture -def test_data_path(data_path): +def large_file_path(data_path): return Path(f"{data_path}/transformations/string_data/100000_rows_with_strings.json") @pytest.fixture -def test_data_bytes(test_data_path): - return test_data_path.read_bytes() +def download_path(tmp_path): + _path = tmp_path / "downloads" + _path.mkdir(exist_ok=True) + return _path @pytest.fixture -def mock_response(test_data_bytes): - mock_resp = MagicMock() - mock_resp.iter_content = MagicMock(return_value=test_data_bytes) - return mock_resp +def downloaded_file(download_path): + _file = download_path / "testfile.txt" + _file.touch(exist_ok=True) + return _file -def test_download_file_step_large_file(mock_response, tmp_path, test_data_path, test_data_bytes): - with Mocker() as mocker: - # Arrange - with test_data_path.open("rb") as f: - mocker.get(URL, content=f.read()) - download_path = tmp_path / "downloads" - download_path.mkdir(exist_ok=True) +class TestDownloadFileStep: + def test_large_file(self, download_path, large_file_path): + """Test that a large file can be downloaded using default settings""" + with Mocker() as mocker: + # Arrange + mocker.get(URL, content=large_file_path.read_bytes()) - # Act - step = DownloadFileStep(url=URL, download_path=download_path) - step.execute() + # Act + step = DownloadFileStep(url=URL, download_path=download_path) + step.execute() # Assert downloaded_file = download_path / "testfile.txt" assert downloaded_file.exists() - assert downloaded_file.read_bytes() == test_data_bytes - - -def test_invalid_write_mode(): - """Test that an error is raised for an invalid write mode""" - with pytest.raises(ValueError): - DownloadFileStep( - mode="invalid_mode", - url=URL, - ) - - -def test_download_file_step_overwrite_mode(mock_response, tmp_path): - """In OVERWRITE mode, the file should be overwritten if it exists""" - # Arrange - download_path = tmp_path / "downloads" - download_path.mkdir(exist_ok=True) - downloaded_file = download_path / "testfile.txt" - downloaded_file.touch(exist_ok=True) - downloaded_file.write_bytes(b"foo") - with Mocker() as mocker: - mocker.get(URL, content=b"bar") - - # Act - step = DownloadFileStep(url=URL, download_path=download_path, mode="overwrite") - step.execute() - - # Assert - assert downloaded_file.exists() - assert downloaded_file.read_bytes() == b"bar" - - -def test_download_file_step_append_mode(mock_response, tmp_path): - """In APPEND mode, the file should be appended to if it exists""" - # Arrange - download_path = tmp_path / "downloads" - download_path.mkdir(exist_ok=True) - downloaded_file = download_path / "testfile.txt" - downloaded_file.touch(exist_ok=True) - downloaded_file.write_bytes(b"foo") - with Mocker() as mocker: - mocker.get(URL, content=b"bar") - - # Act - step = DownloadFileStep(url=URL, download_path=download_path, mode="append") - step.execute() - - # Assert - assert downloaded_file.exists() - assert downloaded_file.read_bytes() == b"foobar" - - -def test_download_file_step_ignore_mode(mock_response, tmp_path, caplog): - """In IGNORE mode, the class should return without writing anything if the file exists""" - # Arrange - download_path = tmp_path / "downloads" - download_path.mkdir(exist_ok=True) - downloaded_file = download_path / "testfile.txt" - downloaded_file.touch(exist_ok=True) - downloaded_file.write_bytes(b"foo") - - # FIXME: logging is not working in the unit tests - with caplog.at_level("INFO"), Mocker() as mocker: - mocker.get(URL, content=b"bar") - - # Act - step = DownloadFileStep(url=URL, download_path=download_path, mode="ignore") - print(f"1 {caplog.text = }") - step.execute() - print(f"2 {caplog.text = }") + assert downloaded_file.read_bytes() == large_file_path.read_bytes() + + def test_invalid_write_mode(self): + """Test that an error is raised for an invalid write mode""" + with pytest.raises(ValueError): + DownloadFileStep( + mode="invalid_mode", + url=URL, + ) + + def test_download_file_step_overwrite_mode(self, download_path, downloaded_file): + """In OVERWRITE mode, the file should be overwritten if it exists""" + # Arrange + downloaded_file.write_bytes(b"foo") + + with Mocker() as mocker: + mocker.get(URL, content=b"bar") + + # Act + step = DownloadFileStep(url=URL, download_path=download_path, mode="overwrite") + step.execute() # Assert + assert downloaded_file.exists() + assert downloaded_file.read_bytes() == b"bar" + + def test_download_file_step_append_mode(self, download_path, downloaded_file): + """In APPEND mode, the file should be appended to if it exists""" + # Arrange + downloaded_file.write_bytes(b"foo") + + with Mocker() as mocker: + mocker.get(URL, content=b"bar") + + # Act + step = DownloadFileStep(url=URL, download_path=download_path, mode="append") + step.execute() + + # Assert + assert downloaded_file.exists() + assert downloaded_file.read_bytes() == b"foobar" + + def test_download_file_step_ignore_mode(self, download_path, downloaded_file, caplog): + """In IGNORE mode, the class should return without writing anything if the file exists""" + # Arrange + downloaded_file.write_bytes(b"foo") + + with caplog.at_level("INFO"), Mocker() as mocker: + mocker.get(URL, content=b"bar") + + # FIXME: logging is not working in the unit tests + + # Act + step = DownloadFileStep(url=URL, download_path=download_path, mode="ignore") + step.log.setLevel("INFO") + step.execute() + print(f"2 {caplog.record_tuples = }") + + # Assert + print(f"5 {caplog.text = }") + assert "Ignoring testfile.txt based on IGNORE mode." in caplog.text + print(f"3 {caplog.text = }") assert downloaded_file.exists() print(f"4 {caplog.text = }") assert downloaded_file.read_bytes() == b"foo" - print(f"5 {caplog.text = }") - assert "Ignoring testfile.txt based on IGNORE mode." in caplog.text - - -def test_download_file_step_exclusive_mode(mock_response, tmp_path): - """In EXCLUSIVE mode, an error should be raised if the file exists""" - # Arrange - download_path = tmp_path / "downloads" - download_path.mkdir(exist_ok=True) - downloaded_file = download_path / "testfile.txt" - downloaded_file.touch(exist_ok=True) - downloaded_file.write_bytes(b"foo") - with Mocker() as mocker: - mocker.get(URL, content=b"bar") - - # Act and Assert - with pytest.raises(FileExistsError): - step = DownloadFileStep(url=URL, download_path=download_path, mode="exclusive") - step.execute() + def test_download_file_step_exclusive_mode(self, download_path, downloaded_file): + """In EXCLUSIVE mode, an error should be raised if the file exists""" + # Arrange + downloaded_file.write_bytes(b"foo") + + with Mocker() as mocker: + mocker.get(URL, content=b"bar") -def test_download_file_step_backup_mode(mock_response, tmp_path): - """In BACKUP mode, the file should be backed up before being overwritten""" - # Arrange - download_path = tmp_path / "downloads" - download_path.mkdir(exist_ok=True) - downloaded_file = download_path / "testfile.txt" - downloaded_file.touch(exist_ok=True) - downloaded_file.write_bytes(b"foo") - - with Mocker() as mocker: - mocker.get(URL, content=b"bar") - - # Act - step = DownloadFileStep(url=URL, download_path=download_path, mode="backup") - step.execute() - - # Assert - assert downloaded_file.exists() - assert downloaded_file.read_bytes() == "bar".encode() - backup_files = list(download_path.glob("testfile.txt.*.bak")) - assert len(backup_files) == 1 - assert backup_files[0].read_bytes() == "foo".encode() + # Act and Assert + with pytest.raises(FileExistsError): + step = DownloadFileStep(url=URL, download_path=download_path, mode="exclusive") + step.execute() + + def test_download_file_step_backup_mode(self, download_path, downloaded_file): + """In BACKUP mode, the file should be backed up before being overwritten""" + # Arrange + downloaded_file.write_bytes(b"foo") + + with Mocker() as mocker: + mocker.get(URL, content=b"bar") + + # Act + step = DownloadFileStep(url=URL, download_path=download_path, mode="backup") + step.execute() + + # Assert + assert downloaded_file.exists() + assert downloaded_file.read_bytes() == "bar".encode() + backup_files = list(download_path.glob("testfile.txt.*.bak")) + assert len(backup_files) == 1 + assert backup_files[0].read_bytes() == "foo".encode()