Skip to content

Commit e1664ec

Browse files
maxi297 and octavia-squidington-iii authored
chore: have declarative availability check support AbstractStream (#686)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent e849b98 commit e1664ec

File tree

16 files changed

+176
-261
lines changed

16 files changed

+176
-261
lines changed

airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
#
44

55
import logging
6-
import traceback
76
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
7+
from typing import Any, List, Mapping, Tuple, Union
98

10-
from airbyte_cdk import AbstractSource
9+
from airbyte_cdk.sources.abstract_source import AbstractSource
10+
from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1213
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1314

1415

@@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3435
def check_connection(
3536
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3637
) -> Tuple[bool, Any]:
37-
streams = source.streams(config=config)
38+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
3839

3940
if len(streams) == 0:
4041
return False, f"No streams to connect to from source {source}"
4142
if not self.use_check_availability:
4243
return True, None
4344

44-
availability_strategy = HttpAvailabilityStrategy()
45-
4645
try:
4746
for stream in streams[: min(self.stream_count, len(streams))]:
48-
stream_is_available, reason = availability_strategy.check_availability(
49-
stream, logger
50-
)
47+
stream_is_available, reason = evaluate_availability(stream, logger)
5148
if not stream_is_available:
5249
logger.warning(f"Stream {stream.name} is not available: {reason}")
5350
return False, reason

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,30 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, Dict, List, Mapping, Optional, Tuple
8+
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
99

10-
from airbyte_cdk import AbstractSource
10+
from airbyte_cdk.sources.abstract_source import AbstractSource
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
13+
from airbyte_cdk.sources.streams.core import Stream
1214
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1315

1416

17+
def evaluate_availability(
18+
stream: Union[Stream, AbstractStream], logger: logging.Logger
19+
) -> Tuple[bool, Optional[str]]:
20+
"""
21+
As a transition period, we want to support both Stream and AbstractStream until we migrate everything to AbstractStream.
22+
"""
23+
if isinstance(stream, Stream):
24+
return HttpAvailabilityStrategy().check_availability(stream, logger)
25+
elif isinstance(stream, AbstractStream):
26+
availability = stream.check_availability()
27+
return availability.is_available, availability.reason
28+
else:
29+
raise ValueError(f"Unsupported stream type {type(stream)}")
30+
31+
1532
@dataclass(frozen=True)
1633
class DynamicStreamCheckConfig:
1734
"""Defines the configuration for dynamic stream during connection checking. This class specifies
@@ -51,7 +68,7 @@ def check_connection(
5168
) -> Tuple[bool, Any]:
5269
"""Checks the connection to the source and its streams."""
5370
try:
54-
streams = source.streams(config=config)
71+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
5572
if not streams:
5673
return False, f"No streams to connect to from source {source}"
5774
except Exception as error:
@@ -82,13 +99,15 @@ def check_connection(
8299
return True, None
83100

84101
def _check_stream_availability(
85-
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
102+
self,
103+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
104+
stream_name: str,
105+
logger: logging.Logger,
86106
) -> Tuple[bool, Any]:
87107
"""Checks if streams are available."""
88-
availability_strategy = HttpAvailabilityStrategy()
89108
try:
90109
stream = stream_name_to_stream[stream_name]
91-
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
110+
stream_is_available, reason = evaluate_availability(stream, logger)
92111
if not stream_is_available:
93112
message = f"Stream {stream_name} is not available: {reason}"
94113
logger.warning(message)
@@ -98,7 +117,10 @@ def _check_stream_availability(
98117
return True, None
99118

100119
def _check_dynamic_streams_availability(
101-
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
120+
self,
121+
source: AbstractSource,
122+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
123+
logger: logging.Logger,
102124
) -> Tuple[bool, Any]:
103125
"""Checks the availability of dynamic streams."""
104126
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
@@ -135,18 +157,15 @@ def _map_generated_streams(
135157
def _check_generated_streams_availability(
136158
self,
137159
generated_streams: List[Dict[str, Any]],
138-
stream_name_to_stream: Dict[str, Any],
160+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
139161
logger: logging.Logger,
140162
max_count: int,
141163
) -> Tuple[bool, Any]:
142164
"""Checks availability of generated dynamic streams."""
143-
availability_strategy = HttpAvailabilityStrategy()
144165
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
145166
stream = stream_name_to_stream[declarative_stream["name"]]
146167
try:
147-
stream_is_available, reason = availability_strategy.check_availability(
148-
stream, logger
149-
)
168+
stream_is_available, reason = evaluate_availability(stream, logger)
150169
if not stream_is_available:
151170
message = f"Dynamic Stream {stream.name} is not available: {reason}"
152171
logger.warning(message)

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@
5252
from airbyte_cdk.sources.streams import Stream
5353
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
5454
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
55-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
56-
AlwaysAvailableAvailabilityStrategy,
57-
)
5855
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
5956
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
6057
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
@@ -325,7 +322,6 @@ def _group_streams(
325322
partition_generator=partition_generator,
326323
name=declarative_stream.name,
327324
json_schema=declarative_stream.get_json_schema(),
328-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
329325
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
330326
cursor_field=cursor.cursor_field.cursor_field_key
331327
if hasattr(cursor, "cursor_field")
@@ -362,7 +358,6 @@ def _group_streams(
362358
partition_generator=partition_generator,
363359
name=declarative_stream.name,
364360
json_schema=declarative_stream.get_json_schema(),
365-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
366361
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
367362
cursor_field=None,
368363
logger=self.logger,
@@ -417,7 +412,6 @@ def _group_streams(
417412
partition_generator=partition_generator,
418413
name=declarative_stream.name,
419414
json_schema=declarative_stream.get_json_schema(),
420-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
421415
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
422416
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
423417
logger=self.logger,

airbyte_cdk/sources/file_based/availability_strategy/__init__.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from .abstract_file_based_availability_strategy import (
2-
AbstractFileBasedAvailabilityStrategy,
3-
AbstractFileBasedAvailabilityStrategyWrapper,
4-
)
1+
from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
52
from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
63

74
__all__ = [
85
"AbstractFileBasedAvailabilityStrategy",
9-
"AbstractFileBasedAvailabilityStrategyWrapper",
106
"DefaultFileBasedAvailabilityStrategy",
117
]

airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@
1010

1111
from airbyte_cdk.sources import Source
1212
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
13-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
14-
AbstractAvailabilityStrategy,
15-
StreamAvailability,
16-
StreamAvailable,
17-
StreamUnavailable,
18-
)
1913
from airbyte_cdk.sources.streams.core import Stream
2014

2115
if TYPE_CHECKING:
@@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas
2822
self,
2923
stream: Stream,
3024
logger: logging.Logger,
31-
_: Optional[Source],
25+
source: Optional[Source] = None,
3226
) -> Tuple[bool, Optional[str]]:
3327
"""
3428
Perform a connection check for the stream.
@@ -51,23 +45,3 @@ def check_availability_and_parsability(
5145
Returns (True, None) if successful, otherwise (False, <error message>).
5246
"""
5347
...
54-
55-
56-
class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
57-
def __init__(self, stream: AbstractFileBasedStream) -> None:
58-
self.stream = stream
59-
60-
def check_availability(self, logger: logging.Logger) -> StreamAvailability:
61-
is_available, reason = self.stream.availability_strategy.check_availability(
62-
self.stream, logger, None
63-
)
64-
if is_available:
65-
return StreamAvailable()
66-
return StreamUnavailable(reason or "")
67-
68-
def check_availability_and_parsability(
69-
self, logger: logging.Logger
70-
) -> Tuple[bool, Optional[str]]:
71-
return self.stream.availability_strategy.check_availability_and_parsability(
72-
self.stream, logger, None
73-
)

airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
179179
)
180180

181181
@cached_property
182-
@deprecated("Deprecated as of CDK version 3.7.0.")
183182
def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
184183
return self._availability_strategy
185184

airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
2222
from airbyte_cdk.sources.file_based.availability_strategy import (
2323
AbstractFileBasedAvailabilityStrategy,
24-
AbstractFileBasedAvailabilityStrategyWrapper,
2524
)
2625
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
2726
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
@@ -97,7 +96,6 @@ def create_from_stream(
9796
),
9897
name=stream.name,
9998
json_schema=stream.get_json_schema(),
100-
availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream),
10199
primary_key=pk,
102100
cursor_field=cursor_field,
103101
logger=logger,

airbyte_cdk/sources/streams/availability_strategy.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources import Source
1515

1616

17+
# FIXME this
1718
class AvailabilityStrategy(ABC):
1819
"""
1920
Abstract base class for checking stream availability.

airbyte_cdk/sources/streams/concurrent/abstract_stream.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,6 @@ def cursor_field(self) -> Optional[str]:
6464
:return: The name of the field used as a cursor. Nested cursor fields are not supported.
6565
"""
6666

67-
@abstractmethod
68-
def check_availability(self) -> StreamAvailability:
69-
"""
70-
:return: The stream's availability
71-
"""
72-
7367
@abstractmethod
7468
def get_json_schema(self) -> Mapping[str, Any]:
7569
"""
@@ -94,3 +88,9 @@ def cursor(self) -> Cursor:
9488
"""
9589
:return: The cursor associated with this stream.
9690
"""
91+
92+
@abstractmethod
93+
def check_availability(self) -> StreamAvailability:
94+
"""
95+
:return: If the stream is available and if not, why
96+
"""

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,7 @@
2424
from airbyte_cdk.sources.message import MessageRepository
2525
from airbyte_cdk.sources.source import ExperimentalClassWarning
2626
from airbyte_cdk.sources.streams import Stream
27-
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
2827
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
29-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
30-
AbstractAvailabilityStrategy,
31-
AlwaysAvailableAvailabilityStrategy,
32-
)
3328
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
3429
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
3530
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
@@ -101,7 +96,6 @@ def create_from_stream(
10196
name=stream.name,
10297
namespace=stream.namespace,
10398
json_schema=stream.get_json_schema(),
104-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
10599
primary_key=pk,
106100
cursor_field=cursor_field,
107101
logger=logger,
@@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
210204
def supports_incremental(self) -> bool:
211205
return self._legacy_stream.supports_incremental
212206

213-
def check_availability(
214-
self, logger: logging.Logger, source: Optional["Source"] = None
215-
) -> Tuple[bool, Optional[str]]:
216-
"""
217-
Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters
218-
:param logger: (ignored)
219-
:param source: (ignored)
220-
:return:
221-
"""
222-
availability = self._abstract_stream.check_availability()
223-
return availability.is_available(), availability.message()
224-
225207
def as_airbyte_stream(self) -> AirbyteStream:
226208
return self._abstract_stream.as_airbyte_stream()
227209

@@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]:
370352
self._cursor_field,
371353
self._state,
372354
)
373-
374-
375-
@deprecated(
376-
"Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
377-
category=ExperimentalClassWarning,
378-
)
379-
class AvailabilityStrategyFacade(AvailabilityStrategy):
380-
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
381-
self._abstract_availability_strategy = abstract_availability_strategy
382-
383-
def check_availability(
384-
self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
385-
) -> Tuple[bool, Optional[str]]:
386-
"""
387-
Checks stream availability.
388-
389-
Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.
390-
391-
:param stream: (unused)
392-
:param logger: logger object to use
393-
:param source: (unused)
394-
:return: A tuple of (boolean, str). If boolean is true, then the stream
395-
"""
396-
stream_availability = self._abstract_availability_strategy.check_availability(logger)
397-
return stream_availability.is_available(), stream_availability.message()

0 commit comments

Comments
 (0)