Skip to content
20 changes: 18 additions & 2 deletions port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
from typing import Any, Dict, List, TYPE_CHECKING, Optional, TypedDict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict
from urllib.parse import quote_plus

import httpx
from loguru import logger

from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_port_status_code
from port_ocean.exceptions.port_defaults import DefaultsProvisionFailed
Expand Down Expand Up @@ -125,7 +126,7 @@ async def _poll_integration_until_default_provisioning_is_complete(

while attempts < INTEGRATION_POLLING_RETRY_LIMIT:
logger.info(
f"Fetching created integration and validating config, attempt {attempts+1}/{INTEGRATION_POLLING_RETRY_LIMIT}",
f"Fetching created integration and validating config, attempt {attempts + 1}/{INTEGRATION_POLLING_RETRY_LIMIT}",
attempt=attempts,
)
response = await self._get_current_integration()
Expand Down Expand Up @@ -288,3 +289,18 @@ async def delete_current_integration(
response = await self._delete_current_integration()
handle_port_status_code(response, should_raise, should_log)
return response.json()

async def post_integration_raw_data(
self, raw_data: list[dict[Any, Any]], sync_id: str, kind: str
) -> None:
logger.debug("starting POST raw data request", raw_data=raw_data)
headers = await self.auth.headers()
response = await self.client.post(
f"{self.auth.api_url}/lakehouse/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items",
headers=headers,
json={
"items": raw_data,
},
)
handle_port_status_code(response, should_log=False)
logger.debug("Finished POST raw data request")
3 changes: 2 additions & 1 deletion port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from port_ocean.core.models import (
CachingStorageMode,
CreatePortResourcesOrigin,
Runtime,
ProcessExecutionMode,
Runtime,
)
from port_ocean.utils.misc import get_integration_name, get_spec_file

Expand Down Expand Up @@ -108,6 +108,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):

upsert_entities_batch_max_length: int = 20
upsert_entities_batch_max_size_in_bytes: int = 1024 * 1024
lakehouse_enabled: bool = False

@validator("process_execution_mode")
def validate_process_execution_mode(
Expand Down
19 changes: 19 additions & 0 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,13 @@ async def _register_in_batches(
results, errors = await self._get_resource_raw_results(resource_config)
async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = []
raw_results: RAW_RESULT = []
lakehouse_data_enabled = await self._lakehouse_data_enabled()

for result in results:
if isinstance(result, dict):
raw_results.append(result)
if lakehouse_data_enabled:
await ocean.port_client.post_integration_raw_data(result, event.id, resource_config.kind)
else:
async_generators.append(result)

Expand Down Expand Up @@ -397,6 +401,8 @@ async def _register_in_batches(
for generator in async_generators:
try:
async for items in generator:
if lakehouse_data_enabled:
await ocean.port_client.post_integration_raw_data(items, event.id, resource_config.kind)
number_of_raw_results += len(items)
if send_raw_data_examples_amount > 0:
send_raw_data_examples_amount = max(
Expand Down Expand Up @@ -462,6 +468,19 @@ async def _register_in_batches(

return passed_entities, errors

async def _lakehouse_data_enabled(
self
) -> bool:
"""Check if lakehouse data is enabled.

Returns:
bool: True if lakehouse data is enabled, False otherwise
"""
flags = await ocean.port_client.get_organization_feature_flags()
if "LAKEHOUSE_ELIGIBLE" in flags and ocean.config.lakehouse_enabled:
return True
return False

async def register_raw(
self,
kind: str,
Expand Down
5 changes: 3 additions & 2 deletions port_ocean/tests/core/conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator
from unittest.mock import MagicMock, AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from httpx import Response

from port_ocean.cache.memory import InMemoryCacheProvider
from port_ocean.clients.port.client import PortClient
from port_ocean.config.settings import IntegrationSettings, MetricsSettings
from port_ocean.context.event import EventContext
Expand All @@ -26,7 +27,6 @@
from port_ocean.core.models import Entity, ProcessExecutionMode
from port_ocean.helpers.metric.metric import Metrics
from port_ocean.ocean import Ocean
from port_ocean.cache.memory import InMemoryCacheProvider


@pytest.fixture
Expand Down Expand Up @@ -107,6 +107,7 @@ def mock_port_client(mock_http_client: MagicMock) -> PortClient:
)

mock_port_client.search_entities = AsyncMock(return_value=[]) # type: ignore
mock_port_client.get_organization_feature_flags = AsyncMock(return_value=[]) # type: ignore
mock_port_client.client = mock_http_client
return mock_port_client

Expand Down