Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): bump nltk from 3.8.1 to 3.9.1 #43

Merged
merged 17 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ dist
.mypy_cache
.venv
.pytest_cache
.idea
**/__pycache__
88 changes: 69 additions & 19 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,25 @@
from airbyte_cdk.utils import is_cloud_environment
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unstructured.file_utils.filetype import (
EXT_TO_FILETYPE,
FILETYPE_TO_MIMETYPE,
STR_TO_FILETYPE,
FileType,
detect_filetype,
)
import nltk

unstructured_partition_pdf = None
unstructured_partition_docx = None
unstructured_partition_pptx = None

try:
nltk.data.find("tokenizers/punkt.zip")
nltk.data.find("tokenizers/punkt_tab.zip")
except LookupError:
nltk.download("punkt")
nltk.download("punkt_tab")


def optional_decode(contents: Union[str, bytes]) -> str:
if isinstance(contents, bytes):
Expand Down Expand Up @@ -108,9 +117,11 @@ async def infer_schema(
format = _extract_format(config)
with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
filetype = self._get_filetype(file_handle, file)

if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
raise self._create_parse_error(file, self._get_file_type_error_message(filetype))
raise self._create_parse_error(
file,
self._get_file_type_error_message(filetype),
)

return {
"content": {
Expand Down Expand Up @@ -159,6 +170,10 @@ def parse_records(
logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
else:
raise e
except Exception as e:
exception_str = str(e)
logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
raise e

def _read_file(
self,
Expand All @@ -176,20 +191,32 @@ def _read_file(
# check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
raise Exception("unstructured library is not available")

filetype = self._get_filetype(file_handle, remote_file)
filetype: FileType | None = self._get_filetype(file_handle, remote_file)

if filetype == FileType.MD or filetype == FileType.TXT:
if filetype is None or filetype not in self._supported_file_types():
raise self._create_parse_error(
remote_file,
self._get_file_type_error_message(filetype),
)
if filetype in {FileType.MD, FileType.TXT}:
file_content: bytes = file_handle.read()
decoded_content: str = optional_decode(file_content)
return decoded_content
if filetype not in self._supported_file_types():
raise self._create_parse_error(remote_file, self._get_file_type_error_message(filetype))
if format.processing.mode == "local":
return self._read_file_locally(file_handle, filetype, format.strategy, remote_file)
return self._read_file_locally(
file_handle,
filetype,
format.strategy,
remote_file,
)
elif format.processing.mode == "api":
try:
result: str = self._read_file_remotely_with_retries(
file_handle, format.processing, filetype, format.strategy, remote_file
file_handle,
format.processing,
filetype,
format.strategy,
remote_file,
)
except Exception as e:
# If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow.
Expand Down Expand Up @@ -336,7 +363,11 @@ def _read_file_locally(

return self._render_markdown([element.to_dict() for element in elements])

def _create_parse_error(self, remote_file: RemoteFile, message: str) -> RecordParseError:
def _create_parse_error(
self,
remote_file: RemoteFile,
message: str,
) -> RecordParseError:
return RecordParseError(
FileBasedSourceError.ERROR_PARSING_RECORD, filename=remote_file.uri, message=message
)
Expand All @@ -360,32 +391,51 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
# detect_filetype is either using the file name or file content
# if possible, try to leverage the file name to detect the file type
# if the file name is not available, use the file content
file_type = detect_filetype(
filename=remote_file.uri,
)
if file_type is not None and not file_type == FileType.UNK:
file_type: FileType | None = None
try:
file_type = detect_filetype(
filename=remote_file.uri,
)
except Exception:
# Path doesn't exist locally. Try something else...
pass

if file_type and file_type != FileType.UNK:
return file_type

type_based_on_content = detect_filetype(file=file)
file.seek(0) # detect_filetype is reading to read the file content, so we need to reset

# detect_filetype is reading to read the file content
file.seek(0)
if type_based_on_content and type_based_on_content != FileType.UNK:
return type_based_on_content

return type_based_on_content
extension = "." + remote_file.uri.split(".")[-1].lower()
if extension in EXT_TO_FILETYPE:
return EXT_TO_FILETYPE[extension]

return None

def _supported_file_types(self) -> List[Any]:
return [FileType.MD, FileType.PDF, FileType.DOCX, FileType.PPTX, FileType.TXT]

def _get_file_type_error_message(self, file_type: FileType) -> str:
def _get_file_type_error_message(
self,
file_type: FileType | None,
) -> str:
supported_file_types = ", ".join([str(type) for type in self._supported_file_types()])
return f"File type {file_type} is not supported. Supported file types are {supported_file_types}"
return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}"

def _render_markdown(self, elements: List[Any]) -> str:
return "\n\n".join((self._convert_to_markdown(el) for el in elements))

def _convert_to_markdown(self, el: Dict[str, Any]) -> str:
if dpath.get(el, "type") == "Title":
heading_str = "#" * (dpath.get(el, "metadata/category_depth", default=1) or 1)
category_depth = dpath.get(el, "metadata/category_depth", default=1) or 1
if not isinstance(category_depth, int):
category_depth = (
int(category_depth) if isinstance(category_depth, (str, float)) else 1
)
heading_str = "#" * category_depth
return f"{heading_str} {dpath.get(el, 'text')}"
elif dpath.get(el, "type") == "ListItem":
return f"- {dpath.get(el, 'text')}"
Expand Down
Loading
Loading