Skip to content

Commit c67570b

Browse files
darynaishchenko, octavia-squidington-iii, coderabbitai[bot]
authored
chore(file-based-cdk): add UploadableRemoteFile, refactor upload method (#780)
Co-authored-by: octavia-squidington-iii <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 7ab013d commit c67570b

File tree

4 files changed

+165
-19
lines changed

4 files changed

+165
-19
lines changed

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
#
44

55
import logging
6+
import time
67
from abc import ABC, abstractmethod
78
from datetime import datetime
89
from enum import Enum
910
from io import IOBase
1011
from os import makedirs, path
11-
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple
12+
from typing import Any, Iterable, List, MutableMapping, Optional, Set, Tuple
1213

14+
from airbyte_protocol_dataclasses.models import FailureType
1315
from wcmatch.glob import GLOBSTAR, globmatch
1416

1517
from airbyte_cdk.models import AirbyteRecordMessageFileReference
@@ -19,8 +21,9 @@
1921
preserve_directory_structure,
2022
use_file_transfer,
2123
)
24+
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
2225
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
23-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
26+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile
2427

2528

2629
class FileReadMode(Enum):
@@ -34,6 +37,7 @@ class AbstractFileBasedStreamReader(ABC):
3437
FILE_NAME = "file_name"
3538
LOCAL_FILE_PATH = "local_file_path"
3639
FILE_FOLDER = "file_folder"
40+
FILE_SIZE_LIMIT = 1_500_000_000
3741

3842
def __init__(self) -> None:
3943
self._config = None
@@ -113,16 +117,6 @@ def filter_files_by_globs_and_start_date(
113117
seen.add(file.uri)
114118
yield file
115119

116-
@abstractmethod
117-
def file_size(self, file: RemoteFile) -> int:
118-
"""Utility method to get size of the remote file.
119-
120-
This is required for connectors that will support writing to
121-
files. If the connector does not support writing files, then the
122-
subclass can simply `return 0`.
123-
"""
124-
...
125-
126120
@staticmethod
127121
def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
128122
# Use the GLOBSTAR flag to enable recursive ** matching
@@ -153,9 +147,8 @@ def include_identities_stream(self) -> bool:
153147
return include_identities_stream(self.config)
154148
return False
155149

156-
@abstractmethod
157150
def upload(
158-
self, file: RemoteFile, local_directory: str, logger: logging.Logger
151+
self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger
159152
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
160153
"""
161154
This is required for connectors that will support writing to
@@ -173,7 +166,53 @@ def upload(
173166
- file_size_bytes (int): The size of the referenced file in bytes.
174167
- source_file_relative_path (str): The relative path to the referenced file in source.
175168
"""
176-
...
169+
if not isinstance(file, UploadableRemoteFile):
170+
raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}")
171+
172+
file_size = file.size
173+
174+
if file_size > self.FILE_SIZE_LIMIT:
175+
message = f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit."
176+
raise FileSizeLimitError(
177+
message=message, internal_message=message, failure_type=FailureType.config_error
178+
)
179+
180+
file_paths = self._get_file_transfer_paths(
181+
source_file_relative_path=file.source_file_relative_path,
182+
staging_directory=local_directory,
183+
)
184+
local_file_path = file_paths[self.LOCAL_FILE_PATH]
185+
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
186+
file_name = file_paths[self.FILE_NAME]
187+
188+
logger.info(
189+
f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
190+
)
191+
start_download_time = time.time()
192+
193+
file.download_to_local_directory(local_file_path)
194+
195+
write_duration = time.time() - start_download_time
196+
logger.info(
197+
f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
198+
)
199+
200+
file_record_data = FileRecordData(
201+
folder=file_paths[self.FILE_FOLDER],
202+
file_name=file_name,
203+
bytes=file_size,
204+
id=file.id,
205+
mime_type=file.mime_type,
206+
created_at=file.created_at,
207+
updated_at=file.updated_at,
208+
source_uri=file.uri,
209+
)
210+
file_reference = AirbyteRecordMessageFileReference(
211+
staging_file_url=local_file_path,
212+
source_file_relative_path=file_relative_path,
213+
file_size_bytes=file_size,
214+
)
215+
return file_record_data, file_reference
177216

178217
def _get_file_transfer_paths(
179218
self, source_file_relative_path: str, staging_directory: str

airbyte_cdk/sources/file_based/file_types/file_transfer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from airbyte_cdk.models import AirbyteRecordMessageFileReference
88
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
99
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
10-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
10+
from airbyte_cdk.sources.file_based.remote_file import UploadableRemoteFile
1111
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1212

1313

@@ -17,7 +17,7 @@ def __init__(self) -> None:
1717

1818
def upload(
1919
self,
20-
file: RemoteFile,
20+
file: UploadableRemoteFile,
2121
stream_reader: AbstractFileBasedStreamReader,
2222
logger: logging.Logger,
2323
) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]:

airbyte_cdk/sources/file_based/remote_file.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
from abc import ABC, abstractmethod
55
from datetime import datetime
66
from typing import Optional
77

@@ -16,3 +16,42 @@ class RemoteFile(BaseModel):
1616
uri: str
1717
last_modified: datetime
1818
mime_type: Optional[str] = None
19+
20+
21+
class UploadableRemoteFile(RemoteFile, ABC):
    """
    A file in a file-based stream that supports uploading (file transfer).

    Subclasses must implement ``size`` and ``download_to_local_directory``.
    The two URI-based properties default to ``uri``.
    """

    id: Optional[str] = None
    created_at: Optional[str] = None
    updated_at: Optional[str] = None

    @property
    def source_file_relative_path(self) -> str:
        """
        Relative path of the source file; this default returns ``uri``.
        """
        return self.uri

    @property
    def file_uri_for_logging(self) -> str:
        """
        URI to use when referring to this file in log output; this default returns ``uri``.
        """
        return self.uri

    @property
    @abstractmethod
    def size(self) -> int:
        """
        File size in bytes.
        """
        ...

    @abstractmethod
    def download_to_local_directory(self, local_file_path: str) -> None:
        """
        Download the file from the remote source to local storage.
        """
        ...

unit_tests/sources/file_based/test_file_based_stream_reader.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
from io import IOBase
88
from os import path
99
from typing import Any, ClassVar, Dict, Iterable, List, Mapping, Optional, Set
10+
from unittest.mock import MagicMock
1011

1112
import pytest
1213
from pydantic.v1 import AnyUrl
1314

1415
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
16+
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
1517
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
16-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
18+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile
1719
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1820
from unit_tests.sources.file_based.helpers import make_remote_files
1921

@@ -64,6 +66,38 @@
6466
}
6567

