Skip to content

Commit 8764296

Browse files
authored
chore: Remove cursors, stream slicing, and resumable full refresh from declarative Retrievers (#827)
1 parent 18a0218 commit 8764296

File tree

9 files changed (+194 / -675 lines changed)

airbyte_cdk/legacy/sources/declarative/declarative_stream.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@
55
from dataclasses import InitVar, dataclass, field
66
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
77

8+
from typing_extensions import deprecated
9+
810
from airbyte_cdk.legacy.sources.declarative.incremental import (
911
GlobalSubstreamCursor,
1012
PerPartitionCursor,
@@ -13,7 +15,6 @@
1315
from airbyte_cdk.models import SyncMode
1416
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1517
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
16-
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
1718
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
1819
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1920
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
@@ -28,6 +29,7 @@
2829
from airbyte_cdk.sources.types import Config, StreamSlice
2930

3031

32+
@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream")
3133
@dataclass
3234
class DeclarativeStream(Stream):
3335
"""
@@ -198,8 +200,6 @@ def state_checkpoint_interval(self) -> Optional[int]:
198200
return None
199201

200202
def get_cursor(self) -> Optional[Cursor]:
201-
if self.retriever and isinstance(self.retriever, SimpleRetriever):
202-
return self.retriever.cursor
203203
return None
204204

205205
def _get_checkpoint_reader(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 85 deletions
Original file line number | Diff line number | Diff line change
@@ -35,10 +35,6 @@
3535
from airbyte_cdk.connector_builder.models import (
3636
LogMessage as ConnectorBuilderLogMessage,
3737
)
38-
from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream
39-
from airbyte_cdk.legacy.sources.declarative.incremental import (
40-
DatetimeBasedCursor,
41-
)
4238
from airbyte_cdk.models import (
4339
AirbyteStateBlob,
4440
AirbyteStateMessage,
@@ -740,7 +736,6 @@ def _init_mappings(self) -> None:
740736
CustomTransformationModel: self.create_custom_component,
741737
CustomValidationStrategyModel: self.create_custom_component,
742738
CustomConfigTransformationModel: self.create_custom_component,
743-
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
744739
DeclarativeStreamModel: self.create_default_stream,
745740
DefaultErrorHandlerModel: self.create_default_error_handler,
746741
DefaultPaginatorModel: self.create_default_paginator,
@@ -763,7 +758,6 @@ def _init_mappings(self) -> None:
763758
FlattenFieldsModel: self.create_flatten_fields,
764759
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
765760
IterableDecoderModel: self.create_iterable_decoder,
766-
IncrementingCountCursorModel: self.create_incrementing_count_cursor,
767761
XmlDecoderModel: self.create_xml_decoder,
768762
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
769763
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
@@ -1931,64 +1925,6 @@ def _create_nested_component(
19311925
def _is_component(model_value: Any) -> bool:
19321926
return isinstance(model_value, dict) and model_value.get("type") is not None
19331927

1934-
def create_datetime_based_cursor(
1935-
self, model: DatetimeBasedCursorModel, config: Config, **kwargs: Any
1936-
) -> DatetimeBasedCursor:
1937-
start_datetime: Union[str, MinMaxDatetime] = (
1938-
model.start_datetime
1939-
if isinstance(model.start_datetime, str)
1940-
else self.create_min_max_datetime(model.start_datetime, config)
1941-
)
1942-
end_datetime: Union[str, MinMaxDatetime, None] = None
1943-
if model.is_data_feed and model.end_datetime:
1944-
raise ValueError("Data feed does not support end_datetime")
1945-
if model.is_data_feed and model.is_client_side_incremental:
1946-
raise ValueError(
1947-
"`Client side incremental` cannot be applied with `data feed`. Choose only 1 from them."
1948-
)
1949-
if model.end_datetime:
1950-
end_datetime = (
1951-
model.end_datetime
1952-
if isinstance(model.end_datetime, str)
1953-
else self.create_min_max_datetime(model.end_datetime, config)
1954-
)
1955-
1956-
end_time_option = (
1957-
self._create_component_from_model(
1958-
model.end_time_option, config, parameters=model.parameters or {}
1959-
)
1960-
if model.end_time_option
1961-
else None
1962-
)
1963-
start_time_option = (
1964-
self._create_component_from_model(
1965-
model.start_time_option, config, parameters=model.parameters or {}
1966-
)
1967-
if model.start_time_option
1968-
else None
1969-
)
1970-
1971-
return DatetimeBasedCursor(
1972-
cursor_field=model.cursor_field,
1973-
cursor_datetime_formats=model.cursor_datetime_formats
1974-
if model.cursor_datetime_formats
1975-
else [],
1976-
cursor_granularity=model.cursor_granularity,
1977-
datetime_format=model.datetime_format,
1978-
end_datetime=end_datetime,
1979-
start_datetime=start_datetime,
1980-
step=model.step,
1981-
end_time_option=end_time_option,
1982-
lookback_window=model.lookback_window,
1983-
start_time_option=start_time_option,
1984-
partition_field_end=model.partition_field_end,
1985-
partition_field_start=model.partition_field_start,
1986-
message_repository=self._message_repository,
1987-
is_compare_strictly=model.is_compare_strictly,
1988-
config=config,
1989-
parameters=model.parameters or {},
1990-
)
1991-
19921928
def create_default_stream(
19931929
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
19941930
) -> AbstractStream:
@@ -2652,24 +2588,6 @@ def create_gzip_decoder(
26522588
fallback_parser=gzip_parser.inner_parser,
26532589
)
26542590

2655-
# todo: This method should be removed once we deprecate the SimpleRetriever.cursor field and the various
2656-
# state methods
2657-
@staticmethod
2658-
def create_incrementing_count_cursor(
2659-
model: IncrementingCountCursorModel, config: Config, **kwargs: Any
2660-
) -> DatetimeBasedCursor:
2661-
# This should not actually get used anywhere at runtime, but needed to add this to pass checks since
2662-
# we still parse models into components. The issue is that there's no runtime implementation of a
2663-
# IncrementingCountCursor.
2664-
# A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor.
2665-
return DatetimeBasedCursor(
2666-
cursor_field=model.cursor_field,
2667-
datetime_format="%Y-%m-%d",
2668-
start_datetime="2024-12-12",
2669-
config=config,
2670-
parameters={},
2671-
)
2672-
26732591
@staticmethod
26742592
def create_iterable_decoder(
26752593
model: IterableDecoderModel, config: Config, **kwargs: Any
@@ -3451,7 +3369,6 @@ def _get_url(req: Requester) -> str:
34513369
record_selector=record_selector,
34523370
stream_slicer=_NO_STREAM_SLICING,
34533371
request_option_provider=request_options_provider,
3454-
cursor=None,
34553372
config=config,
34563373
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
34573374
parameters=model.parameters or {},
@@ -3472,7 +3389,6 @@ def _get_url(req: Requester) -> str:
34723389
record_selector=record_selector,
34733390
stream_slicer=_NO_STREAM_SLICING,
34743391
request_option_provider=request_options_provider,
3475-
cursor=None,
34763392
config=config,
34773393
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
34783394
additional_query_properties=query_properties,
@@ -3565,7 +3481,7 @@ def create_state_delegating_stream(
35653481
config: Config,
35663482
has_parent_state: Optional[bool] = None,
35673483
**kwargs: Any,
3568-
) -> DeclarativeStream:
3484+
) -> DefaultStream:
35693485
if (
35703486
model.full_refresh_stream.name != model.name
35713487
or model.name != model.incremental_stream.name

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

Lines changed: 2 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
)
1212
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1313
from airbyte_cdk.sources.streams.core import StreamData
14-
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
14+
from airbyte_cdk.sources.types import Config, StreamSlice
1515
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger
1616

1717

@@ -59,30 +59,6 @@ def exit_on_rate_limit(self, value: bool) -> None:
5959
if job_orchestrator is not None:
6060
job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment]
6161

62-
@property
63-
def state(self) -> StreamState:
64-
"""
65-
As a first iteration for sendgrid, there is no state to be managed
66-
"""
67-
return {}
68-
69-
@state.setter
70-
def state(self, value: StreamState) -> None:
71-
"""
72-
As a first iteration for sendgrid, there is no state to be managed
73-
"""
74-
pass
75-
76-
def _get_stream_state(self) -> StreamState:
77-
"""
78-
Gets the current state of the stream.
79-
80-
Returns:
81-
StreamState: Mapping[str, Any]
82-
"""
83-
84-
return self.state
85-
8662
def _validate_and_get_stream_slice_jobs(
8763
self, stream_slice: Optional[StreamSlice] = None
8864
) -> Iterable[AsyncJob]:
@@ -101,9 +77,6 @@ def _validate_and_get_stream_slice_jobs(
10177
"""
10278
return stream_slice.extra_fields.get("jobs", []) if stream_slice else []
10379

104-
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
105-
yield from self.stream_slicer.stream_slices()
106-
10780
def read_records(
10881
self,
10982
records_schema: Mapping[str, Any],
@@ -112,13 +85,12 @@ def read_records(
11285
# emit the slice_descriptor log message, for connector builder TestRead
11386
yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore
11487

115-
stream_state: StreamState = self._get_stream_state()
11688
jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
11789
records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)
11890

11991
yield from self.record_selector.filter_and_transform(
12092
all_data=records,
121-
stream_state=stream_state,
93+
stream_state={}, # stream_state as an interpolation context is deprecated
12294
records_schema=records_schema,
12395
stream_slice=stream_slice,
12496
)

airbyte_cdk/sources/declarative/retrievers/retriever.py

Lines changed: 13 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -30,29 +30,27 @@ def read_records(
3030
:return: The records read from the API source
3131
"""
3232

33-
@abstractmethod
3433
@deprecated("Stream slicing is being moved to the stream level.")
3534
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
36-
"""Returns the stream slices"""
35+
"""Does nothing as this method is deprecated, so underlying Retriever implementations
36+
do not need to implement this.
37+
"""
38+
yield from []
3739

3840
@property
39-
@abstractmethod
4041
@deprecated("State management is being moved to the stream level.")
4142
def state(self) -> StreamState:
42-
"""State getter, should return state in form that can serialized to a string and send to the output
43-
as a STATE AirbyteMessage.
44-
45-
A good example of a state is a cursor_value:
46-
{
47-
self.cursor_field: "cursor_value"
48-
}
49-
50-
State should try to be as small as possible but at the same time descriptive enough to restore
51-
syncing process from the point where it stopped.
5243
"""
44+
Does nothing as this method is deprecated, so underlying Retriever implementations
45+
do not need to implement this.
46+
"""
47+
return {}
5348

5449
@state.setter
55-
@abstractmethod
5650
@deprecated("State management is being moved to the stream level.")
5751
def state(self, value: StreamState) -> None:
58-
"""State setter, accept state serialized by state getter."""
52+
"""
53+
Does nothing as this method is deprecated, so underlying Retriever implementations
54+
do not need to implement this.
55+
"""
56+
pass

0 commit comments

Comments
 (0)