diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b40dbbcd5..aa2d853730 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,12 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## 0.26.2 (2025-08-03)
+
+### Improvements
+
+- Add support for posting integration raw data to the lakehouse
+
 ## 0.26.1 (2025-07-20)
 
 ### Improvements
diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py
index aa65b4cb73..f9f3afd73c 100644
--- a/port_ocean/clients/port/mixins/integrations.py
+++ b/port_ocean/clients/port/mixins/integrations.py
@@ -1,9 +1,10 @@
 import asyncio
-from typing import Any, Dict, List, TYPE_CHECKING, Optional, TypedDict
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict
 from urllib.parse import quote_plus
 
 import httpx
 from loguru import logger
+
 from port_ocean.clients.port.authentication import PortAuthentication
 from port_ocean.clients.port.utils import handle_port_status_code
 from port_ocean.exceptions.port_defaults import DefaultsProvisionFailed
@@ -125,7 +126,7 @@ async def _poll_integration_until_default_provisioning_is_complete(
 
         while attempts < INTEGRATION_POLLING_RETRY_LIMIT:
             logger.info(
-                f"Fetching created integration and validating config, attempt {attempts+1}/{INTEGRATION_POLLING_RETRY_LIMIT}",
+                f"Fetching created integration and validating config, attempt {attempts + 1}/{INTEGRATION_POLLING_RETRY_LIMIT}",
                 attempt=attempts,
             )
             response = await self._get_current_integration()
@@ -288,3 +289,18 @@ async def delete_current_integration(
         response = await self._delete_current_integration()
         handle_port_status_code(response, should_raise, should_log)
         return response.json()
+
+    async def post_integration_raw_data(
+        self, raw_data: list[dict[Any, Any]], sync_id: str, kind: str
+    ) -> None:
+        logger.debug("Starting POST raw data request", raw_data=raw_data)
+        headers = await self.auth.headers()
+        response = await self.client.post(
+            f"{self.auth.api_url}/lakehouse/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items",
+            headers=headers,
+            json={
+                "items": raw_data,
+            },
+        )
+        handle_port_status_code(response, should_log=False)
+        logger.debug("Finished POST raw data request")
diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py
index 34515f036f..2c81c4fc42 100644
--- a/port_ocean/config/settings.py
+++ b/port_ocean/config/settings.py
@@ -12,8 +12,8 @@
 from port_ocean.core.models import (
     CachingStorageMode,
     CreatePortResourcesOrigin,
-    Runtime,
     ProcessExecutionMode,
+    Runtime,
 )
 from port_ocean.utils.misc import get_integration_name, get_spec_file
 
@@ -108,6 +108,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
 
     upsert_entities_batch_max_length: int = 20
     upsert_entities_batch_max_size_in_bytes: int = 1024 * 1024
+    lakehouse_enabled: bool = False
 
     @validator("process_execution_mode")
     def validate_process_execution_mode(
diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py
index ed09659e77..0155a33277 100644
--- a/port_ocean/core/integrations/mixins/sync_raw.py
+++ b/port_ocean/core/integrations/mixins/sync_raw.py
@@ -363,9 +363,13 @@ async def _register_in_batches(
         results, errors = await self._get_resource_raw_results(resource_config)
         async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = []
         raw_results: RAW_RESULT = []
+        lakehouse_data_enabled = await self._lakehouse_data_enabled()
+
         for result in results:
             if isinstance(result, dict):
                 raw_results.append(result)
+                if lakehouse_data_enabled:
+                    await ocean.port_client.post_integration_raw_data([result], event.id, resource_config.kind)
             else:
                 async_generators.append(result)
 
@@ -397,6 +401,8 @@
         for generator in async_generators:
             try:
                 async for items in generator:
+                    if lakehouse_data_enabled:
+                        await ocean.port_client.post_integration_raw_data(items, event.id, resource_config.kind)
                     number_of_raw_results += len(items)
                     if send_raw_data_examples_amount > 0:
                         send_raw_data_examples_amount = max(
@@ -462,6 +468,19 @@
 
         return passed_entities, errors
 
+    async def _lakehouse_data_enabled(
+        self
+    ) -> bool:
+        """Check if lakehouse data is enabled.
+
+        Returns:
+            bool: True if lakehouse data is enabled, False otherwise
+        """
+        flags = await ocean.port_client.get_organization_feature_flags()
+        if "LAKEHOUSE_ELIGIBLE" in flags and ocean.config.lakehouse_enabled:
+            return True
+        return False
+
     async def register_raw(
         self,
         kind: str,
diff --git a/port_ocean/tests/core/conftest.py b/port_ocean/tests/core/conftest.py
index 55e71c529e..86bfca32aa 100644
--- a/port_ocean/tests/core/conftest.py
+++ b/port_ocean/tests/core/conftest.py
@@ -1,10 +1,11 @@
 from contextlib import asynccontextmanager
 from typing import Any, AsyncGenerator
-from unittest.mock import MagicMock, AsyncMock, patch
+from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 from httpx import Response
 
+from port_ocean.cache.memory import InMemoryCacheProvider
 from port_ocean.clients.port.client import PortClient
 from port_ocean.config.settings import IntegrationSettings, MetricsSettings
 from port_ocean.context.event import EventContext
@@ -26,7 +27,6 @@
 from port_ocean.core.models import Entity, ProcessExecutionMode
 from port_ocean.helpers.metric.metric import Metrics
 from port_ocean.ocean import Ocean
-from port_ocean.cache.memory import InMemoryCacheProvider
 
 
 @pytest.fixture
@@ -107,6 +107,7 @@ def mock_port_client(mock_http_client: MagicMock) -> PortClient:
     )
 
     mock_port_client.search_entities = AsyncMock(return_value=[])  # type: ignore
+    mock_port_client.get_organization_feature_flags = AsyncMock(return_value=[])  # type: ignore
     mock_port_client.client = mock_http_client
     return mock_port_client
 
diff --git a/pyproject.toml b/pyproject.toml
index cf0c56548b..ab7c5d2e21 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "port-ocean"
-version = "0.26.1"
+version = "0.26.2"
 description = "Port Ocean is a CLI tool for managing your Port projects."
 readme = "README.md"
 homepage = "https://app.getport.io"
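
For reviewers, a minimal sketch (not part of the diff) of how the new gating is intended to behave: raw items are forwarded to the lakehouse endpoint only when the organization carries the LAKEHOUSE_ELIGIBLE feature flag and the integration sets `lakehouse_enabled`. `FakePortClient`, `lakehouse_data_enabled`, and the sample data below are hypothetical stand-ins for illustration only; the names `get_organization_feature_flags`, `post_integration_raw_data`, `LAKEHOUSE_ELIGIBLE`, and `lakehouse_enabled` come from the diff above.

```python
import asyncio
from typing import Any


class FakePortClient:
    """Hypothetical stand-in for PortClient, used only to illustrate the flow."""

    def __init__(self, flags: list[str]) -> None:
        self._flags = flags
        self.posted: list[dict[Any, Any]] = []

    async def get_organization_feature_flags(self) -> list[str]:
        # The real client fetches the organization's feature flags from the Port API.
        return self._flags

    async def post_integration_raw_data(
        self, raw_data: list[dict[Any, Any]], sync_id: str, kind: str
    ) -> None:
        # The real client POSTs {"items": raw_data} to
        # /lakehouse/integration/{identifier}/sync/{sync_id}/kind/{kind}/items
        self.posted.extend(raw_data)


async def lakehouse_data_enabled(client: FakePortClient, lakehouse_enabled: bool) -> bool:
    # Mirrors _lakehouse_data_enabled: both the org-level feature flag and the
    # integration-level config flag must be on.
    flags = await client.get_organization_feature_flags()
    return "LAKEHOUSE_ELIGIBLE" in flags and lakehouse_enabled


async def main() -> None:
    client = FakePortClient(flags=["LAKEHOUSE_ELIGIBLE"])
    if await lakehouse_data_enabled(client, lakehouse_enabled=True):
        # Each raw result is forwarded as a batch, keyed by sync id and kind.
        await client.post_integration_raw_data(
            [{"id": "svc-1"}], sync_id="sync-123", kind="service"
        )
    print(client.posted)  # -> [{'id': 'svc-1'}]


if __name__ == "__main__":
    asyncio.run(main())
```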