feat(flowcontrol): Implement ShardProcessor engine #1203
Conversation
Hi @LukeAVanDrie. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with `/ok-to-test`. Once the patch is verified, the new status will be reflected by the `ok-to-test` label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
/ok-to-test
LukeAVanDrie force-pushed from 05b21f8 to 7176d6c
```go
// `dispatchCycle` methods). New requests are funneled to this goroutine via the `enqueueChan`.
//
// This design choice is critical for correctness and performance. The `Run` method's goroutine has exclusive ownership
// of all operations that *add* items to the queues (`enqueue`). This prevents the most dangerous type of race
```
This design materially increases the complexity; I don't follow why a multi-writer/single-reader queue isn't sufficient here.
That's a great question. You're right, a standard multi-writer queue would absolutely be simpler under normal circumstances.
The main reason for this single-writer approach is the complexity of our capacity check. The system needs to validate capacity not just for an individual flow's queue, but hierarchically against the entire priority band and the overall shard.
In a multi-writer model, that "check-then-act" sequence would require a coarse, shard-wide lock to prevent race conditions, which would likely create a contention bottleneck. This channel-based design serializes the additions, making that multi-level capacity check atomic without locks. The `ShardProcessor`'s core design is built around this to ensure correctness and performance.
This model also gives us a big advantage for the future. We plan to add displacement policies (e.g., a high-priority item evicting lower-priority ones), which involve complex, multi-step atomic transactions. Serializing all state-mutating operations through this single goroutine will make that much safer to implement.
This is a key design point, and I realize the rationale could be clearer. I'll add a more detailed comment to the ShardProcessor struct to explain these trade-offs for future contributors. Does this reasoning make sense?
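To make the ownership model concrete, here is a minimal, self-contained sketch of the single-writer pattern described above. The types and the flat capacity check are simplified stand-ins (the real processor checks capacity hierarchically per flow, band, and shard), not the PR's actual code:

```go
package main

import "fmt"

// item is a simplified stand-in for the PR's flowItem.
type item struct {
	flow string
	size uint64
}

// shard is a toy model with a single capacity limit; the real design nests
// per-flow, per-band, and shard-wide checks here.
type shard struct {
	used, capacity uint64
	enqueueChan    chan *item
}

// run is the single goroutine that owns all queue writes. Because only this
// goroutine ever mutates `used`, the check-then-act sequence below is atomic
// without any locks.
func (s *shard) run() {
	for it := range s.enqueueChan {
		if s.used+it.size > s.capacity {
			fmt.Println("rejected:", it.flow)
			continue
		}
		s.used += it.size // safe: no other writer exists
		fmt.Println("enqueued:", it.flow)
	}
}

func main() {
	s := &shard{capacity: 10, enqueueChan: make(chan *item, 4)}
	done := make(chan struct{})
	go func() { s.run(); close(done) }()
	s.enqueueChan <- &item{flow: "tenant-a", size: 6} // fits
	s.enqueueChan <- &item{flow: "tenant-b", size: 6} // exceeds remaining capacity
	close(s.enqueueChan)
	<-done
}
```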
Enqueueing the requests is done sequentially by the shard processor anyway, so why would a shard-wide lock for the writers be worse here? Also, I think we can be approximate when checking for capacity; we don't need to be exact, so perhaps a lock for capacity checking isn't even necessary?
Not asking for a change, just trying to think through the rationale behind this choice.
Thank you for the follow-up and for pushing on this; these are exactly the right questions to be asking to make sure the design is robust. You've hit on the core trade-offs I was weighing, and I'd like to walk you through my reasoning.
**Why the Buffered Channel is Better Than a Lock Here**
You're right, the processor does its actual enqueue work sequentially in either model. The key difference is that the buffered channel serves a critical dual purpose: it decouples the caller and it provides a high-fidelity backpressure signal.
- Decoupling: With a lock, the `FlowController`'s main goroutine would block every time it calls `Enqueue` on a busy shard, stalling the entire distribution process. The channel's non-blocking send frees the controller instantly, which is critical for system throughput.
- Backpressure Signal: This is just as important. The state of the buffered channel *is* the backpressure signal. If a send fails because the buffer is full, the `FlowController` receives an immediate, unambiguous signal that this specific shard is saturated. This is invaluable data for its distribution algorithm, allowing the system to be more intelligent in its load balancing, a capability a simple lock can't offer (see the sketch after this list).
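A minimal sketch of the non-blocking handoff described in these bullets; `trySend` and `item` are hypothetical illustrations, not the PR's API:

```go
package main

import "fmt"

type item struct{ flow string }

// trySend sketches the controller-side handoff: select/default makes the send
// non-blocking, and a failed send doubles as the per-shard saturation signal.
func trySend(ch chan<- *item, it *item) bool {
	select {
	case ch <- it:
		return true // the shard accepted the item
	default:
		return false // buffer full: treat this shard as saturated
	}
}

func main() {
	ch := make(chan *item, 1)
	fmt.Println(trySend(ch, &item{flow: "a"})) // true: buffer has room
	fmt.Println(trySend(ch, &item{flow: "b"})) // false: buffer is full
}
```

A distributor can feed that boolean directly into its load-balancing decision, retrying the item on a different shard.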
**Why Strict Capacity is Better Than Approximate**
This leads to the next logical question: could we simplify even further by removing the need for strict synchronization and allowing approximate capacity checks?
This is a valid architectural path, but it has two significant technical downsides in our specific context:
- It offers a performance gain we don't need. The current non-blocking send to a buffered channel is already extremely fast. A lock-free atomic model would add non-deterministic behavior without solving a real-world performance bottleneck.
- It would make our future roadmap significantly harder to implement. This is the most critical point. The roadmap includes features like displacement.
**The Displacement Case**
You could argue that for a complex, multi-step transaction like displacement (add one item, remove one or more others), a single coarse-grained lock would make ensuring atomicity "easier".
However, the true cost of that lock becomes apparent when you consider its impact on the `FlowController`. A displacement transaction could be long.
- With a lock, the `FlowController`'s goroutine would be blocked for the entire duration of that long transaction. The central distributor would be stalled, unable to process other requests, even those destined for different, non-busy shards.
- With the channel, the `FlowController` performs its non-blocking handoff and is immediately free. While the `ShardProcessor` is busy with the long displacement, the `FlowController` can continue its work, processing other requests and distributing them to other available shards.
The channel model contains the performance cost of a slow operation to the single `ShardProcessor`'s goroutine, preventing it from bubbling up and halting the entire `FlowController`. The lock-based model would cause that performance cost to block the central distributor.
In summary, my hope is that while the `ShardProcessor`'s internals may seem complex, this design results in a system that is higher-throughput and provides a robust, extensible foundation for the more complex features we know are coming.
I've made sure this full rationale is captured clearly in the code comments (I migrated it to the package comment and refer to it from the `ShardProcessor` go-doc comment now). Thank you for taking the time to dig into this with me.
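To illustrate the displacement point, here is a hedged, toy sketch of how a multi-step evict-then-add transaction stays atomic under the single-writer model; the `processor` type and `displace` method are hypothetical, not the planned implementation:

```go
package main

import "fmt"

type item struct {
	flow     string
	size     uint64
	priority int
}

// processor is a toy single-writer owner of a queue; only its own run
// goroutine ever calls displace, so the multi-step transaction needs no locks.
type processor struct {
	queue          []*item
	used, capacity uint64
}

// displace sketches the future "all-or-nothing" transaction: evict enough
// lower-priority items to admit a higher-priority one, atomically with
// respect to every other queue write (they all run on the same goroutine).
func (p *processor) displace(incoming *item) bool {
	freed := uint64(0)
	keep := make([]*item, 0, len(p.queue)) // fresh slice: state untouched on failure
	for _, v := range p.queue {
		if v.priority < incoming.priority && p.used-freed+incoming.size > p.capacity {
			freed += v.size // evict this victim
			continue
		}
		keep = append(keep, v)
	}
	if p.used-freed+incoming.size > p.capacity {
		return false // could not free enough space; abort the whole transaction
	}
	p.queue = append(keep, incoming)
	p.used = p.used - freed + incoming.size
	return true
}

func main() {
	p := &processor{
		capacity: 10,
		used:     9,
		queue:    []*item{{"low-a", 5, 1}, {"low-b", 4, 1}},
	}
	fmt.Println(p.displace(&item{"urgent", 6, 9})) // true: evicts low-priority work
}
```

Because no other writer can observe the queue between the evictions and the insert, the `FlowController`'s handoff stays O(1) regardless of how long the transaction takes.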
Thanks for the elaborate reply, this is an interesting design choice indeed!
> Decoupling: With a lock, the `FlowController`'s main goroutine would block every time it calls `Enqueue` on a busy shard, stalling the entire distribution process. The channel's non-blocking send frees the controller instantly, which is critical for system throughput.
We don't have everything wired up yet so I might be wrong, but in my mind it is not the `FlowController`'s main goroutine that should block, it is the request goroutine; I assumed that the request goroutine would call `Enqueue` and block until it gets its chance to add the request to the queue.
This looks good to me; please address the two code-change suggestions to move forward.
LukeAVanDrie force-pushed from 7176d6c to d668378
Thank you for the thorough review! I've addressed the suggestions and pushed up the changes.
Just FYI: I plan on taking a look through this this afternoon; I want to make sure I'm up to speed here. Sorry for the delays on my end.
Introduces the `ShardProcessor`, the core data plane worker for the `FlowController`. Each processor is paired with a `RegistryShard` and is responsible for all request lifecycle operations on that shard, including enqueueing, dispatching, and background cleanup.

The processor's design is centered on correctness and performance in a highly concurrent environment. It uses a single-goroutine ownership model for its main `Run` loop, which serializes all queue write operations (enqueues) through a channel. This makes the critical "check-then-act" capacity logic safe without complex locking. The `flowItem` uses a `sync.Once` to guarantee idempotent finalization, deterministically resolving the race between dispatch and expiry.

Resilience is achieved through a two-tiered error handling strategy that distinguishes between inter-flow and intra-flow policy failures. This isolates faults, prevents tight-loop error conditions, and ensures a failure in one priority band does not halt progress in others.

To simplify policy logic, this change also introduces the `BandFilter` abstraction. This acts as a pre-policy gate, decoupling the logic of *viability* (e.g., is a flow backpressured?) from the logic of *selection* (e.g., which flow is next?).

A comprehensive, high-fidelity test suite with stateful mocks is included to reliably verify the processor's complex concurrent behavior, race conditions, and failure modes.
LukeAVanDrie force-pushed from d668378 to 30fbecb
You might notice a seemingly unrelated change here. The original test for the besthead policy was non-deterministic because it iterated over a map to build its test cases. This meant the test would sometimes fail to trigger a necessary panic, causing intermittent CI failures. The fix was to change the test setup to use a slice instead of a map, guaranteeing a deterministic iteration order and making the test reliable.

I've included this fix in this PR to keep the CI green. If this PR stays open much longer, I will put the fix into its own isolated PR to resolve the flaky CI for others contributing to the repo today.

Edit: split off into #1231 to unblock main.
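For readers unfamiliar with the underlying Go behavior, a tiny self-contained example of why switching from a map to a slice makes the test order deterministic:

```go
package main

import "fmt"

func main() {
	// Map iteration order is deliberately randomized by the Go runtime, so
	// test cases built from a map run in a different order on every run.
	cases := map[string]int{"a": 1, "b": 2, "c": 3}
	for name := range cases {
		fmt.Println(name) // order varies between runs
	}

	// A slice fixes the order, making the test deterministic.
	ordered := []struct {
		name string
		n    int
	}{{"a", 1}, {"b", 2}, {"c", 3}}
	for _, tc := range ordered {
		fmt.Println(tc.name) // always a, b, c
	}
}
```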
```go
// The effectiveness of the sharded model hinges on a critical assumption: while the system as a whole manages a
// heterogeneous set of flows, the traffic *within a single logical flow* is assumed to be roughly homogeneous in its
// characteristics. A logical flow is intended to represent a single workload or tenant; therefore, the most
// unpredictable variables (affecting decode behavior) are expected to be statistically similar *within* that flow.
```
'the most unpredictable variables': since this seems to be the core of the critical assumption, it may be good to list these variables.
```go
// This model's true power is that it provides a robust foundation for future features like **displacement** (a
// high-priority item evicting lower-priority ones). This is an "all-or-nothing" atomic transaction that is
// exceptionally difficult to implement correctly in a lock-free or coarse-grained locking model without significant
// performance penalties. The single-writer model contains the performance cost of such a potentially long transaction
```
'performance penalties': did we do some rudimentary benchmarking to compare the limitations? Or is there literature on these approaches?
```go
// Phase 2 (Future): This is where per-flow saturation logic would go.
// It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows.
// For now, no per-flow filtering is done. Return nil to signal the fast path.
return nil, false // Do not pause, and do not filter any flows.
```
What would per-flow saturation look like? Is this a full expenditure of 'fairness' budget?
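One possible shape for that Phase 2 logic, sketched under assumptions: the `flowSaturationDetector` interface and the filter signature below are hypothetical, loosely following the quoted comment (iterate the band, call `IsSaturated`, build a filtered set of allowed flows):

```go
package sketch

import "context"

// flowSaturationDetector is a hypothetical stand-in for whatever per-flow
// saturation contract Phase 2 would introduce.
type flowSaturationDetector interface {
	IsSaturated(ctx context.Context, flowID string) bool
}

// perFlowSaturationFilter sketches the extension point: viability is decided
// per flow before any selection policy runs. A nil set would signal the
// "fast path" (no filtering), mirroring the quoted `return nil, false`.
func perFlowSaturationFilter(
	ctx context.Context, flowIDs []string, sd flowSaturationDetector,
) (allowed map[string]struct{}, pause bool) {
	allowed = make(map[string]struct{}, len(flowIDs))
	for _, id := range flowIDs {
		if !sd.IsSaturated(ctx, id) {
			allowed[id] = struct{}{} // flow is viable for dispatch this cycle
		}
	}
	if len(allowed) == 0 {
		return nil, true // every flow is saturated: pause the whole band
	}
	return allowed, false
}
```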
```go
if err := sp.dispatchItem(item, dispatchPriority, logger); err != nil {
	// All errors from dispatchItem are considered intra-flow and unrecoverable for this band in this cycle.
	logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle")
```
What sort of errors could happen from dispatch?
Jumping an entire priority band (and subsequently potentially dispatching a lower priority band if the failure is intermittent) might be surprising behavior.
```go
// FUTURE EXTENSION POINT: The iteration over priority bands is currently a simple, strict-priority loop.
// This could be abstracted into a third policy tier (e.g., an `InterBandDispatchPolicy`) if more complex scheduling
// between bands, such as Weighted Fair Queuing (WFQ), is ever required. For now, strict priority is sufficient.
for _, priority := range sp.shard.AllOrderedPriorityLevels() {
```
What happens if the `Critical` band fills up faster than this can clear it (and the pool has capacity for all requests)? I'm wondering how likely that is to happen. Latency seems to be variable because the saturation detection logic is run per flow (eventually), so this seems plausible, as the saturation detection logic could grow sophisticated/expensive.
Would that mean that lower-criticality requests do not get dispatched, even with capacity available?
```go
if !sp.dispatchCycle(ctx) {
	// If no work was done, yield to other goroutines to prevent a tight, busy-loop when idle, but allow for
	// immediate rescheduling.
	runtime.Gosched()
```
Musing: we should probably look into our `GOMAXPROCS` if we start to hit scaling limits.
```go
// enqueueChannelBufferSize sets the size of the buffered channel that accepts incoming requests for the shard
// processor. This buffer acts as a "shock absorber," decoupling the upstream distributor from the processor's main
// loop and allowing the system to handle short, intense bursts of traffic without blocking the distributor.
enqueueChannelBufferSize = 100
```
may need to make this configurable
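A hedged sketch of one conventional way to make this configurable, using functional options; `WithEnqueueBufferSize` and the surrounding types are hypothetical, not the PR's API:

```go
package sketch

// defaultEnqueueChannelBufferSize mirrors the constant quoted above.
const defaultEnqueueChannelBufferSize = 100

type flowItem struct{}

type ShardProcessor struct {
	enqueueChan chan *flowItem
}

type config struct{ enqueueBufferSize int }

// Option is a functional option for configuring a ShardProcessor.
type Option func(*config)

// WithEnqueueBufferSize overrides the default "shock absorber" depth.
func WithEnqueueBufferSize(n int) Option {
	return func(c *config) { c.enqueueBufferSize = n }
}

func NewShardProcessor(opts ...Option) *ShardProcessor {
	cfg := config{enqueueBufferSize: defaultEnqueueChannelBufferSize}
	for _, o := range opts {
		o(&cfg)
	}
	return &ShardProcessor{enqueueChan: make(chan *flowItem, cfg.enqueueBufferSize)}
}
```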
```go
// Enqueue sends a new flow item to the processor's internal channel for asynchronous processing by its main `Run` loop.
// If the processor is shutting down, it immediately finalizes the item with a shutdown error.
func (sp *ShardProcessor) Enqueue(item *flowItem) {
```
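Based only on the documented contract above, a minimal sketch of what a shutdown-aware `Enqueue` could look like; the `done` channel, `errShutdown` sentinel, and `finalize` signature are assumptions for illustration:

```go
package sketch

import "errors"

// errShutdown is a hypothetical sentinel; the PR's actual error may differ.
var errShutdown = errors.New("shard processor is shutting down")

type flowItem struct{ /* request, outcome, sync.Once, ... */ }

// finalize stands in for the PR's idempotent finalization.
func (i *flowItem) finalize(err error) {}

type ShardProcessor struct {
	enqueueChan chan *flowItem
	done        chan struct{} // assumed closed when the Run loop exits
}

// Enqueue blocks until the Run loop accepts the item or shutdown is observed,
// in which case the item is finalized immediately with a shutdown error.
// (A real implementation must also drain items that raced past the send.)
func (sp *ShardProcessor) Enqueue(item *flowItem) {
	select {
	case <-sp.done:
		item.finalize(errShutdown)
	case sp.enqueueChan <- item:
	}
}
```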
Where is this called?
```go
// This avoids holding locks on the shard while processing, and allows us to determine the optimal number of workers.
var queuesToProcess []framework.FlowQueueAccessor
for _, priority := range sp.shard.AllOrderedPriorityLevels() {
	band, err := sp.shard.PriorityBandAccessor(priority)
```
A few questions:
- Where is `PriorityBandAccessor` defined?
- What is the lifecycle of `subsetPriorityBandAccessor`?
- How are the allowed flows determined?
Left some comments; overall it seems reasonable! I'm curious to see how the single-goroutine shard processor works in practice. How is …
/lgtm, leaving approve to Kellen
/approve |
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: kfswain, LukeAVanDrie

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing `/approve` in a comment.
This pull request introduces the `ShardProcessor`, the core data plane engine of the `FlowController`. This is the heart of the flow control system, making it capable of processing, queuing, and dispatching requests.

This PR is part of a stack; this entry covers the `ShardProcessor`. Incremental diff against #1202: LukeAVanDrie/gateway-api-inference-extension@refactor/flow-control-mocks...feat/flow-control-controller-shard-processor
Each `ShardProcessor` is paired one-to-one with a `contracts.RegistryShard` and is responsible for all request lifecycle operations on that shard. It acts as the concurrent worker that executes against the state provided by the shard, orchestrating policies and managing requests from initial enqueue to a final outcome (dispatch, eviction, or rejection).

**Key Changes**
**`ShardProcessor` Implementation:**
- The main `Run` loop, which uses a `select` statement to interleave enqueueing new requests with dispatching existing ones. This balances responsiveness to new arrivals with draining the existing backlog.
- A `dispatchCycle`, which iterates through priority bands and applies policies to select an item for dispatch.
- A `runExpiryCleanup` goroutine that periodically scans all queues for expired items (due to TTL or context cancellation).
**Robust Concurrency and State Management:**
- All queue write operations (`enqueue`) are funneled through a single channel, which makes the critical "check-then-act" capacity logic safe from race conditions without requiring complex locking.
- The `flowItem` uses `sync.Once` for its `finalize` method. This guarantees idempotent, exactly-once finalization and deterministically resolves the critical race condition between the dispatch and expiry cleanup loops.
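A self-contained sketch of the `sync.Once` finalization pattern from the second bullet above; the fields and `finalize` signature are simplified stand-ins:

```go
package main

import (
	"fmt"
	"sync"
)

// flowItem sketches idempotent finalization: dispatch and expiry cleanup may
// both attempt to finalize, but sync.Once guarantees exactly one winner.
type flowItem struct {
	once    sync.Once
	outcome string
}

func (i *flowItem) finalize(outcome string) {
	i.once.Do(func() { i.outcome = outcome })
}

func main() {
	it := &flowItem{}
	var wg sync.WaitGroup
	for _, o := range []string{"dispatched", "expired"} { // the two racing paths
		wg.Add(1)
		go func(outcome string) {
			defer wg.Done()
			it.finalize(outcome)
		}(o)
	}
	wg.Wait()
	fmt.Println(it.outcome) // exactly one of "dispatched" or "expired"
}
```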
**Resilient Error Handling:**
- Introduces the `errInterFlow` and `errIntraFlow` sentinel errors.
- A failure before a queue is selected (`errInterFlow`) aborts the current priority band but allows the processor to continue to lower-priority bands. A failure after a queue is selected (`errIntraFlow`) aborts the band for the entire cycle to prevent tight-loop error conditions.
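A hedged sketch of how this sentinel-based tiering might look inside a dispatch loop; only the sentinel names come from the PR, the surrounding shapes are hypothetical:

```go
package sketch

import (
	"errors"
	"fmt"
)

// Sentinels mirroring the ones described above.
var (
	errInterFlow = errors.New("inter-flow policy failure")
	errIntraFlow = errors.New("intra-flow policy failure")
)

// dispatchCycle sketches the two-tier strategy: either class of failure skips
// only the offending band for this cycle, so a fault in one priority band
// never halts progress in the bands below it.
func dispatchCycle(bands []func() error) {
	for i, dispatchBand := range bands {
		if err := dispatchBand(); err != nil {
			if errors.Is(err, errInterFlow) || errors.Is(err, errIntraFlow) {
				fmt.Printf("band %d skipped this cycle: %v\n", i, err)
				continue // fault isolated; lower-priority bands still run
			}
			fmt.Printf("band %d unexpected error: %v\n", i, err)
		}
	}
}
```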
**`BandFilter` Abstraction:**
- Introduces the `BandFilter` function type, which acts as a pre-policy gate. This decouples the logic of determining request viability (e.g., is the system saturated?) from the logic of selection (e.g., which flow is the fairest to pick next?).
- `NewSaturationFilter` is provided as the default implementation.
**New Contracts and Mocks:**
- Introduces the `contracts.SaturationDetector` interface, which acts as a service port for the engine to query for system load.
- Adds a `MockManagedQueue` to enable deterministic testing of the processor's concurrent logic.
**Testing Strategy**

The `ShardProcessor` is a complex, concurrent orchestrator. To test it reliably, this PR introduces an extensive, high-fidelity `testHarness` in `processor_test.go`. The suite exercises the processor's concurrent behavior, race conditions, failure modes, and the `errIntraFlow` circuit breaker.

A detailed explanation of this testing philosophy is included in the comments at the top of `processor_test.go`.