Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 202 additions & 114 deletions airbyte/_util/api_util.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions airbyte/cloud/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ def resolve_cloud_client_id(
return get_secret(constants.CLOUD_CLIENT_ID_ENV_VAR, default=input_value)


def resolve_cloud_bearer_token(
input_value: str | SecretString | None = None,
/,
) -> SecretString | None:
"""Get the Airbyte Cloud bearer token from the environment."""
return try_get_secret(constants.CLOUD_BEARER_TOKEN_ENV_VAR, default=input_value)


def resolve_cloud_api_url(
input_value: str | None = None,
/,
Expand Down
8 changes: 8 additions & 0 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def _fetch_connection_info(self) -> ConnectionResponse:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)

@classmethod
Expand Down Expand Up @@ -180,6 +181,7 @@ def run_sync(
workspace_id=self.workspace.workspace_id,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
sync_result = SyncResult(
workspace=self.workspace,
Expand Down Expand Up @@ -242,6 +244,7 @@ def get_previous_sync_logs(
order_by=order_by,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return [
SyncResult(
Expand Down Expand Up @@ -298,6 +301,7 @@ def get_state_artifacts(self) -> list[dict[str, Any]] | None:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
if state_response.get("stateType") == "not_set":
return None
Expand All @@ -319,6 +323,7 @@ def get_catalog_artifact(self) -> dict[str, Any] | None:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return connection_response.get("syncCatalog")

Expand All @@ -336,6 +341,7 @@ def rename(self, name: str) -> CloudConnection:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connection_info = updated_response
Expand All @@ -355,6 +361,7 @@ def set_table_prefix(self, prefix: str) -> CloudConnection:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
prefix=prefix,
)
self._connection_info = updated_response
Expand All @@ -379,6 +386,7 @@ def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
configurations=configurations,
)
self._connection_info = updated_response
Expand Down
19 changes: 16 additions & 3 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def check(
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
check_result = CheckResult(
success=result[0],
Expand Down Expand Up @@ -197,6 +198,7 @@ def _fetch_connector_info(self) -> api_models.SourceResponse:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)

def rename(self, name: str) -> CloudSource:
Expand All @@ -213,6 +215,7 @@ def rename(self, name: str) -> CloudSource:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connector_info = updated_response
Expand All @@ -235,6 +238,7 @@ def update_config(self, config: dict[str, Any]) -> CloudSource:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
config=config,
)
self._connector_info = updated_response
Expand All @@ -254,7 +258,7 @@ def _from_source_response(
workspace=workspace,
connector_id=source_response.source_id,
)
result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API
result._connector_info = source_response
return result


Expand All @@ -279,6 +283,7 @@ def _fetch_connector_info(self) -> api_models.DestinationResponse:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)

def rename(self, name: str) -> CloudDestination:
Expand All @@ -295,6 +300,7 @@ def rename(self, name: str) -> CloudDestination:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
name=name,
)
self._connector_info = updated_response
Expand All @@ -317,6 +323,7 @@ def update_config(self, config: dict[str, Any]) -> CloudDestination:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
config=config,
)
self._connector_info = updated_response
Expand All @@ -336,7 +343,7 @@ def _from_destination_response(
workspace=workspace,
connector_id=destination_response.destination_id,
)
result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API
result._connector_info = destination_response
return result


Expand Down Expand Up @@ -377,6 +384,7 @@ def _fetch_definition_info(
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
raise NotImplementedError(
"Docker custom source definitions are not yet supported. "
Expand Down Expand Up @@ -463,6 +471,7 @@ def connector_builder_project_id(self) -> str | None:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
)

Expand Down Expand Up @@ -510,6 +519,7 @@ def permanently_delete(
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
safe_mode=safe_mode,
)
else:
Expand Down Expand Up @@ -579,6 +589,7 @@ def update_definition(
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return CustomCloudSourceDefinition._from_yaml_response(self.workspace, result)

Expand Down Expand Up @@ -636,7 +647,7 @@ def _from_yaml_response(
definition_id=response.id,
definition_type="yaml",
)
result._definition_info = response # noqa: SLF001
result._definition_info = response
return result

def deploy_source(
Expand Down Expand Up @@ -685,6 +696,7 @@ def deploy_source(
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return CloudSource._from_source_response( # noqa: SLF001 # Accessing Non-Public API
workspace=self.workspace,
Expand Down Expand Up @@ -754,6 +766,7 @@ def set_testing_values(
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)

return self
5 changes: 5 additions & 0 deletions airbyte/cloud/sync_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResp
connection_id=self.connection.connection_id,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._connection_response

Expand All @@ -266,6 +267,7 @@ def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return asdict(destination_response.configuration)

Expand All @@ -287,6 +289,7 @@ def _fetch_latest_job_info(self) -> JobResponse:
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._latest_job_info

Expand All @@ -313,6 +316,7 @@ def start_time(self) -> datetime:
json={"id": self.job_id},
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
raw_start_time = job_info_raw.get("startTime")
if raw_start_time:
Expand All @@ -332,6 +336,7 @@ def _fetch_job_with_attempts(self) -> dict[str, Any]:
},
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._job_with_attempts_info

Expand Down
Loading
Loading