Skip to content

Commit

Permalink
work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
dannymeijer committed Dec 2, 2024
1 parent cc40c4c commit 55b24d3
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 155 deletions.
23 changes: 23 additions & 0 deletions src/koheesio/spark/transformations/download_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pydantic import DirectoryPath

from koheesio.spark.transformations import ColumnsTransformationWithTarget


class DownloadFileFromUrlTransformation(ColumnsTransformationWithTarget):
"""
Downloads content from URLs in the specified column and stores the downloaded file paths in a new column.
Parameters
----------
columns : ListOfColumns
The column (or list of columns) containing the URLs to download.
target_column : Optional[str], optional, default=None
The name of the column to store the downloaded file paths. If not provided, the result will be stored in the source column.
download_path : DirectoryPath
The local directory path where the files will be downloaded.
"""

download_path: DirectoryPath

def func(self, column):
pass
42 changes: 15 additions & 27 deletions src/koheesio/steps/download_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class DownloadFileStep(HttpGetStep):
"""
Downloads a file from the given URL and saves it to the specified download path.
Examples
--------
# TODO: add examples
Parameters
----------
url : str
Expand Down Expand Up @@ -93,29 +97,14 @@ class Output(HttpGetStep.Output):
def handle_file_write_modes(self, _filepath: Path, _filename: str) -> Optional[str]:
"""Handle different write modes for the file and return the appropriate write mode."""
mode = FileWriteMode.from_string(self.mode) # Convert string to FileWriteMode
write_mode = mode.write_mode # Determine the write mode
write_mode = str(mode.write_mode) # Determine the write mode

# FIXME: logging is not working in the unit tests
import logging

local_logger = logging.getLogger("koheesio.DownloadFileStep")
local_logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
local_logger.addHandler(handler)

# OVERWRITE and APPEND modes will write the file irrespective of whether it exists or not
if _filepath.exists() and mode not in {FileWriteMode.OVERWRITE, FileWriteMode.APPEND}:
if mode == FileWriteMode.IGNORE:
# If the file exists in IGNORE mode, return without writing
print(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.")
self.log.info(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.")
local_logger.info(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.")
self.log.error(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.")
self.log.debug(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.")
self.log.warning(f"File {_filepath} already exists. Ignoring {_filename} based on IGNORE mode.")
self.output.download_file_path = _filepath
return None

Expand All @@ -136,6 +125,9 @@ def handle_file_write_modes(self, _filepath: Path, _filename: str) -> Optional[s
return write_mode

def execute(self) -> Output:
"""
Executes the file download process, handling different write modes and saving the file to the specified path.
"""
_filename = Path(self.url).name
_filepath = self.download_path / _filename

Expand All @@ -147,14 +139,10 @@ def execute(self) -> Output:
self.output.download_file_path = _filepath
self.output.download_file_path.touch(exist_ok=True)

# download the file
with self.request() as response:
# response = self.output.response_raw

# write the downloaded content to the file
with self.output.download_file_path.open(mode=mode) as f:
for chunk in response.iter_content(chunk_size=self.chunk_size):
self.log.debug(f"Downloading chunk of size {len(chunk)}")
self.log.debug(f"Writing to file {self.output.download_file_path}")
self.log.debug(f"Downloaded {f.tell()} bytes")
f.write(chunk)
# download the file content and write the downloaded content to the file
with self.request() as response, self.output.download_file_path.open(mode=mode) as f:
for chunk in response.iter_content(chunk_size=self.chunk_size):
self.log.debug(f"Downloading chunk of size {len(chunk)}")
self.log.debug(f"Writing to file {self.output.download_file_path}")
self.log.debug(f"Downloaded {f.tell()} bytes")
f.write(chunk)
236 changes: 108 additions & 128 deletions tests/steps/test_download_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,153 +11,133 @@


@pytest.fixture
def test_data_path(data_path):
def large_file_path(data_path):
return Path(f"{data_path}/transformations/string_data/100000_rows_with_strings.json")


@pytest.fixture
def test_data_bytes(test_data_path):
return test_data_path.read_bytes()
def download_path(tmp_path):
_path = tmp_path / "downloads"
_path.mkdir(exist_ok=True)
return _path


@pytest.fixture
def mock_response(test_data_bytes):
mock_resp = MagicMock()
mock_resp.iter_content = MagicMock(return_value=test_data_bytes)
return mock_resp
def downloaded_file(download_path):
_file = download_path / "testfile.txt"
_file.touch(exist_ok=True)
return _file


def test_download_file_step_large_file(mock_response, tmp_path, test_data_path, test_data_bytes):
with Mocker() as mocker:
# Arrange
with test_data_path.open("rb") as f:
mocker.get(URL, content=f.read())
download_path = tmp_path / "downloads"
download_path.mkdir(exist_ok=True)
class TestDownloadFileStep:
def test_large_file(self, download_path, large_file_path):
"""Test that a large file can be downloaded using default settings"""
with Mocker() as mocker:
# Arrange
mocker.get(URL, content=large_file_path.read_bytes())

# Act
step = DownloadFileStep(url=URL, download_path=download_path)
step.execute()
# Act
step = DownloadFileStep(url=URL, download_path=download_path)
step.execute()

# Assert
downloaded_file = download_path / "testfile.txt"
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == test_data_bytes


def test_invalid_write_mode():
"""Test that an error is raised for an invalid write mode"""
with pytest.raises(ValueError):
DownloadFileStep(
mode="invalid_mode",
url=URL,
)


def test_download_file_step_overwrite_mode(mock_response, tmp_path):
"""In OVERWRITE mode, the file should be overwritten if it exists"""
# Arrange
download_path = tmp_path / "downloads"
download_path.mkdir(exist_ok=True)
downloaded_file = download_path / "testfile.txt"
downloaded_file.touch(exist_ok=True)
downloaded_file.write_bytes(b"foo")
with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="overwrite")
step.execute()

# Assert
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == b"bar"


def test_download_file_step_append_mode(mock_response, tmp_path):
"""In APPEND mode, the file should be appended to if it exists"""
# Arrange
download_path = tmp_path / "downloads"
download_path.mkdir(exist_ok=True)
downloaded_file = download_path / "testfile.txt"
downloaded_file.touch(exist_ok=True)
downloaded_file.write_bytes(b"foo")
with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="append")
step.execute()

# Assert
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == b"foobar"


def test_download_file_step_ignore_mode(mock_response, tmp_path, caplog):
"""In IGNORE mode, the class should return without writing anything if the file exists"""
# Arrange
download_path = tmp_path / "downloads"
download_path.mkdir(exist_ok=True)
downloaded_file = download_path / "testfile.txt"
downloaded_file.touch(exist_ok=True)
downloaded_file.write_bytes(b"foo")

# FIXME: logging is not working in the unit tests
with caplog.at_level("INFO"), Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="ignore")
print(f"1 {caplog.text = }")
step.execute()
print(f"2 {caplog.text = }")
assert downloaded_file.read_bytes() == large_file_path.read_bytes()

def test_invalid_write_mode(self):
"""Test that an error is raised for an invalid write mode"""
with pytest.raises(ValueError):
DownloadFileStep(
mode="invalid_mode",
url=URL,
)

def test_download_file_step_overwrite_mode(self, download_path, downloaded_file):
"""In OVERWRITE mode, the file should be overwritten if it exists"""
# Arrange
downloaded_file.write_bytes(b"foo")

with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="overwrite")
step.execute()

# Assert
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == b"bar"

def test_download_file_step_append_mode(self, download_path, downloaded_file):
"""In APPEND mode, the file should be appended to if it exists"""
# Arrange
downloaded_file.write_bytes(b"foo")

with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="append")
step.execute()

# Assert
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == b"foobar"

def test_download_file_step_ignore_mode(self, download_path, downloaded_file, caplog):
"""In IGNORE mode, the class should return without writing anything if the file exists"""
# Arrange
downloaded_file.write_bytes(b"foo")

with caplog.at_level("INFO"), Mocker() as mocker:
mocker.get(URL, content=b"bar")

# FIXME: logging is not working in the unit tests

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="ignore")
step.log.setLevel("INFO")
step.execute()
print(f"2 {caplog.record_tuples = }")

# Assert
print(f"5 {caplog.text = }")
assert "Ignoring testfile.txt based on IGNORE mode." in caplog.text

print(f"3 {caplog.text = }")
assert downloaded_file.exists()
print(f"4 {caplog.text = }")
assert downloaded_file.read_bytes() == b"foo"
print(f"5 {caplog.text = }")
assert "Ignoring testfile.txt based on IGNORE mode." in caplog.text


def test_download_file_step_exclusive_mode(mock_response, tmp_path):
"""In EXCLUSIVE mode, an error should be raised if the file exists"""
# Arrange
download_path = tmp_path / "downloads"
download_path.mkdir(exist_ok=True)
downloaded_file = download_path / "testfile.txt"
downloaded_file.touch(exist_ok=True)
downloaded_file.write_bytes(b"foo")
with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act and Assert
with pytest.raises(FileExistsError):
step = DownloadFileStep(url=URL, download_path=download_path, mode="exclusive")
step.execute()

def test_download_file_step_exclusive_mode(self, download_path, downloaded_file):
"""In EXCLUSIVE mode, an error should be raised if the file exists"""
# Arrange
downloaded_file.write_bytes(b"foo")

with Mocker() as mocker:
mocker.get(URL, content=b"bar")

def test_download_file_step_backup_mode(mock_response, tmp_path):
"""In BACKUP mode, the file should be backed up before being overwritten"""
# Arrange
download_path = tmp_path / "downloads"
download_path.mkdir(exist_ok=True)
downloaded_file = download_path / "testfile.txt"
downloaded_file.touch(exist_ok=True)
downloaded_file.write_bytes(b"foo")

with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="backup")
step.execute()

# Assert
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == "bar".encode()
backup_files = list(download_path.glob("testfile.txt.*.bak"))
assert len(backup_files) == 1
assert backup_files[0].read_bytes() == "foo".encode()
# Act and Assert
with pytest.raises(FileExistsError):
step = DownloadFileStep(url=URL, download_path=download_path, mode="exclusive")
step.execute()

def test_download_file_step_backup_mode(self, download_path, downloaded_file):
"""In BACKUP mode, the file should be backed up before being overwritten"""
# Arrange
downloaded_file.write_bytes(b"foo")

with Mocker() as mocker:
mocker.get(URL, content=b"bar")

# Act
step = DownloadFileStep(url=URL, download_path=download_path, mode="backup")
step.execute()

# Assert
assert downloaded_file.exists()
assert downloaded_file.read_bytes() == "bar".encode()
backup_files = list(download_path.glob("testfile.txt.*.bak"))
assert len(backup_files) == 1
assert backup_files[0].read_bytes() == "foo".encode()

0 comments on commit 55b24d3

Please sign in to comment.