def format_duplicate_files_error_message(
    stream_name: str, duplicated_files_names: List[dict[str, List[str]]]
) -> str:
    """
    Build the user-facing error text listing every duplicated file name of a stream.

    :param stream_name: name of the stream the duplicates were found in.
    :param duplicated_files_names: list of mappings, each one associating a file name
        with the paths that share that name.
    :return: the header explaining the problem, followed by one section per duplicated
        file name with one bullet line per path.
    """
    duplicated_files_messages = []
    for duplicated_file in duplicated_files_names:
        for duplicated_file_name, file_paths in duplicated_file.items():
            # One bullet per path: joining a single f-string would iterate its characters
            # and render the whole list repr on one line.
            bullet_lines = "".join(f"\n - {file_path}" for file_path in file_paths)
            duplicated_files_messages.append(
                f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                + bullet_lines
            )

    error_message = (
        f"ERROR: Duplicate filenames found for stream {stream_name}. "
        "Duplicate file names are not allowed if the Preserve Subdirectories in File Paths option is disabled. "
        "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
        + "\n".join(duplicated_files_messages)
    )

    return error_message
def preserve_subdirectories_directories(self) -> bool:
    """
    Tell whether the source folder structure must be kept when writing transferred files.

    Falls back to ``True`` (preserve subdirectories) whenever file transfer is not in use,
    no config is set, or the delivery method does not carry a non-``None``
    ``preserve_subdirectories_directories`` value.

    :return: the configured flag, or ``True`` when it is absent or incomplete.
    """
    if not (self.use_file_transfer() and self.config):
        return True
    # Single lookup with a default covers both "attribute missing" and "attribute is None".
    preserve = getattr(
        self.config.delivery_method, "preserve_subdirectories_directories", None
    )
    return True if preserve is None else preserve
- @staticmethod - def _get_file_transfer_paths(file: RemoteFile, local_directory: str) -> List[str]: - # Remove left slashes from source path format to make relative path for writing locally - file_relative_path = file.uri.lstrip("/") + def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]: + preserve_subdirectories_directories = self.preserve_subdirectories_directories() + if preserve_subdirectories_directories: + # Remove left slashes from source path format to make relative path for writing locally + file_relative_path = file.uri.lstrip("/") + else: + file_relative_path = path.basename(file.uri) local_file_path = path.join(local_directory, file_relative_path) # Ensure the local directory exists diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index a5cae2e69..babb2825c 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -5,20 +5,24 @@ import asyncio import itertools import traceback +from collections import defaultdict from copy import deepcopy from functools import cache -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union +from os import path +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.exceptions import ( + DuplicatedFilesError, FileBasedSourceError, InvalidSchemaError, MissingSchemaError, RecordParseError, SchemaInferenceError, StopSyncPerValidationPolicy, + format_duplicate_files_error_message, ) from airbyte_cdk.sources.file_based.file_types import FileTransfer from 
def _duplicated_files_names(
    self, slices: "List[dict[str, List[RemoteFile]]]"
) -> List[dict[str, List[str]]]:
    """
    Find the base file names that occur more than once across all slices.

    :param slices: slices as built by ``compute_slices``; each one holds its files under
        ``self.FILES_KEY``.
    :return: one single-entry mapping per duplicated base name, associating that name with
        every URI sharing it. Entries follow the order in which each name was first
        encountered, so the result is stable from run to run.
    """
    # Plain dicts keep insertion order, unlike the sets of names used before, whose
    # iteration order depends on string hashing.
    uris_by_file_name: Dict[str, List[str]] = {}
    for file_slice in slices:
        for file_found in file_slice[self.FILES_KEY]:
            file_name = path.basename(file_found.uri)
            uris_by_file_name.setdefault(file_name, []).append(file_found.uri)
    return [
        {file_name: file_uris}
        for file_name, file_uris in uris_by_file_name.items()
        if len(file_uris) > 1
    ]

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Group the files to sync into slices, one slice per distinct ``last_modified`` value.

    :return: list of ``{self.FILES_KEY: [files]}`` mappings, ordered by last_modified, with
        the files of each slice ordered by uri.
    :raises DuplicatedFilesError: when subdirectories are not preserved and two or more
        files share the same base name.
    """
    # Sort files by last_modified, uri and return them grouped by last_modified
    all_files = self.list_files()
    files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
    sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
    slices = [
        {self.FILES_KEY: list(group[1])}
        for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
    ]
    if slices and not self.preserve_subdirectories_directories:
        # Without the source folder structure, files are told apart by base name only,
        # so equal base names are rejected before anything is read.
        duplicated_files_names = self._duplicated_files_names(slices)
        if duplicated_files_names:
            raise DuplicatedFilesError(
                format_duplicate_files_error_message(
                    stream_name=self.name, duplicated_files_names=duplicated_files_names
                ),
                stream=self.name,
            )
    return slices