Skip to content

Commit 3511604

Browse files
committed
queue: add new BackpressureQueue[T] variant
In this commit, we add a new type of queue: the back pressure queue. This is a bounded queue based on a simple channel, that will consult a predicate to decide if we should preemptively drop a message or not. We then provide a sample predicate for this use case, based on random early dropping. Given a min and max threshold, we'll start to drop message randomly once we get past the min threshold, ramping up to the max threshold where we'll start to always drop the message.
1 parent c33fbfb commit 3511604

File tree

4 files changed

+544
-1
lines changed

4 files changed

+544
-1
lines changed

queue/back_pressure.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
"errors"
6+
"math/rand"
7+
8+
"github.com/lightningnetwork/lnd/fn/v2"
9+
)
10+
11+
// DropPredicate decides whether to drop an item when the queue is full.
12+
// It receives the current queue length and the item, and returns true to drop,
13+
// false to enqueue.
14+
type DropPredicate[T any] func(queueLen int, item T) bool
15+
16+
// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped
17+
// due to the DropPredicate.
18+
var ErrQueueFullAndDropped = errors.New("queue full and item dropped")
19+
20+
// BackpressureQueue is a generic, fixed-capacity queue with predicate-based
21+
// drop behavior. When full, it uses the DropPredicate to perform early drops
22+
// (e.g., RED-style).
23+
type BackpressureQueue[T any] struct {
24+
ch chan T
25+
dropPredicate DropPredicate[T]
26+
}
27+
28+
// NewBackpressureQueue creates a new BackpressureQueue with the given capacity
29+
// and drop predicate.
30+
func NewBackpressureQueue[T any](capacity int,
31+
predicate DropPredicate[T]) *BackpressureQueue[T] {
32+
33+
return &BackpressureQueue[T]{
34+
ch: make(chan T, capacity),
35+
dropPredicate: predicate,
36+
}
37+
}
38+
39+
// Enqueue attempts to add an item to the queue, respecting context
40+
// cancellation. Returns ErrQueueFullAndDropped if dropped, or context error if
41+
// ctx is done before enqueue. Otherwise, `nil` is returned on success.
42+
func (q *BackpressureQueue[T]) Enqueue(ctx context.Context,
43+
item T) error {
44+
45+
// First, consult the drop predicate based on the current queue length.
46+
// If the predicate decides to drop the item, return true (dropped).
47+
if q.dropPredicate(len(q.ch), item) {
48+
return ErrQueueFullAndDropped
49+
}
50+
51+
// If the predicate decides not to drop, attempt to enqueue the item.
52+
select {
53+
case q.ch <- item:
54+
return nil
55+
56+
default:
57+
// Channel is full, and the predicate decided not to drop. We
58+
// must block until space is available or context is cancelled.
59+
select {
60+
case q.ch <- item:
61+
return nil
62+
63+
case <-ctx.Done():
64+
return ctx.Err()
65+
}
66+
}
67+
}
68+
69+
// Dequeue retrieves the next item from the queue, blocking until available or
70+
// context done. Returns the item or an error if ctx is done before an item is
71+
// available.
72+
func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] {
73+
select {
74+
75+
case item := <-q.ch:
76+
return fn.Ok(item)
77+
78+
case <-ctx.Done():
79+
return fn.Err[T](ctx.Err())
80+
}
81+
}
82+
83+
// redConfig holds configuration for RandomEarlyDrop.
84+
type redConfig struct {
85+
randSrc func() float64
86+
}
87+
88+
// REDOption is a functional option for configuring RandomEarlyDrop.
89+
type REDOption func(*redConfig)
90+
91+
// WithRandSource provides a custom random number source (a function that
92+
// returns a float64 between 0.0 and 1.0).
93+
func WithRandSource(src func() float64) REDOption {
94+
return func(cfg *redConfig) {
95+
cfg.randSrc = src
96+
}
97+
}
98+
99+
// RandomEarlyDrop returns a DropPredicate that implements Random Early
100+
// Detection (RED), inspired by TCP-RED queue management.
101+
//
102+
// RED prevents sudden buffer overflows by proactively dropping packets before
103+
// the queue is full. It establishes two thresholds:
104+
//
105+
// 1. minThreshold: queue length below which no drops occur.
106+
// 2. maxThreshold: queue length at or above which all items are dropped.
107+
//
108+
// Between these points, the drop probability p increases linearly:
109+
//
110+
// p = (queueLen - minThreshold) / (maxThreshold - minThreshold)
111+
//
112+
// For example, with minThreshold=15 and maxThreshold=35:
113+
// - At queueLen=15, p=0.0 (0% drop chance)
114+
// - At queueLen=25, p=0.5 (50% drop chance)
115+
// - At queueLen=35, p=1.0 (100% drop chance)
116+
//
117+
// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy,
118+
// and gives early back-pressure signals to senders.
119+
func RandomEarlyDrop[T any](minThreshold, maxThreshold int, opts ...REDOption) DropPredicate[T] {
120+
cfg := redConfig{
121+
randSrc: rand.Float64,
122+
}
123+
124+
for _, opt := range opts {
125+
opt(&cfg)
126+
}
127+
if cfg.randSrc == nil {
128+
cfg.randSrc = rand.Float64
129+
}
130+
131+
return func(queueLen int, _ T) bool {
132+
// If the queue is below the minimum threshold, then we never
133+
// drop.
134+
if queueLen < minThreshold {
135+
return false
136+
}
137+
138+
// If the queue is at or above the maximum threshold, then we
139+
// always drop.
140+
if queueLen >= maxThreshold {
141+
return true
142+
}
143+
144+
// If we're in the middle, then we implement linear scaling of
145+
// the drop probability based on our thresholds. At this point,
146+
// minThreshold <= queueLen < maxThreshold. This also implies
147+
// minThreshold < maxThreshold, so denominator won't be zero.
148+
denominator := float64(maxThreshold - minThreshold)
149+
150+
p := float64(queueLen-minThreshold) / denominator
151+
152+
return cfg.randSrc() < p
153+
}
154+
}

0 commit comments

Comments
 (0)