Skip to content

Commit

Permalink
work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
dannymeijer committed Dec 2, 2024
1 parent 4cec1ab commit 08b73ba
Showing 1 changed file with 20 additions and 34 deletions.
54 changes: 20 additions & 34 deletions src/koheesio/spark/transformations/download_files.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
from typing import Optional
from typing import Optional, Union
from functools import partial

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from koheesio.models import DirectoryPath, Field
from koheesio.models import DirectoryPath, Field, ListOfColumns
from koheesio.spark import Column
from koheesio.spark.transformations import ColumnsTransformationWithTarget
from koheesio.spark.transformations import Transformation
from koheesio.steps.download_file import DownloadFileStep, FileWriteMode


def download_file_udf(download_path: str, chunk_size: int, mode: FileWriteMode):
    """
    Build a Spark UDF that downloads the file behind a URL and returns where it was stored.

    The returned UDF is meant to be applied to a string column of URLs. Each URL is handed to a
    ``DownloadFileStep`` together with the settings captured here; the UDF result is the step's
    ``output.download_file_path`` converted to ``str``.

    Downloads are best-effort per row: any exception raised while creating or executing the step is
    logged (with traceback) and turned into ``None`` for that row, so a single bad URL does not fail
    the whole Spark job.

    Parameters
    ----------
    download_path : str
        Passed through to ``DownloadFileStep`` as ``download_path``.
    chunk_size : int
        Passed through to ``DownloadFileStep`` as ``chunk_size``.
    mode : FileWriteMode
        Passed through to ``DownloadFileStep`` as ``mode``.

    Returns
    -------
    Callable
        A UDF with ``StringType`` return type mapping a URL to the downloaded file path, or to
        ``None`` when the URL is null or the download failed.
    """

    @udf(returnType=StringType())
    def download_file(url: str) -> Optional[str]:
        # Imported and resolved inside the UDF so the closure shipped to the executors only
        # carries plain values (download_path, chunk_size, mode) and no logger object.
        import logging

        log = logging.getLogger(__name__)

        # A null cell cannot be downloaded; short-circuit instead of letting the step fail on it.
        if url is None:
            return None

        log.debug(
            "Downloading url=%r to download_path=%r (chunk_size=%r, mode=%r)",
            url,
            download_path,
            chunk_size,
            mode,
        )
        try:
            step = DownloadFileStep(url=url, download_path=download_path, chunk_size=chunk_size, mode=mode)
            step.execute()
            return str(step.output.download_file_path)
        except Exception:
            # Deliberately broad: a failed download must yield NULL for this row rather than
            # abort the job. log.exception keeps the traceback that print() used to drop.
            log.exception("Error downloading file from URL %s", url)
            return None

    return download_file


class DownloadFileFromUrlTransformation(ColumnsTransformationWithTarget):
class DownloadFileFromUrlTransformation(Transformation):
"""
Downloads content from URLs in the specified column and stores the downloaded file paths in a new column.
Expand Down Expand Up @@ -109,8 +94,6 @@ class DownloadFileFromUrlTransformation(ColumnsTransformationWithTarget):
----------
columns : ListOfColumns
The column (or list of columns) containing the URLs to download.
target_column : Optional[str], optional, default=None
The name of the column to store the downloaded file paths. If not provided, the result will be stored in the source column.
download_path : str
The local directory path where the file will be downloaded.
chunk_size : int, optional, default=8192
Expand All @@ -119,6 +102,10 @@ class DownloadFileFromUrlTransformation(ColumnsTransformationWithTarget):
Write mode: overwrite, append, ignore, exclusive, or backup.
"""

column: Union[Column, str] = Field(
default="",
description="The column that holds the URLs to download.",
)
download_path: DirectoryPath = Field(
..., description="The local directory path where the file will be downloaded to."
)
Expand All @@ -132,22 +119,21 @@ class DownloadFileFromUrlTransformation(ColumnsTransformationWithTarget):
description="Write mode: overwrite, append, ignore, exclusive, or backup.",
)

def func(self, col: Column) -> Column:
def execute(self) -> Transformation.Output:
"""
Download files from URLs in the specified column.
"""
Applies a UDF to download files from URLs in the specified column and returns the file paths.
import requests

Parameters
----------
col : Column
The column containing the URLs to download.
def download_file(row):
url = row.asDict()["url"]
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(self.download_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)

Returns
-------
Column
A new column with the paths of the downloaded files.
"""
download_file = download_file_udf(self.download_path, self.chunk_size, self.mode)
return download_file(col)
self.df.foreach(download_file)


if __name__ == "__main__":
Expand Down

0 comments on commit 08b73ba

Please sign in to comment.