From 05d1a176dcbe13ffb98a644a64c3a8b01bdaf27c Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Mon, 21 Jul 2025 15:20:48 +0300 Subject: [PATCH 01/10] post data to data lake service --- .../clients/port/mixins/integrations.py | 20 +++++++++++++++++-- .../core/integrations/mixins/sync_raw.py | 9 +++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index aa65b4cb73..a32050742c 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -1,9 +1,10 @@ import asyncio -from typing import Any, Dict, List, TYPE_CHECKING, Optional, TypedDict +from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict from urllib.parse import quote_plus import httpx from loguru import logger + from port_ocean.clients.port.authentication import PortAuthentication from port_ocean.clients.port.utils import handle_port_status_code from port_ocean.exceptions.port_defaults import DefaultsProvisionFailed @@ -125,7 +126,7 @@ async def _poll_integration_until_default_provisioning_is_complete( while attempts < INTEGRATION_POLLING_RETRY_LIMIT: logger.info( - f"Fetching created integration and validating config, attempt {attempts+1}/{INTEGRATION_POLLING_RETRY_LIMIT}", + f"Fetching created integration and validating config, attempt {attempts + 1}/{INTEGRATION_POLLING_RETRY_LIMIT}", attempt=attempts, ) response = await self._get_current_integration() @@ -288,3 +289,18 @@ async def delete_current_integration( response = await self._delete_current_integration() handle_port_status_code(response, should_raise, should_log) return response.json() + + async def post_integration_raw_data( + self, raw_data: list[dict[Any, Any]], sync_id: str, kind: str + ) -> None: + logger.debug("starting POST raw data request", raw_data=raw_data) + headers = await self.auth.headers() + response = await self.client.post( + f"{self.auth.api_url}/{self.integration_identifier}/datalake/{sync_id}/{kind}/items", + headers=headers, + json={ + "items": raw_data, + }, + ) + handle_port_status_code(response, should_log=False) + logger.debug("Finished POST raw data request") diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index ed09659e77..4a935187a9 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -363,9 +363,13 @@ async def _register_in_batches( results, errors = await self._get_resource_raw_results(resource_config) async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] raw_results: RAW_RESULT = [] + + raw_data = [] + for result in results: if isinstance(result, dict): raw_results.append(result) + raw_data.append(result) else: async_generators.append(result) @@ -397,6 +401,7 @@ async def _register_in_batches( for generator in async_generators: try: async for items in generator: + raw_data.extend(items) number_of_raw_results += len(items) if send_raw_data_examples_amount > 0: send_raw_data_examples_amount = max( @@ -424,6 +429,10 @@ async def _register_in_batches( f"Finished registering kind: {resource_config.kind}-{resource.resource.index} ,{len(passed_entities)} entities out of {number_of_raw_results} raw results" ) + flags = await ocean.port_client.get_organization_feature_flags(); + if "DATALAKE_ENABLED" in flags: + await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind) + ocean.metrics.set_metric( name=MetricType.SUCCESS_NAME, labels=[ocean.metrics.current_resource_kind(), MetricPhase.RESYNC], From b89f216e50babe2441f3bc7cc4eb3779aa293019 Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Mon, 21 Jul 2025 19:14:02 +0300 Subject: [PATCH 02/10] Update integration client to post data to the data lake service with new sync endpoint --- port_ocean/clients/port/mixins/integrations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index a32050742c..5c59c6c446 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -296,7 +296,7 @@ async def post_integration_raw_data( logger.debug("starting POST raw data request", raw_data=raw_data) headers = await self.auth.headers() response = await self.client.post( - f"{self.auth.api_url}/{self.integration_identifier}/datalake/{sync_id}/{kind}/items", + f"{self.auth.api_url}/datalake/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items", headers=headers, json={ "items": raw_data, From 155d420839255f0fc575b9d056e22484d2553a59 Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Tue, 22 Jul 2025 16:54:22 +0300 Subject: [PATCH 03/10] Add datalake_enabled configuration and update integration logic for data lake posting --- port_ocean/config/settings.py | 3 ++- port_ocean/core/integrations/mixins/sync_raw.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index 34515f036f..cff2975e16 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -12,8 +12,8 @@ from port_ocean.core.models import ( CachingStorageMode, CreatePortResourcesOrigin, - Runtime, ProcessExecutionMode, + Runtime, ) from port_ocean.utils.misc import get_integration_name, get_spec_file @@ -108,6 +108,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): upsert_entities_batch_max_length: int = 20 upsert_entities_batch_max_size_in_bytes: int = 1024 * 1024 + datalake_enabled: bool = False @validator("process_execution_mode") def validate_process_execution_mode( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 4a935187a9..2ac39e9690 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -430,7 +430,7 @@ async def _register_in_batches( ) flags = await ocean.port_client.get_organization_feature_flags(); - if "DATALAKE_ENABLED" in flags: + if "DATALAKE_ALLOWED" in flags and ocean.config.datalake_enabled: await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind) ocean.metrics.set_metric( From e04a0f8b34c4ca02d34d609eb4db46cbd2902a80 Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Wed, 23 Jul 2025 14:04:28 +0300 Subject: [PATCH 04/10] Enhance mock_port_client fixture to include get_organization_feature_flags method --- port_ocean/tests/core/conftest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/port_ocean/tests/core/conftest.py b/port_ocean/tests/core/conftest.py index 55e71c529e..86bfca32aa 100644 --- a/port_ocean/tests/core/conftest.py +++ b/port_ocean/tests/core/conftest.py @@ -1,10 +1,11 @@ from contextlib import asynccontextmanager from typing import Any, AsyncGenerator -from unittest.mock import MagicMock, AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from httpx import Response +from port_ocean.cache.memory import InMemoryCacheProvider from port_ocean.clients.port.client import PortClient from port_ocean.config.settings import IntegrationSettings, MetricsSettings from port_ocean.context.event import EventContext @@ -26,7 +27,6 @@ from port_ocean.core.models import Entity, ProcessExecutionMode from port_ocean.helpers.metric.metric import Metrics from port_ocean.ocean import Ocean -from port_ocean.cache.memory import InMemoryCacheProvider @pytest.fixture @@ -107,6 +107,7 @@ def mock_port_client(mock_http_client: MagicMock) -> PortClient: ) mock_port_client.search_entities = AsyncMock(return_value=[]) # type: ignore + mock_port_client.get_organization_feature_flags = AsyncMock(return_value=[]) # type: ignore mock_port_client.client = mock_http_client return mock_port_client From 1dcc748b5e6dcefc9787732ad9941ad872fd81ed Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Sun, 27 Jul 2025 12:11:32 +0300 Subject: [PATCH 05/10] fixed names --- port_ocean/clients/port/mixins/integrations.py | 2 +- port_ocean/core/integrations/mixins/sync_raw.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index 5c59c6c446..f9f3afd73c 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -296,7 +296,7 @@ async def post_integration_raw_data( logger.debug("starting POST raw data request", raw_data=raw_data) headers = await self.auth.headers() response = await self.client.post( - f"{self.auth.api_url}/datalake/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items", + f"{self.auth.api_url}/lakehouse/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items", headers=headers, json={ "items": raw_data, diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 2ac39e9690..b9506da439 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -430,7 +430,7 @@ async def _register_in_batches( ) flags = await ocean.port_client.get_organization_feature_flags(); - if "DATALAKE_ALLOWED" in flags and ocean.config.datalake_enabled: + if "LAKEHOUSE_ALLOWED" in flags and ocean.config.datalake_enabled: await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind) ocean.metrics.set_metric( From 7e2152a7f780e3c2c6ad943f7f702c97f663a77c Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Sun, 27 Jul 2025 14:27:53 +0300 Subject: [PATCH 06/10] Refactor SyncRawMixin to enable conditional posting of raw data to lakehouse. Introduced a method to check if lakehouse data is enabled and updated logic to post data accordingly. --- .../core/integrations/mixins/sync_raw.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index b9506da439..76f049f15e 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -363,13 +363,13 @@ async def _register_in_batches( results, errors = await self._get_resource_raw_results(resource_config) async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] raw_results: RAW_RESULT = [] - - raw_data = [] + lakehouse_data_enabled = await self._lakehouse_data_enabled() for result in results: if isinstance(result, dict): raw_results.append(result) - raw_data.append(result) + if lakehouse_data_enabled: + await ocean.port_client.post_integration_raw_data(result, event.id, resource_config.kind) else: async_generators.append(result) @@ -401,7 +401,8 @@ async def _register_in_batches( for generator in async_generators: try: async for items in generator: - raw_data.extend(items) + if lakehouse_data_enabled: + await ocean.port_client.post_integration_raw_data(items, event.id, resource_config.kind) number_of_raw_results += len(items) if send_raw_data_examples_amount > 0: send_raw_data_examples_amount = max( @@ -429,9 +430,7 @@ async def _register_in_batches( f"Finished registering kind: {resource_config.kind}-{resource.resource.index} ,{len(passed_entities)} entities out of {number_of_raw_results} raw results" ) - flags = await ocean.port_client.get_organization_feature_flags(); - if "LAKEHOUSE_ALLOWED" in flags and ocean.config.datalake_enabled: - await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind) + await self._post_lakehouse_data_if_enabled(raw_data, resource_config.kind) ocean.metrics.set_metric( name=MetricType.SUCCESS_NAME, @@ -471,6 +470,19 @@ async def _register_in_batches( return passed_entities, errors + async def _lakehouse_data_enabled( + self + ) -> bool: + """Check if lakehouse data is enabled. + + Returns: + bool: True if lakehouse data is enabled, False otherwise + """ + flags = await ocean.port_client.get_organization_feature_flags() + if "LAKEHOUSE_ALLOWED" in flags and ocean.config.datalake_enabled: + return True + return False + async def register_raw( self, kind: str, From bd6be8b8f9f287373ce7565c32caf9fafcaf37af Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Sun, 27 Jul 2025 14:30:53 +0300 Subject: [PATCH 07/10] fixed feature flag --- port_ocean/core/integrations/mixins/sync_raw.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 76f049f15e..539b45c4c4 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -479,7 +479,7 @@ async def _lakehouse_data_enabled( bool: True if lakehouse data is enabled, False otherwise """ flags = await ocean.port_client.get_organization_feature_flags() - if "LAKEHOUSE_ALLOWED" in flags and ocean.config.datalake_enabled: + if "LAKEHOUSE_ELIGIBLE" in flags and ocean.config.datalake_enabled: return True return False From 382a80dd97aca0d3941a9583c9e1a227f874dd81 Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Sun, 27 Jul 2025 16:10:14 +0300 Subject: [PATCH 08/10] Remove conditional posting of raw data to lakehouse in SyncRawMixin. This change simplifies the integration logic by eliminating the call to post lakehouse data, aligning with recent refactoring efforts. --- port_ocean/core/integrations/mixins/sync_raw.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 539b45c4c4..db56c80f75 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -430,8 +430,6 @@ async def _register_in_batches( f"Finished registering kind: {resource_config.kind}-{resource.resource.index} ,{len(passed_entities)} entities out of {number_of_raw_results} raw results" ) - await self._post_lakehouse_data_if_enabled(raw_data, resource_config.kind) - ocean.metrics.set_metric( name=MetricType.SUCCESS_NAME, labels=[ocean.metrics.current_resource_kind(), MetricPhase.RESYNC], From 77a461d3aa55950b7ff9f28188d3350609bf6df1 Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Tue, 29 Jul 2025 15:47:54 +0300 Subject: [PATCH 09/10] renamed config --- port_ocean/config/settings.py | 2 +- port_ocean/core/integrations/mixins/sync_raw.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index cff2975e16..2c81c4fc42 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -108,7 +108,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): upsert_entities_batch_max_length: int = 20 upsert_entities_batch_max_size_in_bytes: int = 1024 * 1024 - datalake_enabled: bool = False + lakehouse_enabled: bool = False @validator("process_execution_mode") def validate_process_execution_mode( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index db56c80f75..0155a33277 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -477,7 +477,7 @@ async def _lakehouse_data_enabled( bool: True if lakehouse data is enabled, False otherwise """ flags = await ocean.port_client.get_organization_feature_flags() - if "LAKEHOUSE_ELIGIBLE" in flags and ocean.config.datalake_enabled: + if "LAKEHOUSE_ELIGIBLE" in flags and ocean.config.lakehouse_enabled: return True return False From f1b100530e0fb6f7753e4dca5d5d8ffb6d4f6a18 Mon Sep 17 00:00:00 2001 From: yaelibarg Date: Sun, 3 Aug 2025 14:10:14 +0300 Subject: [PATCH 10/10] bupmed version --- CHANGELOG.md | 6 ++++++ pyproject.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b40dbbcd5..aa2d853730 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.26.2 (2025-08-03) + +### Improvements + +- Add posting integration raw data to lakehouse + ## 0.26.1 (2025-07-20) ### Improvements diff --git a/pyproject.toml b/pyproject.toml index cf0c56548b..ab7c5d2e21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.26.1" +version = "0.26.2" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"