Conversation

@IdansPort (Contributor) commented Jul 30, 2025

User description

Description

Why
Problem: GitHub webhook events for the same resource (like a PR) were getting processed out of order when multiple workers ran concurrently. This broke temporal consistency - you'd get a "PR closed" event processed before "PR updated", leading to stale data.
Solution: Group events by resource ID and ensure only one worker processes events for any given resource at a time, while still allowing parallel processing across different resources.
What
Three core changes:

Group-based queuing: Added GroupQueue that partitions events by group_id and locks groups during processing
Resource identification: Created group_selector.py to extract consistent resource IDs from GitHub webhook payloads
Multi-worker coordination: Modified processor manager to spawn multiple workers that respect group locks

How
Resource Identification: Extract consistent IDs from GitHub events (e.g., pull_request-123, issue-456) to group related events together.
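
For illustration, a minimal sketch of this kind of extraction (the payload fields and ID prefixes here are assumptions drawn from the examples above, not the actual contents of group_selector.py):

from typing import Any, Optional

def extract_group_id(event_type: str, payload: dict[str, Any]) -> Optional[str]:
    """Derive a stable resource ID so related webhook events share a group."""
    if event_type == "pull_request":
        return f"pull_request-{payload['pull_request']['number']}"
    if event_type == "issues":
        return f"issue-{payload['issue']['number']}"
    # Unrecognized event types get no group and are processed as before.
    return None
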
Group Queue: Queue implementation that enforces sequential processing within groups while allowing concurrent processing across groups. Workers lock groups on get() and unlock on commit().
Worker Pool: Multiple workers per webhook path, each pulling from the shared group queue. Integration automatically chooses group-aware or simple processing based on worker count configuration.
Result: Events for the same resource process in order, different resources process in parallel.
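
A rough sketch of the get()/commit() contract described above (simplified; the real worker loop in processor_manager.py also matches processors per event and handles cancellation):

import asyncio

async def worker_loop(queue, handle) -> None:
    """One of several workers sharing the same GroupQueue."""
    while True:
        event = await queue.get()    # locks this event's group for the worker
        try:
            await handle(event)      # events within one group run strictly in order
        finally:
            await queue.commit()     # pop the processed item and release the group

async def start_workers(queue, handle, workers: int) -> list[asyncio.Task]:
    # Multiple workers per webhook path; different groups proceed in parallel.
    return [asyncio.create_task(worker_loop(queue, handle)) for _ in range(workers)]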

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Overview of the flow:
[screenshot: flow overview]

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Enhancement


Description

  • Implement group-based queue for ordered event processing

  • Add multi-worker support with group locking mechanism

  • Ensure sequential processing within resource groups

  • Enable parallel processing across different resource groups


Diagram Walkthrough

flowchart LR
  A["Webhook Events"] --> B["GroupQueue"]
  B --> C["Group Selector"]
  C --> D["Resource Groups"]
  D --> E["Worker Pool"]
  E --> F["Sequential Processing per Group"]
  E --> G["Parallel Processing across Groups"]

File Walkthrough

Relevant files

Configuration changes (1 file)
  • settings.py: Add event workers configuration setting (+2/-0)

Enhancement (6 files)
  • __init__.py: Export GroupQueue class (+2/-1)
  • abstract_queue.py: Add size method to queue interface (+8/-0)
  • group_queue.py: Implement group-based queue with worker coordination (+156/-0)
  • local_queue.py: Implement size method for local queue (+3/-0)
  • processor_manager.py: Replace single processor with multi-worker system (+68/-70)
  • webhook_event.py: Add group_id field to webhook events (+2/-0)

Tests (1 file)
  • test_group_queue.py: Comprehensive test suite for group queue (+580/-0)

Documentation (1 file)
  • CHANGELOG.md: Document parallel queue implementation improvement (+5/-0)

Miscellaneous (1 file)
  • pyproject.toml: Bump version to 0.26.2 (+1/-1)

IdansPort requested a review from a team as a code owner on July 30, 2025 08:44
@qodo-code-review (Contributor)


PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Concurrency Bug

The worker ID generation using uuid.uuid4() may not provide proper isolation between concurrent workers. Multiple workers could potentially get the same UUID, or context-variable issues could arise in high-concurrency scenarios.

def _get_worker_id(self) -> str:
    """Get unique worker ID for current task/coroutine"""
    worker_id = _worker_context.get()
    if worker_id is None:
        # Generate unique ID for this task
        worker_id = str(uuid.uuid4())
        _worker_context.set(worker_id)
    return worker_id
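
For reference, ContextVar values are per asyncio task: each task gets its own copy of the context, so an ID set in one task is not visible to another. A standalone illustration of that behavior (not code from this PR), which should be weighed against how the manager actually spawns its workers:

import asyncio
import contextvars
import uuid

_worker_context: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "worker_id", default=None
)

def get_worker_id() -> str:
    worker_id = _worker_context.get()
    if worker_id is None:
        worker_id = str(uuid.uuid4())  # fresh ID for this task's context
        _worker_context.set(worker_id)
    return worker_id

async def show_id(name: str) -> None:
    print(name, get_worker_id())  # each task sees only its own context copy

async def main() -> None:
    await asyncio.gather(show_id("task-1"), show_id("task-2"))

asyncio.run(main())  # prints two different worker IDs
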
Resource Leak

The hardcoded CONCURRENCY_PER_PATH constant and the worker spawning logic may create resource leaks if workers fail to properly clean up. The exception handling in _queue_worker doesn't guarantee queue.commit() is called in all error scenarios.

except asyncio.CancelledError:
    logger.info(f"Worker {worker_id} for {path} shutting down")
    for _, proc in matching:
        await proc.cancel()
        self._timestamp_event_error(proc.event)
    break
except Exception as e:
    logger.exception(
        f"Unexpected error in worker {worker_id} for {path}: {e}"
    )
    for _, proc in matching:
        self._timestamp_event_error(proc.event)
finally:
    if event is not None:
        logger.info(f"{event.group_id}")
        await queue.commit()
Race Condition

The commit() method has a race condition where it checks if the current item matches the head of the queue, but another worker could have modified the queue state between the check and the actual pop operation.

# Verify we're committing the right item (safety check)
if q and q[0] == item and self._group_to_worker.get(g) == worker_id:
    q.popleft()  # remove the item we processed
    if not q:
        del self._queues[g]  # tidy up empty queue

    # Clean up tracking
    self._locked.discard(g)
    del self._current_items[worker_id]
    if g in self._group_to_worker:
        del self._group_to_worker[g]

    self._queue_not_empty.notify_all()
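
Since notify_all() can only be called while the condition's underlying lock is held, this block presumably already runs inside "async with self._queue_not_empty:"; holding that lock across the check and the popleft() is what prevents another worker from interleaving. A condensed sketch of that pattern (helper names such as _group_of are assumptions, not the PR's actual code):

async def commit(self) -> None:
    worker_id = self._get_worker_id()
    async with self._queue_not_empty:      # condition lock: check + pop are atomic
        item = self._current_items.get(worker_id)
        if item is None:
            return                         # this worker holds nothing to release
        g = self._group_of(item)           # hypothetical helper resolving the group
        q = self._queues.get(g)
        if q and q[0] == item and self._group_to_worker.get(g) == worker_id:
            q.popleft()
            if not q:
                del self._queues[g]
            self._locked.discard(g)
            del self._current_items[worker_id]
            self._group_to_worker.pop(g, None)
            self._queue_not_empty.notify_all()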

@qodo-code-review (Contributor) commented Jul 30, 2025

PR Code Suggestions ✨

Latest suggestions up to 3ee1d82

Possible issue
Fix size calculation inconsistency

The size() method documentation claims to exclude items being processed, but the
implementation counts all items in queues including the head items that are
currently being processed by workers. This creates inconsistency between
documentation and behavior.

port_ocean/core/handlers/queue/group_queue.py [133-145]

 async def size(self) -> int:
     """
     Return **the current number of batched items waiting in the queue**,
     aggregated across every group.
 
     • Excludes the batches that are being processed right now
       (i.e. those referenced by ``self._current_items``).
     • Safe to call from multiple coroutines concurrently.
     • Runs in O(#groups) time – negligible unless you have
       millions of distinct group keys.
     """
     async with self._queue_not_empty:
-        return sum(len(dq) for dq in self._queues.values())
+        total = sum(len(dq) for dq in self._queues.values())
+        # Subtract items currently being processed (head items in locked groups)
+        currently_processing = len(self._current_items)
+        return max(0, total - currently_processing)
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that the size() implementation does not match its docstring, as it includes items currently being processed, and the proposed fix aligns the behavior with the documentation.

Impact: Medium
General
Remove debug logging statement
Suggestion Impact: The debug logging statement logger.info(f"{event.group_id}") was removed from the finally block as suggested, though the commit also added additional error handling around the queue.commit() call.

code diff:

-                if event is not None:
-                    logger.info(f"{event.group_id}")
-                    await queue.commit()
+                try:
+                    if event is not None:
+                        await queue.commit()
+
+                except Exception as e:
+                    logger.exception(
+                        f"Unexpected error in queue commit in worker {worker_id} for {path}: {e}"
+                    )

The debug log statement logger.info(f"{event.group_id}") in the finally block
provides no context and will log potentially sensitive group IDs without proper
formatting or purpose. This appears to be leftover debug code that should be
removed or improved.

port_ocean/core/handlers/webhook/processor_manager.py [109-111]

-async def _queue_worker(self, path: str, worker_id: int) -> None:
-    """Single‐worker loop pulling from the queue for a given path."""
-    queue = self._event_queues[path]
-    while True:
-        event = None
-        matching: List[Tuple[ResourceConfig, AbstractWebhookProcessor]] = []
-        try:
-            event = await queue.get()
-            ...
-        finally:
-            if event is not None:
-                logger.info(f"{event.group_id}")
-                await queue.commit()
+finally:
+    if event is not None:
+        await queue.commit()

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a leftover, unformatted debug log statement that provides little value and should be removed from production code.

Impact: Low

Previous suggestions

Suggestions
Possible issue
Fix premature event commit on errors
Suggestion Impact: The commit addressed the core issue by wrapping the queue.commit() call in a try/except block within the finally block, preventing exceptions during commit from crashing the worker while still ensuring commit only happens after successful processing.

code diff:

             finally:
-                if event is not None:
-                    logger.info(f"{event.group_id}")
-                    await queue.commit()
+                try:
+                    if event is not None:
+                        logger.info(f"{event.group_id}")
+                        await queue.commit()
+
+                except Exception as e:
+                    logger.exception(
+                        f"Unexpected error in queue commit in worker {worker_id} for {path}: {e}"
+                    )

The queue.commit() call in the finally block will execute even when exceptions
occur during event processing, potentially marking failed events as successfully
processed. Move commit inside the try block after successful processing.

port_ocean/core/handlers/webhook/processor_manager.py [55-111]

 async def _queue_worker(self, path: str, worker_id: int) -> None:
     """Single‐worker loop pulling from the queue for a given path."""
     queue = self._event_queues[path]
     while True:
         event = None
         matching: List[Tuple[ResourceConfig, AbstractWebhookProcessor]] = []
         try:
             event = await queue.get()
             ...
-        finally:
+            # handle good/bad, sync results…
             if event is not None:
                 logger.info(f"{event.group_id}")
                 await queue.commit()
+        except asyncio.CancelledError:
+            logger.info(f"Worker {worker_id} for {path} shutting down")
+            for _, proc in matching:
+                await proc.cancel()
+                self._timestamp_event_error(proc.event)
+            break
+        except Exception as e:
+            logger.exception(
+                f"Unexpected error in worker {worker_id} for {path}: {e}"
+            )
+            for _, proc in matching:
+                self._timestamp_event_error(proc.event)
Suggestion importance[1-10]: 9


Why: This suggestion correctly identifies a critical bug where queue.commit() is called in a finally block, which would incorrectly mark an event as successfully processed even if an exception occurred, leading to data loss.

Impact: High
Fix size calculation excluding processing items

The docstring mentions excluding items being processed but the implementation
doesn't actually exclude them. The method should subtract the count of currently
processing items from the total queue size to match the documented behavior.

port_ocean/core/handlers/queue/group_queue.py [133-145]

 async def size(self) -> int:
     """
     Return **the current number of batched items waiting in the queue**,
     aggregated across every group.
 
     • Excludes the batches that are being processed right now
       (i.e. those referenced by ``self._current_items``).
     • Safe to call from multiple coroutines concurrently.
     • Runs in O(#groups) time – negligible unless you have
       millions of distinct group keys.
     """
     async with self._queue_not_empty:
-        return sum(len(dq) for dq in self._queues.values())
+        total_queued = sum(len(dq) for dq in self._queues.values())
+        currently_processing = len(self._current_items)
+        return max(0, total_queued - currently_processing)
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a bug where the size() method's implementation contradicts its docstring, as it includes items currently being processed instead of excluding them.

Impact: Medium

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16617795812/artifacts/3646722014

Code Coverage Total Percentage: 83.97%

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16642869348/artifacts/3655915252

Code Coverage Total Percentage: 84.06%

@qodo-code-review (Contributor)

Persistent suggestions updated to latest commit 3ee1d82

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16643906895/artifacts/3656269122

Code Coverage Total Percentage: 84.06%

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16644386130/artifacts/3656420807

Code Coverage Total Percentage: 84.06%

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16647488674/artifacts/3657498211

Code Coverage Total Percentage: 84.08%

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16647767834/artifacts/3657626476

Code Coverage Total Percentage: 84.1%

@github-actions (Contributor)

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16648522726/artifacts/3657885506

Code Coverage Total Percentage: 84.08%

@matan84 (Member) left a comment
Left comments

self.group_key = group_key # str | None
self._queues: Dict[MaybeStr, Deque[T]] = defaultdict(deque)
self._locked: Set[MaybeStr] = set()
self._queue_not_empty = asyncio.Condition()

Member:

How do we make sure this isn't locking all events and does lock only events in a certain group?

Member:

Or is the fact that this is only for a specific class instance enough to take care of that case

IdansPort (Author):

I'll add a diagram

loop = asyncio.get_event_loop()
config = ocean.integration.context.config

for path in self._event_queues.keys():

Member:

How do we know that a group queue is being used? I think that the default is LocalQueue

IdansPort (Author):

The group queue is defined per integration. By default it is LocalQueue, and the group_name_selector is defined per integration class.
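
Illustratively, that opt-in could look roughly like this (the import path, constructor arguments, and helper function are assumptions for the sketch; only GroupQueue, LocalQueue, and the new event workers setting come from this PR):

# Hypothetical wiring sketch; the real integration hooks may differ.
from port_ocean.core.handlers.queue import GroupQueue, LocalQueue  # path assumed from the file layout above

def build_event_queue(event_workers: int, has_group_selector: bool):
    """Use group-aware queuing only when the integration defines a selector
    and more than one worker is configured; otherwise keep the default."""
    if event_workers > 1 and has_group_selector:
        return GroupQueue()
    return LocalQueue()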

Member:

Is it by integration or by kind?

IdansPort (Author):

by integration

@shalev007 (Contributor) left a comment

Looks good, just some variable and comment renaming.

@matan84 (Member) left a comment

LGTM. Make sure to clean up all leftover comments.


@github-actions (Contributor) commented Aug 5, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/16746037366/artifacts/3689532378

Code Coverage Total Percentage: 84.07%

IdansPort merged commit c602312 into main on Aug 5, 2025
17 checks passed
IdansPort deleted the PORT-14655-PARALLEL-LIVE-INGEST-CORE branch on August 5, 2025 10:20