6668

69+
class TestStreamReaderWithDefaultUpload(AbstractFileBasedStreamReader):
    """
    Concrete stream reader for tests that does not define its own ``upload``.

    The members below are trivial stand-ins so the class can be instantiated.
    """

    # Pytest would otherwise try to collect this class because of its "Test" prefix.
    __test__: ClassVar[bool] = False

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        """Return the currently stored config."""
        return self._config

    @config.setter
    def config(self, value: AbstractFileBasedSpec) -> None:
        self._config = value

    @property
    def file_permissions_schema(self) -> Dict[str, Any]:
        """Return an object schema with no properties."""
        return {"type": "object", "properties": {}}

    @property
    def identities_schema(self) -> Dict[str, Any]:
        """Return an object schema with no properties."""
        return {"type": "object", "properties": {}}

    def get_matching_files(self, globs: List[str]) -> Iterable[RemoteFile]:
        """Stub: does nothing and returns ``None``."""
        return None

    def open_file(self, file: RemoteFile) -> IOBase:
        """Stub: does nothing and returns ``None``."""
        return None

    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        """Stub: returns an empty mapping."""
        return {}

    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        """Stub: returns a list holding one empty mapping."""
        return [{}]
99+
100+
67101
class TestStreamReader(AbstractFileBasedStreamReader):
68102
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
69103

@@ -458,3 +492,37 @@ def test_preserve_sub_directories_scenarios(
458492
assert file_paths[AbstractFileBasedStreamReader.LOCAL_FILE_PATH] == expected_local_file_path
459493
assert file_paths[AbstractFileBasedStreamReader.FILE_NAME] == path.basename(source_file_path)
460494
assert file_paths[AbstractFileBasedStreamReader.FILE_FOLDER] == path.dirname(source_file_path)
495+
496+
497+
def test_upload_with_file_transfer_reader():
    """
    Check the ``upload`` inherited by ``TestStreamReaderWithDefaultUpload``.

    A file below the size limit yields a record and a file reference that both
    carry the file's size; a file above ``FILE_SIZE_LIMIT`` raises
    ``FileSizeLimitError``.
    """
    stream_reader = TestStreamReaderWithDefaultUpload()

    class TestUploadableRemoteFile(UploadableRemoteFile):
        # ``size`` is read from the blob on each access, so the test can
        # change the reported size between calls by mutating the mock.
        blob: Any

        @property
        def size(self) -> int:
            return self.blob.size

        def download_to_local_directory(self, local_file_path: str) -> None:
            # Nothing is downloaded in this test.
            pass

    small_size = 200
    blob = MagicMock()
    blob.size = small_size
    uploadable_remote_file = TestUploadableRemoteFile(
        uri="test/uri", last_modified=datetime.now(), blob=blob
    )

    logger = logging.getLogger("airbyte")

    file_record_data, file_reference = stream_reader.upload(
        uploadable_remote_file, "test_directory", logger
    )
    assert file_record_data
    assert file_reference
    # Both outputs are built from the file's size, so pin the value as well.
    assert file_record_data.bytes == small_size
    assert file_reference.file_size_bytes == small_size

    # One byte over the limit is the smallest size that must be rejected.
    blob.size = stream_reader.FILE_SIZE_LIMIT + 1
    with pytest.raises(FileSizeLimitError):
        stream_reader.upload(uploadable_remote_file, "test_directory", logger)

0 commit comments

Comments
 (0)