Skip to content

Commit

Permalink
chore(deps): bump nltk from 3.8.1 to 3.9.1 (#43)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Steers <[email protected]>
Co-authored-by: Aldo Gonzalez <[email protected]>
  • Loading branch information
3 people authored Nov 18, 2024
1 parent e27cb81 commit 4592368
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 147 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ dist
.mypy_cache
.venv
.pytest_cache
.idea
**/__pycache__
88 changes: 69 additions & 19 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,25 @@
from airbyte_cdk.utils import is_cloud_environment
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from unstructured.file_utils.filetype import (
EXT_TO_FILETYPE,
FILETYPE_TO_MIMETYPE,
STR_TO_FILETYPE,
FileType,
detect_filetype,
)
import nltk

# Placeholders for the optional `unstructured` partition functions; all start as None.
# NOTE(review): presumably these are bound lazily elsewhere in this module once the
# library has been imported — confirm against the rest of the file.
unstructured_partition_pdf = None
unstructured_partition_docx = None
unstructured_partition_pptx = None

# Ensure the NLTK tokenizer data is present at import time. Each resource is looked
# up on its own so that only the ones actually missing are downloaded, instead of
# re-requesting both whenever either lookup fails.
for _nltk_resource in ("punkt", "punkt_tab"):
    try:
        nltk.data.find(f"tokenizers/{_nltk_resource}.zip")
    except LookupError:
        nltk.download(_nltk_resource)


def optional_decode(contents: Union[str, bytes]) -> str:
if isinstance(contents, bytes):
Expand Down Expand Up @@ -108,9 +117,11 @@ async def infer_schema(
format = _extract_format(config)
with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
filetype = self._get_filetype(file_handle, file)

if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
raise self._create_parse_error(file, self._get_file_type_error_message(filetype))
raise self._create_parse_error(
file,
self._get_file_type_error_message(filetype),
)

return {
"content": {
Expand Down Expand Up @@ -159,6 +170,10 @@ def parse_records(
logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
else:
raise e
except Exception as e:
exception_str = str(e)
logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
raise e

def _read_file(
self,
Expand All @@ -176,20 +191,32 @@ def _read_file(
# check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
raise Exception("unstructured library is not available")

filetype = self._get_filetype(file_handle, remote_file)
filetype: FileType | None = self._get_filetype(file_handle, remote_file)

if filetype == FileType.MD or filetype == FileType.TXT:
if filetype is None or filetype not in self._supported_file_types():
raise self._create_parse_error(
remote_file,
self._get_file_type_error_message(filetype),
)
if filetype in {FileType.MD, FileType.TXT}:
file_content: bytes = file_handle.read()
decoded_content: str = optional_decode(file_content)
return decoded_content
if filetype not in self._supported_file_types():
raise self._create_parse_error(remote_file, self._get_file_type_error_message(filetype))
if format.processing.mode == "local":
return self._read_file_locally(file_handle, filetype, format.strategy, remote_file)
return self._read_file_locally(
file_handle,
filetype,
format.strategy,
remote_file,
)
elif format.processing.mode == "api":
try:
result: str = self._read_file_remotely_with_retries(
file_handle, format.processing, filetype, format.strategy, remote_file
file_handle,
format.processing,
filetype,
format.strategy,
remote_file,
)
except Exception as e:
# If a parser error happens while processing the file remotely, the file is corrupted. This case is handled by the parse_records method, so just rethrow.
Expand Down Expand Up @@ -336,7 +363,11 @@ def _read_file_locally(

return self._render_markdown([element.to_dict() for element in elements])

def _create_parse_error(
    self,
    remote_file: RemoteFile,
    message: str,
) -> RecordParseError:
    """Build the error describing a file that could not be parsed.

    The error is returned rather than raised; callers decide when to raise it.

    Args:
        remote_file: The file being processed; its ``uri`` is reported as the filename.
        message: Description of what went wrong.

    Returns:
        A ``RecordParseError`` tagged with ``FileBasedSourceError.ERROR_PARSING_RECORD``.
    """
    return RecordParseError(
        FileBasedSourceError.ERROR_PARSING_RECORD,
        filename=remote_file.uri,
        message=message,
    )
Expand All @@ -360,32 +391,51 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
# detect_filetype is either using the file name or file content
# if possible, try to leverage the file name to detect the file type
# if the file name is not available, use the file content
file_type = detect_filetype(
filename=remote_file.uri,
)
if file_type is not None and not file_type == FileType.UNK:
file_type: FileType | None = None
try:
file_type = detect_filetype(
filename=remote_file.uri,
)
except Exception:
# Path doesn't exist locally. Try something else...
pass

if file_type and file_type != FileType.UNK:
return file_type

type_based_on_content = detect_filetype(file=file)
file.seek(0) # detect_filetype is reading to read the file content, so we need to reset

# detect_filetype reads the file content, so rewind the stream before it is used again
file.seek(0)
if type_based_on_content and type_based_on_content != FileType.UNK:
return type_based_on_content

return type_based_on_content
extension = "." + remote_file.uri.split(".")[-1].lower()
if extension in EXT_TO_FILETYPE:
return EXT_TO_FILETYPE[extension]

return None

def _supported_file_types(self) -> List[Any]:
    """Return the file types this parser accepts.

    The order is kept stable because the list is rendered verbatim in the
    unsupported-file-type error message.
    """
    supported: List[Any] = [
        FileType.MD,
        FileType.PDF,
        FileType.DOCX,
        FileType.PPTX,
        FileType.TXT,
    ]
    return supported

def _get_file_type_error_message(
    self,
    file_type: FileType | None,
) -> str:
    """Build the message reported for an unsupported or undetected file type.

    Args:
        file_type: The detected file type, or ``None`` when detection failed.

    Returns:
        A sentence naming the offending type followed by the supported types.
    """
    # The loop variable is named `supported` rather than `type` so the builtin is not shadowed.
    supported_file_types = ", ".join(str(supported) for supported in self._supported_file_types())
    return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}"

def _render_markdown(self, elements: List[Any]) -> str:
return "\n\n".join((self._convert_to_markdown(el) for el in elements))

def _convert_to_markdown(self, el: Dict[str, Any]) -> str:
if dpath.get(el, "type") == "Title":
heading_str = "#" * (dpath.get(el, "metadata/category_depth", default=1) or 1)
category_depth = dpath.get(el, "metadata/category_depth", default=1) or 1
if not isinstance(category_depth, int):
category_depth = (
int(category_depth) if isinstance(category_depth, (str, float)) else 1
)
heading_str = "#" * category_depth
return f"{heading_str} {dpath.get(el, 'text')}"
elif dpath.get(el, "type") == "ListItem":
return f"- {dpath.get(el, 'text')}"
Expand Down
Loading

0 comments on commit 4592368

Please sign in to comment.