@yaelibarg
Contributor

@yaelibarg yaelibarg commented Jul 22, 2025

PR Type

Enhancement


Description

  • Add data lake service integration for raw data posting

  • Implement new endpoint for sending raw data to Port's data lake

  • Add feature flag check for DATALAKE_ENABLED functionality

  • Collect and send raw data during sync operations


Diagram Walkthrough

flowchart LR
  A["Sync Process"] --> B["Collect Raw Data"]
  B --> C["Check Feature Flag"]
  C --> D["Post to Data Lake"]
  D --> E["Data Lake Service"]

File Walkthrough

Relevant files
Enhancement
integrations.py
Add data lake client method                                                           

port_ocean/clients/port/mixins/integrations.py

  • Add post_integration_raw_data method for data lake API calls
  • Implement POST endpoint to send raw data with sync ID and kind
  • Add proper error handling and logging for data lake requests
+18/-2   
sync_raw.py
Integrate data lake posting in sync flow                                 

port_ocean/core/integrations/mixins/sync_raw.py

  • Collect raw data during sync operations in raw_data list
  • Add feature flag check for DATALAKE_ENABLED before posting
  • Extend raw data collection for both dict results and async generators
  • Call new data lake posting method after sync completion
+9/-0     

@yaelibarg yaelibarg requested a review from a team as a code owner July 22, 2025 12:33
@qodo-merge-pro qodo-merge-pro bot changed the title Send raw data to Port's POST endpoint Send raw data to Port's POST endpoint Jul 22, 2025
@yaelibarg yaelibarg changed the title Send raw data to Port's POST endpoint [Core] Send raw data to Port's POST endpoint Jul 22, 2025
@qodo-merge-pro

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

The feature flag check uses a synchronous call pattern but the method is async. The trailing semicolon at line 432 is unnecessary in Python and should be removed, and error handling should be added around both the feature flag check and the data lake posting operation.

flags = await ocean.port_client.get_organization_feature_flags();
if "DATALAKE_ENABLED" in flags:
    await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind)
Memory Concern

The raw_data list accumulates all raw data in memory throughout the sync process, which could cause memory issues for large datasets. Consider implementing batching or streaming for large data volumes.

raw_data = []

for result in results:
    if isinstance(result, dict):
        raw_results.append(result)
        raw_data.append(result)
    else:
        async_generators.append(result)

send_raw_data_examples_amount = (
    SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0
)

passed_entities = []
number_of_raw_results = 0
number_of_transformed_entities = 0

if raw_results:
    number_of_raw_results += len(raw_results)
    calculation_result = await self._register_resource_raw(
        resource_config,
        raw_results,
        user_agent_type,
        send_raw_data_examples_amount=send_raw_data_examples_amount,
    )
    errors.extend(calculation_result.errors)
    passed_entities = list(calculation_result.entity_selector_diff.passed)
    number_of_transformed_entities += (
        calculation_result.number_of_transformed_entities
    )
    logger.info(
        f"Finished registering change for {len(raw_results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected"
    )

for generator in async_generators:
    try:
        async for items in generator:
            raw_data.extend(items)
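
One way to address the memory concern above is to flush items in bounded batches as they arrive, instead of holding the whole sync in raw_data. The following is a minimal sketch under stated assumptions, not the PR's implementation: post_in_batches, post_batch, and max_batch_size are hypothetical names, and post_batch stands in for a call such as post_integration_raw_data with the sync ID and kind already bound.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

Item = dict[str, Any]


async def post_in_batches(
    source: AsyncIterator[list[Item]],
    post_batch: Callable[[list[Item]], Awaitable[None]],
    max_batch_size: int = 500,
) -> int:
    """Forward items from an async source in batches of at most max_batch_size.

    Hypothetical helper: `post_batch` is an assumed callable that sends one
    batch (for example, a wrapper around the data lake POST).
    Returns the total number of items sent.
    """
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")

    buffer: list[Item] = []
    sent = 0
    async for items in source:
        buffer.extend(items)
        # Flush full batches as soon as they are available so the buffer
        # never grows much beyond one page plus one batch.
        while len(buffer) >= max_batch_size:
            batch, buffer = buffer[:max_batch_size], buffer[max_batch_size:]
            await post_batch(batch)
            sent += len(batch)

    # Send whatever is left once the source is exhausted.
    if buffer:
        await post_batch(buffer)
        sent += len(buffer)
    return sent
```

With this shape, peak memory is bounded by the page size plus max_batch_size rather than by the size of the full sync. Whether the data lake endpoint accepts several POSTs for the same sync ID and kind is an assumption that would need to be confirmed.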
Error Handling

The post_integration_raw_data method has minimal error handling, and response logging is turned off (handle_port_status_code is called with should_log=False). Failed data lake posts could go unnoticed, and there is no retry mechanism for transient failures.

async def post_integration_raw_data(
    self, raw_data: list[dict[Any, Any]], sync_id: str, kind: str
) -> None:
    logger.debug("starting POST raw data request", raw_data=raw_data)
    headers = await self.auth.headers()
    response = await self.client.post(
        f"{self.auth.api_url}/datalake/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items",
        headers=headers,
        json={
            "items": raw_data,
        },
    )
    handle_port_status_code(response, should_log=False)
    logger.debug("Finished POST raw data request")
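
The missing retry mechanism mentioned above could look roughly like the sketch below: a small wrapper that retries an async operation with exponential backoff. It is an illustration under assumptions, not part of the PR: retry_async and TransientError are hypothetical names, and deciding which failures count as transient (timeouts, 5xx responses) is left to the caller.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `operation`, retrying on TransientError with exponential backoff.

    Delays are base_delay, 2 * base_delay, 4 * base_delay, and so on.
    The last failure is re-raised so it cannot go unnoticed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except TransientError:
            if attempt == attempts:
                raise  # out of retries: surface the error to the caller
            await sleep(base_delay * 2 ** (attempt - 1))

    raise AssertionError("unreachable")  # loop always returns or raises
```

A caller would wrap the POST in a zero-argument coroutine function and translate the failures it considers transient into TransientError before passing it to retry_async.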

@qodo-merge-pro

qodo-merge-pro bot commented Jul 22, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
General
Add error handling for data lake posting

Add error handling around the data lake posting operation to prevent sync
failures if the POST request fails. The current implementation could cause the
entire sync to fail if there's an issue with the data lake endpoint.

port_ocean/core/integrations/mixins/sync_raw.py [432-434]

 flags = await ocean.port_client.get_organization_feature_flags();
 if "DATALAKE_ENABLED" in flags:
-    await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind)
+    try:
+        await ocean.port_client.post_integration_raw_data(raw_data, event.id, resource_config.kind)
+    except Exception as e:
+        logger.warning(f"Failed to post raw data to data lake: {e}")
Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies that a failure in post_integration_raw_data would crash the sync process, and since this is a feature-flagged, auxiliary operation, it should be wrapped in error handling to improve robustness.

Medium
Security
Avoid logging sensitive raw data

Avoid logging the entire raw_data in debug logs as it could contain sensitive
information and create very large log entries. Log only metadata like the count
of items instead.

port_ocean/clients/port/mixins/integrations.py [293-296]

 async def post_integration_raw_data(
     self, raw_data: list[dict[Any, Any]], sync_id: str, kind: str
 ) -> None:
-    logger.debug("starting POST raw data request", raw_data=raw_data)
+    logger.debug("starting POST raw data request", item_count=len(raw_data), sync_id=sync_id, kind=kind)
Suggestion importance[1-10]: 7

Why: The suggestion correctly points out that logging the entire raw_data can expose sensitive information and create large log entries; replacing it with metadata like item count is a good practice for security and performance.

Medium

@github-actions

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16468890976/artifacts/3596118490

Code Coverage Total Percentage: 83.39%

@github-actions

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16549496645/artifacts/3623498579

Code Coverage Total Percentage: 83.78%

…kehouse. Introduced a method to check if lakehouse data is enabled and updated logic to post data accordingly.
…This change simplifies the integration logic by eliminating the call to post lakehouse data, aligning with recent refactoring efforts.
@github-actions

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16551485646/artifacts/3623980090

Code Coverage Total Percentage: 83.77%

@github-actions

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16596538210/artifacts/3638959580

Code Coverage Total Percentage: 83.81%


@ivankalinovski ivankalinovski left a comment

LGTM

@github-actions github-actions bot added size/M and removed size/S labels Aug 3, 2025
@github-actions

github-actions bot commented Aug 3, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16704342305/artifacts/3675969812

Code Coverage Total Percentage: 83.92%

@github-actions

github-actions bot commented Aug 3, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16704358431/artifacts/3675971734

Code Coverage Total Percentage: 83.92%

@yaelibarg yaelibarg merged commit de1ee21 into main Aug 3, 2025
17 checks passed
@yaelibarg yaelibarg deleted the PORT-15525-create-data-lake-service branch August 3, 2025 11:31