[FEATURE] Enable adding options to DeltaReader both streaming and writing (#111)

<!--- Provide a general summary of your changes in the Title above -->

## Description
Introduce a private attribute backing the batch and stream readers so that
additional options can be set on `DeltaTableReader` in both batch and
streaming mode, as shown in the sketch below.
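
As an illustration, a minimal usage sketch of what this enables. The table name and the `versionAsOf` Delta option are hypothetical; only the `reader` property/setter and `execute()` come from this change:

```python
from koheesio.spark.readers.delta import DeltaTableReader

# Hypothetical example: "my_table" and "versionAsOf" are illustrative,
# not part of this change.
reader = DeltaTableReader(table="my_table")

# The new `reader` setter makes it possible to replace or extend the
# underlying DataFrameReader before execution, e.g. with extra options:
reader.reader = reader.reader.option("versionAsOf", 0)

output = reader.execute()  # output.df holds the resulting DataFrame
```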

## Related Issue
#110 

## Motivation and Context
Provide the ability to override the readers, e.g. to add more options to
them.

## How Has This Been Tested?
Covered by the existing test suite.


## Types of changes
<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->
- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->
- [x] My code follows the code style of this project.
- [x] My change requires a change to the documentation.
- [x] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
- [x] I have added tests to cover my changes.
- [x] All new and existing tests passed.
mikita-sakalouski authored Nov 22, 2024
1 parent 79f32aa commit 4596410
Showing 2 changed files with 15 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -139,3 +139,4 @@ out/**
 
 # DevContainer
 .devcontainer
+uv.lock
22 changes: 14 additions & 8 deletions src/koheesio/spark/readers/delta.py
@@ -12,6 +12,8 @@
 
 from typing import Any, Dict, Optional, Union
 
+from pydantic import PrivateAttr
+
 from pyspark.sql import DataFrameReader
 from pyspark.sql import functions as f

@@ -163,6 +165,7 @@ class DeltaTableReader(Reader):
 
     # private attrs
     __temp_view_name__: Optional[str] = None
+    __reader: Optional[Union[DataStreamReader, DataFrameReader]] = PrivateAttr(default=None)
 
     @property
     def temp_view_name(self) -> str:
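
For context, Pydantic's `PrivateAttr` keeps internal state out of model validation and serialization, which is why it is used for `__reader` above. A minimal standalone sketch (names here are illustrative, not from this diff):

```python
from typing import Optional

from pydantic import BaseModel, PrivateAttr


class Holder(BaseModel):
    streaming: bool = False
    # Private attributes (leading underscore) bypass validation and are not
    # serialized; PrivateAttr supplies the default value.
    _reader: Optional[object] = PrivateAttr(default=None)
```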
@@ -286,23 +289,26 @@ def normalize(v: Union[str, bool]) -> str:
         # Any options with `value == None` are filtered out
         return {k: normalize(v) for k, v in options.items() if v is not None}
 
-    @property
-    def _stream_reader(self) -> DataStreamReader:
+    def __get_stream_reader(self) -> DataStreamReader:
         """Returns a basic DataStreamReader (streaming mode)"""
         return self.spark.readStream.format("delta")
 
-    @property
-    def _batch_reader(self) -> DataFrameReader:
+    def __get_batch_reader(self) -> DataFrameReader:
         """Returns a basic DataFrameReader (batch mode)"""
         return self.spark.read.format("delta")
 
     @property
     def reader(self) -> Union[DataStreamReader, DataFrameReader]:
         """Return the reader for the DeltaTableReader based on the `streaming` attribute"""
-        reader = self._stream_reader if self.streaming else self._batch_reader
-        for key, value in self.get_options().items():
-            reader = reader.option(key, value)
-        return reader
+        if not self.__reader:
+            self.__reader = self.__get_stream_reader() if self.streaming else self.__get_batch_reader()
+            self.__reader = self.__reader.options(**self.get_options())
+
+        return self.__reader
+
+    @reader.setter
+    def reader(self, value: Union[DataStreamReader, DataFrameReader]):
+        self.__reader = value
 
     def execute(self) -> Reader.Output:
         df = self.reader.table(self.table.table_name)
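
The net effect is a lazily built, cached reader that callers or subclasses can override before `execute()` runs. A minimal standalone sketch of the same pattern, with illustrative names and no Spark dependency:

```python
from typing import Optional


class ReaderHolder:
    """Illustrative stand-in for DeltaTableReader's lazy `reader` property."""

    def __init__(self, streaming: bool = False) -> None:
        self.streaming = streaming
        # Stands in for the cached DataFrameReader / DataStreamReader.
        self._reader: Optional[str] = None

    @property
    def reader(self) -> str:
        # Build and configure the reader only on first access; reuse it afterwards.
        if self._reader is None:
            self._reader = "stream-reader" if self.streaming else "batch-reader"
        return self._reader

    @reader.setter
    def reader(self, value: str) -> None:
        # Callers can inject a pre-configured reader before execution.
        self._reader = value
```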
