diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 482c7ddf2..958828aaf 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -79,21 +79,49 @@ def get_web_url_root(api_root: str) -> str: def get_airbyte_server_instance( - *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> airbyte_api.AirbyteAPI: - """Get an Airbyte instance.""" - return airbyte_api.AirbyteAPI( - security=models.Security( + """Get an Airbyte instance. + + Supports authentication via either OAuth2 client credentials or bearer token. + Exactly one authentication method must be provided. + + In future, we may consider falling back from bearer token to client credentials + if bearer token is provided but expired. For now, we fail if both auth + modes are provided. + """ + has_client_creds = client_id is not None and client_secret is not None + has_bearer = bearer_token is not None + + if has_client_creds and has_bearer: + raise PyAirbyteInputError( + message="Cannot provide both client credentials and bearer token. " + "Please use only one authentication method.", + ) + + if not has_client_creds and not has_bearer: + raise PyAirbyteInputError( + message="Must provide either client credentials (client_id + client_secret) " + "or bearer_token for authentication.", + ) + + if has_bearer: + security = models.Security(bearer_auth=str(bearer_token)) + else: + security = models.Security( client_credentials=models.SchemeClientCredentials( client_id=client_id, client_secret=client_secret, token_url=api_root + "/applications/token", # e.g. https://api.airbyte.com/v1/applications/token ), - ), + ) + + return airbyte_api.AirbyteAPI( + security=security, server_url=api_root, ) @@ -105,14 +133,16 @@ def get_workspace( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.WorkspaceResponse: """Get a workspace object.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) response = airbyte_instance.workspaces.get_workspace( api.GetWorkspaceRequest( @@ -138,8 +168,9 @@ def list_connections( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.ConnectionResponse]: @@ -149,10 +180,10 @@ def list_connections( name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) - _ = workspace_id # Not used (yet) airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.ConnectionResponse] = [] @@ -189,8 +220,9 @@ def list_workspaces( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.WorkspaceResponse]: @@ -200,10 +232,10 @@ def list_workspaces( name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) - _ = workspace_id # Not used (yet) airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.WorkspaceResponse] = [] @@ -238,8 +270,9 @@ def list_sources( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.SourceResponse]: @@ -249,10 +282,11 @@ def list_sources( name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) - _ = workspace_id # Not used (yet) + _ = workspace_id airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.SourceResponse] = [] @@ -286,8 +320,9 @@ def list_destinations( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.DestinationResponse]: @@ -297,10 +332,11 @@ def list_destinations( name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) - _ = workspace_id # Not used (yet) + _ = workspace_id airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) result: list[models.DestinationResponse] = [] @@ -342,14 +378,15 @@ def get_connection( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.ConnectionResponse: """Get a connection.""" - _ = workspace_id # Not used (yet) airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.connections.get_connection( @@ -372,8 +409,9 @@ def run_connection( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.JobResponse: """Get a connection. @@ -381,10 +419,11 @@ def run_connection( If raise_on_failure is True, this will raise an exception if the connection fails. """ - _ = workspace_id # Not used (yet) + _ = workspace_id airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.jobs.create_job( @@ -408,14 +447,15 @@ def run_connection( # Get job info (logs) -def get_job_logs( +def get_job_logs( # noqa: PLR0913 workspace_id: str, connection_id: str, limit: int = 100, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, offset: int | None = None, order_by: str | None = None, ) -> list[models.JobResponse]: @@ -428,6 +468,7 @@ def get_job_logs( api_root: The API root URL. client_id: The client ID for authentication. client_secret: The client secret for authentication. + bearer_token: Optional bearer token for authentication (alternative to client credentials). offset: Number of jobs to skip from the beginning. Defaults to None (0). order_by: Field and direction to order by (e.g., "createdAt|DESC"). Defaults to None. @@ -437,6 +478,7 @@ def get_job_logs( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response: api.ListJobsResponse = airbyte_instance.jobs.list_jobs( @@ -465,13 +507,15 @@ def get_job_info( job_id: int, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.JobResponse: """Get a job.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.jobs.get_job( @@ -499,8 +543,9 @@ def create_source( config: models.SourceConfiguration | dict[str, Any], definition_id: str | None = None, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.SourceResponse: """Create a source connector instance. @@ -509,6 +554,7 @@ def create_source( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response: api.CreateSourceResponse = airbyte_instance.sources.create_source( @@ -533,13 +579,15 @@ def get_source( source_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.SourceResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.sources.get_source( @@ -562,8 +610,9 @@ def delete_source( *, source_name: str | None = None, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, safe_mode: bool = True, ) -> None: @@ -576,6 +625,7 @@ def delete_source( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) workspace_id: The workspace ID (not currently used) safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -593,6 +643,7 @@ def delete_source( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) source_name = source_info.name @@ -610,10 +661,10 @@ def delete_source( "safe_mode": True, }, ) - airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.sources.delete_source( @@ -634,8 +685,9 @@ def patch_source( source_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, config: models.SourceConfiguration | dict[str, Any] | None = None, ) -> models.SourceResponse: @@ -643,21 +695,11 @@ def patch_source( This is a destructive operation that can break existing connections if the configuration is changed incorrectly. - - Args: - source_id: The ID of the source to update - api_root: The API root URL - client_id: Client ID for authentication - client_secret: Client secret for authentication - name: Optional new name for the source - config: Optional new configuration for the source - - Returns: - Updated SourceResponse object """ airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.sources.patch_source( @@ -712,13 +754,15 @@ def create_destination( workspace_id: str, config: DestinationConfiguration | dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DestinationResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) definition_id_override: str | None = None @@ -747,13 +791,15 @@ def get_destination( destination_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DestinationResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.destinations.get_destination( @@ -794,8 +840,9 @@ def delete_destination( *, destination_name: str | None = None, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, safe_mode: bool = True, ) -> None: @@ -808,6 +855,7 @@ def delete_destination( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) workspace_id: The workspace ID (not currently used) safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -825,6 +873,7 @@ def delete_destination( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) destination_name = destination_info.name @@ -843,10 +892,10 @@ def delete_destination( "safe_mode": True, }, ) - airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.destinations.delete_destination( @@ -867,8 +916,9 @@ def patch_destination( destination_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, config: DestinationConfiguration | dict[str, Any] | None = None, ) -> models.DestinationResponse: @@ -876,21 +926,11 @@ def patch_destination( This is a destructive operation that can break existing connections if the configuration is changed incorrectly. - - Args: - destination_id: The ID of the destination to update - api_root: The API root URL - client_id: Client ID for authentication - client_secret: Client secret for authentication - name: Optional new name for the destination - config: Optional new configuration for the destination - - Returns: - Updated DestinationResponse object """ airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.destinations.patch_destination( @@ -943,8 +983,9 @@ def create_connection( # noqa: PLR0913 # Too many arguments source_id: str, destination_id: str, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, prefix: str, selected_stream_names: list[str], @@ -953,6 +994,7 @@ def create_connection( # noqa: PLR0913 # Too many arguments airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) stream_configurations_obj = build_stream_configurations(selected_stream_names) @@ -982,8 +1024,9 @@ def get_connection_by_name( connection_name: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.ConnectionResponse: """Get a connection.""" connections = list_connections( @@ -991,6 +1034,7 @@ def get_connection_by_name( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) found: list[models.ConnectionResponse] = [ connection for connection in connections if connection.name == connection_name @@ -1033,8 +1077,9 @@ def delete_connection( *, api_root: str, workspace_id: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, safe_mode: bool = True, ) -> None: """Delete a connection. @@ -1047,6 +1092,7 @@ def delete_connection( workspace_id: The workspace ID client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -1062,6 +1108,7 @@ def delete_connection( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) connection_name = connection_info.name @@ -1085,6 +1132,7 @@ def delete_connection( airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.connections.delete_connection( @@ -1105,8 +1153,9 @@ def patch_connection( # noqa: PLR0913 # Too many arguments connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name: str | None = None, configurations: models.StreamConfigurationsInput | None = None, schedule: models.AirbyteAPIConnectionSchedule | None = None, @@ -1123,6 +1172,7 @@ def patch_connection( # noqa: PLR0913 # Too many arguments api_root: The API root URL client_id: Client ID for authentication client_secret: Client secret for authentication + bearer_token: Optional bearer token for authentication (alternative to client credentials) name: Optional new name for the connection configurations: Optional new stream configurations schedule: Optional new sync schedule @@ -1135,6 +1185,7 @@ def patch_connection( # noqa: PLR0913 # Too many arguments airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, api_root=api_root, ) response = airbyte_instance.connections.patch_connection( @@ -1164,7 +1215,7 @@ def patch_connection( # noqa: PLR0913 # Too many arguments # Functions for leveraging the Airbyte Config API (may not be supported or stable) -def get_bearer_token( +def get_new_bearer_token( *, client_id: SecretString, client_secret: SecretString, @@ -1197,15 +1248,17 @@ def _make_config_api_request( api_root: str, path: str, json: dict[str, Any], - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: config_api_root = get_config_api_root(api_root) - bearer_token = get_bearer_token( - client_id=client_id, - client_secret=client_secret, - api_root=api_root, - ) + if client_id and client_secret and not bearer_token: + bearer_token = get_new_bearer_token( + client_id=client_id, + client_secret=client_secret, + api_root=api_root, + ) headers: dict[str, Any] = { "Content-Type": "application/json", "Authorization": f"Bearer {bearer_token}", @@ -1245,8 +1298,9 @@ def check_connector( *, actor_id: str, connector_type: Literal["source", "destination"], - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, workspace_id: str | None = None, api_root: str = CLOUD_API_ROOT, ) -> tuple[bool, str | None]: @@ -1267,6 +1321,7 @@ def check_connector( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) result, message = json_result.get("status"), json_result.get("message") @@ -1330,14 +1385,16 @@ def create_custom_yaml_source_definition( workspace_id: str, manifest: dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DeclarativeSourceDefinitionResponse: """Create a custom YAML source definition.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request_body = models.CreateDeclarativeSourceDefinitionRequest( @@ -1363,14 +1420,16 @@ def list_custom_yaml_source_definitions( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> list[models.DeclarativeSourceDefinitionResponse]: """List all custom YAML source definitions in a workspace.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request = api.ListDeclarativeSourceDefinitionsRequest( @@ -1398,14 +1457,16 @@ def get_custom_yaml_source_definition( definition_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DeclarativeSourceDefinitionResponse: """Get a specific custom YAML source definition.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request = api.GetDeclarativeSourceDefinitionRequest( @@ -1436,14 +1497,16 @@ def update_custom_yaml_source_definition( *, manifest: dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> models.DeclarativeSourceDefinitionResponse: """Update a custom YAML source definition.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request_body = models.UpdateDeclarativeSourceDefinitionRequest( @@ -1477,8 +1540,9 @@ def delete_custom_yaml_source_definition( definition_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, safe_mode: bool = True, ) -> None: """Delete a custom YAML source definition. @@ -1489,6 +1553,7 @@ def delete_custom_yaml_source_definition( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) safe_mode: If True, requires the connector name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. @@ -1503,6 +1568,7 @@ def delete_custom_yaml_source_definition( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) connector_name = definition_info.name @@ -1529,6 +1595,7 @@ def delete_custom_yaml_source_definition( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) request = api.DeleteDeclarativeSourceDefinitionRequest( @@ -1553,8 +1620,9 @@ def get_connector_builder_project_for_definition_id( workspace_id: str, definition_id: str, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> str | None: """Get the connector builder project ID for a declarative source definition. @@ -1569,6 +1637,7 @@ def get_connector_builder_project_for_definition_id( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) Returns: The builder project ID if found, None otherwise (can be null in API response) @@ -1582,6 +1651,7 @@ def get_connector_builder_project_for_definition_id( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) return json_result.get("builderProjectId") @@ -1593,8 +1663,9 @@ def update_connector_builder_project_testing_values( testing_values: dict[str, Any], spec: dict[str, Any], api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Update the testing values for a connector builder project. @@ -1613,6 +1684,7 @@ def update_connector_builder_project_testing_values( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) Returns: The updated testing values from the API response @@ -1628,6 +1700,7 @@ def update_connector_builder_project_testing_values( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) @@ -1637,8 +1710,9 @@ def update_connector_builder_project_testing_values( def list_organizations_for_user( *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> list[models.OrganizationResponse]: """List all organizations accessible to the current user. @@ -1648,6 +1722,7 @@ def list_organizations_for_user( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) Returns: List of OrganizationResponse objects containing organization_id, organization_name, email @@ -1656,6 +1731,7 @@ def list_organizations_for_user( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) response = airbyte_instance.organizations.list_organizations_for_user() @@ -1675,8 +1751,9 @@ def list_workspaces_in_organization( organization_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, name_contains: str | None = None, max_items_limit: int | None = None, ) -> list[dict[str, Any]]: @@ -1689,6 +1766,7 @@ def list_workspaces_in_organization( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) name_contains: Optional substring filter for workspace names (server-side) max_items_limit: Optional maximum number of workspaces to return @@ -1717,6 +1795,7 @@ def list_workspaces_in_organization( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) workspaces = json_result.get("workspaces", []) @@ -1745,8 +1824,9 @@ def get_workspace_organization_info( workspace_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Get organization info for a workspace. @@ -1760,6 +1840,7 @@ def get_workspace_organization_info( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) Returns: Dictionary containing organization info: @@ -1774,6 +1855,7 @@ def get_workspace_organization_info( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) @@ -1781,8 +1863,9 @@ def get_connection_state( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Get the state for a connection. @@ -1793,6 +1876,7 @@ def get_connection_state( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) Returns: Dictionary containing the connection state. @@ -1803,6 +1887,7 @@ def get_connection_state( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) @@ -1810,8 +1895,9 @@ def get_connection_catalog( connection_id: str, *, api_root: str, - client_id: SecretString, - client_secret: SecretString, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, ) -> dict[str, Any]: """Get the configured catalog for a connection. @@ -1825,6 +1911,7 @@ def get_connection_catalog( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret + bearer_token: Optional bearer token for authentication (alternative to client credentials) Returns: Dictionary containing the connection info with syncCatalog. @@ -1835,4 +1922,5 @@ def get_connection_catalog( api_root=api_root, client_id=client_id, client_secret=client_secret, + bearer_token=bearer_token, ) diff --git a/airbyte/cloud/auth.py b/airbyte/cloud/auth.py index 7925bdd95..edad4e028 100644 --- a/airbyte/cloud/auth.py +++ b/airbyte/cloud/auth.py @@ -22,6 +22,14 @@ def resolve_cloud_client_id( return get_secret(constants.CLOUD_CLIENT_ID_ENV_VAR, default=input_value) +def resolve_cloud_bearer_token( + input_value: str | SecretString | None = None, + /, +) -> SecretString | None: + """Get the Airbyte Cloud bearer token from the environment.""" + return try_get_secret(constants.CLOUD_BEARER_TOKEN_ENV_VAR, default=input_value) + + def resolve_cloud_api_url( input_value: str | None = None, /, diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 95531f6eb..34a98322e 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -62,6 +62,7 @@ def _fetch_connection_info(self) -> ConnectionResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) @classmethod @@ -180,6 +181,7 @@ def run_sync( workspace_id=self.workspace.workspace_id, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) sync_result = SyncResult( workspace=self.workspace, @@ -242,6 +244,7 @@ def get_previous_sync_logs( order_by=order_by, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return [ SyncResult( @@ -298,6 +301,7 @@ def get_state_artifacts(self) -> list[dict[str, Any]] | None: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) if state_response.get("stateType") == "not_set": return None @@ -319,6 +323,7 @@ def get_catalog_artifact(self) -> dict[str, Any] | None: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return connection_response.get("syncCatalog") @@ -336,6 +341,7 @@ def rename(self, name: str) -> CloudConnection: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, name=name, ) self._connection_info = updated_response @@ -355,6 +361,7 @@ def set_table_prefix(self, prefix: str) -> CloudConnection: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, prefix=prefix, ) self._connection_info = updated_response @@ -379,6 +386,7 @@ def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, configurations=configurations, ) self._connection_info = updated_response diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index 2707b4e74..4d3ec7b9d 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -165,6 +165,7 @@ def check( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) check_result = CheckResult( success=result[0], @@ -197,6 +198,7 @@ def _fetch_connector_info(self) -> api_models.SourceResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) def rename(self, name: str) -> CloudSource: @@ -213,6 +215,7 @@ def rename(self, name: str) -> CloudSource: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, name=name, ) self._connector_info = updated_response @@ -235,6 +238,7 @@ def update_config(self, config: dict[str, Any]) -> CloudSource: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, config=config, ) self._connector_info = updated_response @@ -254,7 +258,7 @@ def _from_source_response( workspace=workspace, connector_id=source_response.source_id, ) - result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API + result._connector_info = source_response return result @@ -279,6 +283,7 @@ def _fetch_connector_info(self) -> api_models.DestinationResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) def rename(self, name: str) -> CloudDestination: @@ -295,6 +300,7 @@ def rename(self, name: str) -> CloudDestination: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, name=name, ) self._connector_info = updated_response @@ -317,6 +323,7 @@ def update_config(self, config: dict[str, Any]) -> CloudDestination: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, config=config, ) self._connector_info = updated_response @@ -336,7 +343,7 @@ def _from_destination_response( workspace=workspace, connector_id=destination_response.destination_id, ) - result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API + result._connector_info = destination_response return result @@ -377,6 +384,7 @@ def _fetch_definition_info( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) raise NotImplementedError( "Docker custom source definitions are not yet supported. " @@ -463,6 +471,7 @@ def connector_builder_project_id(self) -> str | None: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) ) @@ -510,6 +519,7 @@ def permanently_delete( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, safe_mode=safe_mode, ) else: @@ -579,6 +589,7 @@ def update_definition( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return CustomCloudSourceDefinition._from_yaml_response(self.workspace, result) @@ -636,7 +647,7 @@ def _from_yaml_response( definition_id=response.id, definition_type="yaml", ) - result._definition_info = response # noqa: SLF001 + result._definition_info = response return result def deploy_source( @@ -685,6 +696,7 @@ def deploy_source( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return CloudSource._from_source_response( # noqa: SLF001 # Accessing Non-Public API workspace=self.workspace, @@ -754,6 +766,7 @@ def set_testing_values( api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index 931c33733..c59ae2012 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -255,6 +255,7 @@ def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResp connection_id=self.connection.connection_id, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self._connection_response @@ -266,6 +267,7 @@ def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return asdict(destination_response.configuration) @@ -287,6 +289,7 @@ def _fetch_latest_job_info(self) -> JobResponse: api_root=self.workspace.api_root, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self._latest_job_info @@ -313,6 +316,7 @@ def start_time(self) -> datetime: json={"id": self.job_id}, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) raw_start_time = job_info_raw.get("startTime") if raw_start_time: @@ -332,6 +336,7 @@ def _fetch_job_with_attempts(self) -> dict[str, Any]: }, client_id=self.workspace.client_id, client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, ) return self._job_with_attempts_info diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index 57b0bfd5b..881f0989b 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -6,6 +6,8 @@ ## Usage Examples +### Authentication with OAuth2 Client Credentials + Get a new workspace object and deploy a source to it: ```python @@ -31,6 +33,35 @@ # Permanently delete the newly-created source workspace.permanently_delete_source(deployed_source) ``` + + +Alternatively, authenticate using a bearer token directly: + +```python +import airbyte as ab +from airbyte import cloud + +workspace = cloud.CloudWorkspace( + workspace_id="...", + bearer_token="...", +) + +sources = workspace.list_sources() +``` + + +Generate a bearer token from client credentials: + +```python +workspace = cloud.CloudWorkspace( + workspace_id="...", + client_id="...", + client_secret="...", +) + +token = workspace.create_oauth_token() +print(f"Bearer token set: {token is not None}") +``` """ from __future__ import annotations @@ -82,17 +113,40 @@ class CloudWorkspace: By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise. + + Authentication can be done via either OAuth2 client credentials or bearer token. + Provide either (client_id + client_secret) OR bearer_token, but not both. """ workspace_id: str - client_id: SecretString - client_secret: SecretString + client_id: SecretString | None = None + client_secret: SecretString | None = None + bearer_token: SecretString | None = None api_root: str = api_util.CLOUD_API_ROOT def __post_init__(self) -> None: - """Ensure that the client ID and secret are handled securely.""" - self.client_id = SecretString(self.client_id) - self.client_secret = SecretString(self.client_secret) + """Ensure that credentials are handled securely and validate auth method.""" + has_client_creds = self.client_id is not None and self.client_secret is not None + has_bearer = self.bearer_token is not None + + if has_client_creds and has_bearer: + raise exc.PyAirbyteInputError( + message="Cannot provide both client credentials and bearer token. " + "Please use only one authentication method.", + ) + + if not has_client_creds and not has_bearer: + raise exc.PyAirbyteInputError( + message="Must provide either client credentials (client_id + client_secret) " + "or bearer_token for authentication.", + ) + + if self.client_id is not None: + self.client_id = SecretString(self.client_id) + if self.client_secret is not None: + self.client_secret = SecretString(self.client_secret) + if self.bearer_token is not None: + self.bearer_token = SecretString(self.bearer_token) @property def workspace_url(self) -> str | None: @@ -111,6 +165,7 @@ def _organization_info(self) -> dict[str, Any]: api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) @overload @@ -193,9 +248,37 @@ def connect(self) -> None: workspace_id=self.workspace_id, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) print(f"Successfully connected to workspace: {self.workspace_url}") + def create_oauth_token(self) -> SecretString: + """Create and return an OAuth2 bearer token. + + This method generates a new bearer token using the workspace's client credentials. + The token can be used for subsequent API calls or shared with other applications. + + Returns: + SecretString containing the bearer token + + Raises: + PyAirbyteInputError: If the workspace is configured with a bearer token + instead of client credentials + """ + if not self.client_id or not self.client_secret: + raise exc.PyAirbyteInputError( + message=( + "This method requires client credentials (client_id + client_secret). " + "Cannot create OAuth token when workspace is configured with a bearer token." + ), + ) + + return api_util.get_new_bearer_token( + client_id=self.client_id, + client_secret=self.client_secret, + api_root=self.api_root, + ) + # Get sources, destinations, and connections def get_connection( @@ -282,6 +365,7 @@ def deploy_source( config=source_config_dict, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CloudSource( workspace=self, @@ -337,6 +421,7 @@ def deploy_destination( config=destination_conf_dict, # Wants a dataclass but accepts dict client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CloudDestination( workspace=self, @@ -370,6 +455,7 @@ def permanently_delete_source( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, safe_mode=safe_mode, ) @@ -406,6 +492,7 @@ def permanently_delete_destination( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, safe_mode=safe_mode, ) @@ -452,6 +539,7 @@ def deploy_connection( prefix=table_prefix or "", client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CloudConnection( @@ -496,6 +584,7 @@ def permanently_delete_connection( workspace_id=self.workspace_id, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, safe_mode=safe_mode, ) @@ -529,6 +618,7 @@ def list_connections( name_filter=name_filter, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) @@ -556,6 +646,7 @@ def list_sources( name_filter=name_filter, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CloudSource._from_source_response( # noqa: SLF001 (non-public API) @@ -583,6 +674,7 @@ def list_destinations( name_filter=name_filter, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) @@ -683,6 +775,7 @@ def publish_custom_source_definition( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) custom_definition = CustomCloudSourceDefinition._from_yaml_response( # noqa: SLF001 self, result @@ -718,6 +811,7 @@ def list_custom_source_definitions( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return [ CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 @@ -751,6 +845,7 @@ def get_custom_source_definition( api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + bearer_token=self.bearer_token, ) return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 diff --git a/airbyte/constants.py b/airbyte/constants.py index 465afffc1..7191012c8 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -216,6 +216,9 @@ def _str_to_bool(value: str) -> bool: CLOUD_CLIENT_SECRET_ENV_VAR: str = "AIRBYTE_CLOUD_CLIENT_SECRET" """The environment variable name for the Airbyte Cloud client secret.""" +CLOUD_BEARER_TOKEN_ENV_VAR: str = "AIRBYTE_CLOUD_BEARER_TOKEN" +"""The environment variable name for the Airbyte Cloud bearer token.""" + CLOUD_API_ROOT_ENV_VAR: str = "AIRBYTE_CLOUD_API_URL" """The environment variable name for the Airbyte Cloud API URL.""" diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index 841db4fe1..339f170bb 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -11,6 +11,7 @@ from airbyte._util import api_util from airbyte.cloud.auth import ( resolve_cloud_api_url, + resolve_cloud_bearer_token, resolve_cloud_client_id, resolve_cloud_client_secret, resolve_cloud_workspace_id, @@ -214,10 +215,22 @@ class SyncJobListResult(BaseModel): def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace: """Get an authenticated CloudWorkspace. + Prefers bearer token authentication if available, otherwise falls back to + client credentials (client_id + client_secret). + Args: workspace_id: Optional workspace ID. If not provided, uses the AIRBYTE_CLOUD_WORKSPACE_ID environment variable. """ + bearer_token = resolve_cloud_bearer_token() + + if bearer_token is not None: + return CloudWorkspace( + workspace_id=resolve_cloud_workspace_id(workspace_id), + bearer_token=bearer_token, + api_root=resolve_cloud_api_url(), + ) + return CloudWorkspace( workspace_id=resolve_cloud_workspace_id(workspace_id), client_id=resolve_cloud_client_id(), diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index c0ba2c999..ee9afd08a 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -15,7 +15,7 @@ CLOUD_API_ROOT, AirbyteError, check_connector, - get_bearer_token, + get_new_bearer_token, ) from airbyte.secrets.base import SecretString from airbyte_api.models import ( @@ -253,7 +253,7 @@ def test_get_bearer_token( api_root: str, ) -> None: try: - token: SecretString = get_bearer_token( + token: SecretString = get_new_bearer_token( client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, api_root=api_root,