Commit c602312
[Core] feat: implement core change for group queue (#1936)
### **User description**
# Description
Why
Problem: GitHub webhook events for the same resource (like a PR) were
getting processed out of order when multiple workers ran concurrently.
This broke temporal consistency - you'd get a "PR closed" event
processed before "PR updated", leading to stale data.
Solution: Group events by resource ID and ensure only one worker
processes events for any given resource at a time, while still allowing
parallel processing across different resources.
What
Three core changes:
Group-based queuing: Added GroupQueue that partitions events by group_id
and locks groups during processing
Resource identification: Created group_selector.py to extract consistent
resource IDs from GitHub webhook payloads
Multi-worker coordination: Modified processor manager to spawn multiple
workers that respect group locks
How
Resource Identification: Extract consistent IDs from GitHub events
(e.g., pull_request-123, issue-456) to group related events together.
Group Queue: Queue implementation that enforces sequential processing
within groups while allowing concurrent processing across groups.
Workers lock groups on get() and unlock on commit().
Worker Pool: Multiple workers per webhook path, each pulling from the
shared group queue. Integration automatically chooses group-aware or
simple processing based on worker count configuration.
Result: Events for the same resource process in order, different
resources process in parallel.
## Type of change
Please leave one option from the following and delete the rest:
- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [x] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)
<h4> All tests should be run against the port production
environment(using a testing org). </h4>
### Core testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
### Integration testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)
### Preflight checklist
- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account
## Screenshots
Overview of the flow:
<img width="841" height="760" alt="image"
src="https://github.com/user-attachments/assets/ad8b39db-530f-4e43-9e82-66f41dbce66b"
/>
Include screenshots from your environment showing how the resources of
the integration will look.
## API Documentation
Provide links to the API documentation used for this integration.
___
### **PR Type**
Enhancement
___
### **Description**
- Implement group-based queue for ordered event processing
- Add multi-worker support with group locking mechanism
- Ensure sequential processing within resource groups
- Enable parallel processing across different resource groups
___
### Diagram Walkthrough
```mermaid
flowchart LR
A["Webhook Events"] --> B["GroupQueue"]
B --> C["Group Selector"]
C --> D["Resource Groups"]
D --> E["Worker Pool"]
E --> F["Sequential Processing per Group"]
E --> G["Parallel Processing across Groups"]
```
<details> <summary><h3> File Walkthrough</h3></summary>
<table><thead><tr><th></th><th align="left">Relevant
files</th></tr></thead><tbody><tr><td><strong>Configuration
changes</strong></td><td><details><summary>1 files</summary><table>
<tr>
<td><strong>settings.py</strong><dd><code>Add event workers
configuration setting</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-c4ab144128bdbbe50dd95b726ae9c2f4cbc7306025e07a9e1db3dff7c5d4b3bb">+2/-0</a>
</td>
</tr>
</table></details></td></tr><tr><td><strong>Enhancement</strong></td><td><details><summary>6
files</summary><table>
<tr>
<td><strong>__init__.py</strong><dd><code>Export GroupQueue
class</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-c9170904db583a716fd64819353c6319e176ef9dff07e4355d491a410627efa5">+2/-1</a>
</td>
</tr>
<tr>
<td><strong>abstract_queue.py</strong><dd><code>Add size method to queue
interface</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-c5f159796d2303aac77a0311758cafb17185c6b2ac983d3d6519754bb163ae5f">+8/-0</a>
</td>
</tr>
<tr>
<td><strong>group_queue.py</strong><dd><code>Implement group-based queue
with worker coordination</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-8d8b3ee228422ff5eb26b7221858028a7bf52eb77712ffd81cf13daf7f7e619b">+156/-0</a>
</td>
</tr>
<tr>
<td><strong>local_queue.py</strong><dd><code>Implement size method for
local queue</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-334e57aa010cbd80d458171ffd86093c6076ef97648f4d26ad8db9ea05b133e7">+3/-0</a>
</td>
</tr>
<tr>
<td><strong>processor_manager.py</strong><dd><code>Replace single
processor with multi-worker system</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-58a2a8bcf5e453cdb2b298a8edcb716c3c3454bb37eed3d6fb3c0c88b50e8b6d">+68/-70</a>
</td>
</tr>
<tr>
<td><strong>webhook_event.py</strong><dd><code>Add group_id field to
webhook events</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-b731c7033797cd3ff458c104fde99743a6ec09baa1c2da550975f1e383c3143e">+2/-0</a>
</td>
</tr>
</table></details></td></tr><tr><td><strong>Tests</strong></td><td><details><summary>1
files</summary><table>
<tr>
<td><strong>test_group_queue.py</strong><dd><code>Comprehensive test
suite for group queue</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-c6c81784c82c06f64808e8e6b211361fa4e24c31de973c7c9bc7ac544bbb1f26">+580/-0</a>
</td>
</tr>
</table></details></td></tr><tr><td><strong>Documentation</strong></td><td><details><summary>1
files</summary><table>
<tr>
<td><strong>CHANGELOG.md</strong><dd><code>Document parallel queue
implementation improvement</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-06572a96a58dc510037d5efa622f9bec8519bc1beab13c9f251e97e657a9d4ed">+5/-0</a>
</td>
</tr>
</table></details></td></tr><tr><td><strong>Miscellaneous</strong></td><td><details><summary>1
files</summary><table>
<tr>
<td><strong>pyproject.toml</strong><dd><code>Bump version to
0.26.2</code>
</dd></td>
<td><a
href="https://github.com/port-labs/ocean/pull/1936/files#diff-50c86b7ed8ac2cf95bd48334961bf0530cdc77b5a56f852c5c61b89d735fd711">+1/-1</a>
</td>
</tr>
</table></details></td></tr></tr></tbody></table>
</details>
___
---------
Co-authored-by: Matan <[email protected]>
Co-authored-by: Shalev Avhar <[email protected]>1 parent 805b3c9 commit c602312
File tree
10 files changed
+950
-80
lines changed- port_ocean
- config
- core/handlers
- queue
- webhook
- tests/core/handlers/queue
10 files changed
+950
-80
lines changed| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
6 | 6 | | |
7 | 7 | | |
8 | 8 | | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
9 | 25 | | |
10 | 26 | | |
11 | 27 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
9 | 9 | | |
10 | 10 | | |
11 | 11 | | |
| 12 | + | |
12 | 13 | | |
13 | 14 | | |
14 | 15 | | |
| |||
88 | 89 | | |
89 | 90 | | |
90 | 91 | | |
| 92 | + | |
91 | 93 | | |
92 | 94 | | |
93 | 95 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
1 | 1 | | |
2 | 2 | | |
| 3 | + | |
3 | 4 | | |
4 | | - | |
| 5 | + | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
7 | 7 | | |
8 | 8 | | |
9 | 9 | | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
10 | 13 | | |
11 | 14 | | |
12 | 15 | | |
| |||
22 | 25 | | |
23 | 26 | | |
24 | 27 | | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
25 | 33 | | |
26 | 34 | | |
27 | 35 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
| 106 | + | |
| 107 | + | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
| 111 | + | |
| 112 | + | |
| 113 | + | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
| 137 | + | |
| 138 | + | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
23 | 23 | | |
24 | 24 | | |
25 | 25 | | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
0 commit comments