| 1 | +package queue |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "errors" |
| 6 | + "math/rand" |
| 7 | + |
| 8 | + "github.com/lightningnetwork/lnd/fn/v2" |
| 9 | +) |
| 10 | + |
| 11 | +// DropPredicate decides whether an item should be dropped before it is
| 12 | +// enqueued. It receives the current queue length and the item, and returns
| 13 | +// true to drop the item or false to enqueue it.
| 14 | +type DropPredicate[T any] func(queueLen int, item T) bool |
| 15 | + |
| 16 | +// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped |
| 17 | +// due to the DropPredicate. |
| 18 | +var ErrQueueFullAndDropped = errors.New("queue full and item dropped") |
| 19 | + |
| 20 | +// BackpressureQueue is a generic, fixed-capacity queue with predicate-based
| 21 | +// drop behavior. The DropPredicate is consulted on every enqueue, so items
| 22 | +// may be dropped early, before the queue is full (e.g., RED-style).
| 23 | +type BackpressureQueue[T any] struct { |
| 24 | + ch chan T |
| 25 | + dropPredicate DropPredicate[T] |
| 26 | +} |
| 27 | + |
| 28 | +// NewBackpressureQueue creates a new BackpressureQueue with the given capacity |
| 29 | +// and drop predicate. |
| 30 | +func NewBackpressureQueue[T any](capacity int, |
| 31 | + predicate DropPredicate[T]) *BackpressureQueue[T] { |
| 32 | + |
| 33 | + return &BackpressureQueue[T]{ |
| 34 | + ch: make(chan T, capacity), |
| 35 | + dropPredicate: predicate, |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +// Enqueue attempts to add an item to the queue, respecting context
| 40 | +// cancellation. It returns ErrQueueFullAndDropped if the item is dropped, the
| 41 | +// context error if ctx is done before the item is enqueued, or nil on success.
| 42 | +func (q *BackpressureQueue[T]) Enqueue(ctx context.Context, |
| 43 | + item T) error { |
| 44 | + |
| 45 | + // First, consult the drop predicate based on the current queue length. |
| 46 | + // If the predicate decides to drop it, return ErrQueueFullAndDropped.
| 47 | + if q.dropPredicate(len(q.ch), item) { |
| 48 | + return ErrQueueFullAndDropped |
| 49 | + } |
| 50 | + |
| 51 | + // If the predicate decides not to drop, attempt to enqueue the item. |
| 52 | + select { |
| 53 | + case q.ch <- item: |
| 54 | + return nil |
| 55 | + |
| 56 | + default: |
| 57 | + // Channel is full, and the predicate decided not to drop. We |
| 58 | + // must block until space is available or context is cancelled. |
| 59 | + select { |
| 60 | + case q.ch <- item: |
| 61 | + return nil |
| 62 | + |
| 63 | + case <-ctx.Done(): |
| 64 | + return ctx.Err() |
| 65 | + } |
| 66 | + } |
| 67 | +} |
| 68 | + |
| 69 | +// Dequeue retrieves the next item from the queue, blocking until an item is
| 70 | +// available or the context is done. It returns the item, or an error if ctx
| 71 | +// is done before an item becomes available.
| 72 | +func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] { |
| 73 | + select { |
| 74 | + |
| 75 | + case item := <-q.ch: |
| 76 | + return fn.Ok(item) |
| 77 | + |
| 78 | + case <-ctx.Done(): |
| 79 | + return fn.Err[T](ctx.Err()) |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +// redConfig holds configuration for RandomEarlyDrop. |
| 84 | +type redConfig struct { |
| 85 | + randSrc func() float64 |
| 86 | +} |
| 87 | + |
| 88 | +// REDOption is a functional option for configuring RandomEarlyDrop. |
| 89 | +type REDOption func(*redConfig) |
| 90 | + |
| 91 | +// WithRandSource provides a custom random number source: a function that
| 92 | +// returns a float64 in the half-open interval [0.0, 1.0).
| 93 | +func WithRandSource(src func() float64) REDOption { |
| 94 | + return func(cfg *redConfig) { |
| 95 | + cfg.randSrc = src |
| 96 | + } |
| 97 | +} |
| 98 | + |
| 99 | +// RandomEarlyDrop returns a DropPredicate that implements Random Early |
| 100 | +// Detection (RED), inspired by TCP-RED queue management. |
| 101 | +// |
| 102 | +// RED prevents sudden buffer overflows by proactively dropping packets before |
| 103 | +// the queue is full. It establishes two thresholds: |
| 104 | +// |
| 105 | +// 1. minThreshold: queue length below which no drops occur. |
| 106 | +// 2. maxThreshold: queue length at or above which all items are dropped. |
| 107 | +// |
| 108 | +// Between these points, the drop probability p increases linearly: |
| 109 | +// |
| 110 | +// p = (queueLen - minThreshold) / (maxThreshold - minThreshold) |
| 111 | +// |
| 112 | +// For example, with minThreshold=15 and maxThreshold=35: |
| 113 | +// - At queueLen=15, p=0.0 (0% drop chance) |
| 114 | +// - At queueLen=25, p=0.5 (50% drop chance) |
| 115 | +// - At queueLen=35, p=1.0 (100% drop chance) |
| 116 | +// |
| 117 | +// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy, |
| 118 | +// and gives early back-pressure signals to senders. |
| 119 | +func RandomEarlyDrop[T any](minThreshold, maxThreshold int, opts ...REDOption) DropPredicate[T] { |
| 120 | + cfg := redConfig{ |
| 121 | + randSrc: rand.Float64, |
| 122 | + } |
| 123 | + |
| 124 | + for _, opt := range opts { |
| 125 | + opt(&cfg) |
| 126 | + } |
| 127 | + if cfg.randSrc == nil { |
| 128 | + cfg.randSrc = rand.Float64 |
| 129 | + } |
| 130 | + |
| 131 | + return func(queueLen int, _ T) bool { |
| 132 | + // If the queue is below the minimum threshold, then we never |
| 133 | + // drop. |
| 134 | + if queueLen < minThreshold { |
| 135 | + return false |
| 136 | + } |
| 137 | + |
| 138 | + // If the queue is at or above the maximum threshold, then we |
| 139 | + // always drop. |
| 140 | + if queueLen >= maxThreshold { |
| 141 | + return true |
| 142 | + } |
| 143 | + |
| 144 | + // If we're in the middle, then we implement linear scaling of |
| 145 | + // the drop probability based on our thresholds. At this point, |
| 146 | + // minThreshold <= queueLen < maxThreshold. This also implies |
| 147 | + // minThreshold < maxThreshold, so the denominator won't be zero.
| 148 | + denominator := float64(maxThreshold - minThreshold) |
| 149 | + |
| 150 | + p := float64(queueLen-minThreshold) / denominator |
| 151 | + |
| 152 | + return cfg.randSrc() < p |
| 153 | + } |
| 154 | +} |
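
To see how the pieces above fit together, here is a minimal usage sketch. It is not part of the commit: the capacity and thresholds (50, 15, 35), the import path `github.com/lightningnetwork/lnd/queue`, and the `Unpack` accessor on `fn.Result` are assumptions chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	// Assumed import path for the package shown in the diff above.
	"github.com/lightningnetwork/lnd/queue"
)

func main() {
	// Hypothetical sizing: capacity 50, with the RED thresholds 15/35
	// used in the doc comment example.
	q := queue.NewBackpressureQueue[int](
		50, queue.RandomEarlyDrop[int](15, 35),
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Producer side: enqueue items, tolerating early drops.
	for i := 0; i < 100; i++ {
		err := q.Enqueue(ctx, i)
		switch {
		case errors.Is(err, queue.ErrQueueFullAndDropped):
			// RED shed this item; a real caller could retry
			// later or surface back-pressure upstream.
			continue

		case err != nil:
			// Context cancelled or timed out while blocked.
			return
		}
	}

	// Consumer side: drain until the context is done. Unpack is assumed
	// to be the (T, error) accessor on fn.Result.
	for {
		item, err := q.Dequeue(ctx).Unpack()
		if err != nil {
			return
		}

		fmt.Println("dequeued:", item)
	}
}
```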
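
Because the DropPredicate also receives the item itself, policies other than RED are possible. Here is a hypothetical sketch, with the `Msg` type and thresholds invented for illustration, that sheds non-urgent items much earlier than urgent ones:

```go
package queue

// Msg is a hypothetical queued item carrying a priority flag.
type Msg struct {
	Urgent  bool
	Payload []byte
}

// priorityAwareDrop keeps urgent messages until the queue is nearly full,
// while shedding non-urgent ones once the queue is half full.
func priorityAwareDrop(capacity int) DropPredicate[Msg] {
	return func(queueLen int, m Msg) bool {
		if m.Urgent {
			// Urgent traffic is only dropped when the queue is
			// essentially full.
			return queueLen >= capacity-1
		}

		// Non-urgent traffic is shed once the queue is half full.
		return queueLen >= capacity/2
	}
}
```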
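
Finally, the WithRandSource option makes it possible to pin the predicate's randomness in tests. Below is an illustrative test sketch, not from the commit, that fixes the draw at 0.5 and checks the linear ramp against the doc comment example (minThreshold=15, maxThreshold=35):

```go
package queue

import "testing"

// TestRandomEarlyDropRamp is an illustrative sketch of how WithRandSource
// makes the RED predicate deterministic.
func TestRandomEarlyDropRamp(t *testing.T) {
	// Pin the "random" draw at 0.5 so drops happen exactly when p > 0.5.
	pred := RandomEarlyDrop[int](15, 35, WithRandSource(func() float64 {
		return 0.5
	}))

	// Below minThreshold the predicate never drops.
	if pred(10, 0) {
		t.Fatal("expected no drop below minThreshold")
	}

	// At queueLen=25, p = (25-15)/(35-15) = 0.5, and the draw 0.5 is not
	// strictly less than p, so the item is kept.
	if pred(25, 0) {
		t.Fatal("expected no drop when draw == p")
	}

	// At queueLen=30, p = 0.75 > 0.5, so the item is dropped.
	if !pred(30, 0) {
		t.Fatal("expected drop when draw < p")
	}

	// At or above maxThreshold the predicate always drops.
	if !pred(35, 0) {
		t.Fatal("expected drop at maxThreshold")
	}
}
```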