Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion common/quotas/dynamicratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,20 @@ type DynamicRateLimiter struct {
ttl time.Duration
lastUpdateTime atomic.Pointer[time.Time]
minBurst int
burstMultiplier float64
}

type DynamicRateLimiterOpts struct {
TTL time.Duration
MinBurst int
BurstMultiplier float64
TimeSource clock.TimeSource
}

var _defaultOpts = DynamicRateLimiterOpts{
TTL: _ttl,
MinBurst: _minBurst,
BurstMultiplier: 1,
TimeSource: clock.NewRealTimeSource(),
}

Expand All @@ -64,6 +67,12 @@ func NewDynamicRateLimiter(rps RPSFunc) Limiter {
return NewDynamicRateLimiterWithOpts(rps, _defaultOpts)
}

func NewBurstyDynamicRateLimiter(rps RPSFunc, multiplier float64) Limiter {
opts := _defaultOpts
opts.BurstMultiplier = multiplier
return NewDynamicRateLimiterWithOpts(rps, opts)
}

func NewDynamicRateLimiterWithOpts(rps RPSFunc, opts DynamicRateLimiterOpts) Limiter {
ts := opts.TimeSource
if ts == nil {
Expand All @@ -74,6 +83,7 @@ func NewDynamicRateLimiterWithOpts(rps RPSFunc, opts DynamicRateLimiterOpts) Lim
timeSource: ts,
ttl: opts.TTL,
minBurst: opts.MinBurst,
burstMultiplier: opts.BurstMultiplier,
}
now := res.timeSource.Now()
res.lastUpdateTime.Store(&now)
Expand Down Expand Up @@ -116,7 +126,7 @@ func (d *DynamicRateLimiter) maybeRefreshRps() {

func (d *DynamicRateLimiter) getLimitAndBurst() (rate.Limit, int) {
rps := d.rps()
burst := max(int(math.Ceil(rps)), d.minBurst)
burst := max(int(math.Ceil(rps)*d.burstMultiplier), d.minBurst)
// If we have 0 rps we have to zero out the burst to immediately cut off new permits
if rps == 0 {
burst = 0
Expand Down
2 changes: 1 addition & 1 deletion common/quotas/global/collection/internal/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ const (
// NewFallbackLimiter returns a quotas.Limiter that uses a simpler fallback when necessary,
// and attempts to keep both the fallback and the "real" limiter "warm" by mirroring calls
// between the two regardless of which is being used.
func NewFallbackLimiter(fallback quotas.Limiter) *FallbackLimiter {
func NewFallbackLimiter(fallback quotas.Limiter) *FallbackLimiter {
l := &FallbackLimiter{
// start from 0 as a default, the limiter is unused until it is updated.
//
Expand Down
21 changes: 19 additions & 2 deletions common/quotas/permember/permember.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,37 @@ func NewPerMemberDynamicRateLimiterFactory(
}
}

func NewPerMemberBurstyDynamicRateLimiterFactory(
service string,
globalRPS dynamicproperties.IntPropertyFnWithDomainFilter,
instanceRPS dynamicproperties.IntPropertyFnWithDomainFilter,
resolver membership.Resolver,
burstMultiplier float64,
) quotas.LimiterFactory {
return perMemberFactory{
service: service,
globalRPS: globalRPS,
instanceRPS: instanceRPS,
resolver: resolver,
burstMultiplier: burstMultiplier,
}
}

type perMemberFactory struct {
service string
globalRPS dynamicproperties.IntPropertyFnWithDomainFilter
instanceRPS dynamicproperties.IntPropertyFnWithDomainFilter
resolver membership.Resolver
burstMultiplier float64
}

func (f perMemberFactory) GetLimiter(domain string) quotas.Limiter {
return quotas.NewDynamicRateLimiter(func() float64 {
return quotas.NewBurstyDynamicRateLimiter(func() float64 {
return PerMember(
f.service,
float64(f.globalRPS(domain)),
float64(f.instanceRPS(domain)),
f.resolver,
)
})
}, f.burstMultiplier)
}
15 changes: 11 additions & 4 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,14 @@ func (s *Service) createGlobalQuotaCollections() (globalRatelimiterCollections,

// to safely shadow global ratelimits, we must make duplicate *quota.Collection collections
// so they do not share data when the global limiter decides to use its local fallback.
// these are then combined into the global/algorithm.Collection to handle all limiting calls
local, global := s.createBaseLimiters(), s.createBaseLimiters()
// these are then combined into the global/algorithm.Collection to handle all limiting calls.
//
// local limiters have a burst multiplier of 1, which means RPS == burst, as this is the historical behavior.
//
// global limiters use a larger burst in their fallback limiter only (i.e. when in "local, no data yet" mode)
// to improve behavior for users who are low volume but send small bursts of requests that might exceed the
// normal local limit. this gives some time for the collection to gather data and adjust to the real load.
local, global := s.createBaseLimiters(1), s.createBaseLimiters(5)

user, err := create("user", local.user, global.user, s.config.GlobalDomainUserRPS)
combinedErr = multierr.Combine(combinedErr, err)
Expand All @@ -264,13 +270,14 @@ func (s *Service) createGlobalQuotaCollections() (globalRatelimiterCollections,
async: async,
}, combinedErr
}
func (s *Service) createBaseLimiters() ratelimiterCollections {
func (s *Service) createBaseLimiters(burstMultiplier float64) ratelimiterCollections {
create := func(shared, perInstance dynamicproperties.IntPropertyFnWithDomainFilter) *quotas.Collection {
return quotas.NewCollection(permember.NewPerMemberDynamicRateLimiterFactory(
return quotas.NewCollection(permember.NewPerMemberBurstyDynamicRateLimiterFactory(
service.Frontend,
shared,
perInstance,
s.GetMembershipResolver(),
burstMultiplier,
))
}
return ratelimiterCollections{
Expand Down
Loading