Skip to content

Commit 7f40b8c

Browse files
committed
Separate and fix per labelset tracker
Signed-off-by: Essam Eldaly <[email protected]>
1 parent 9650f34 commit 7f40b8c

File tree

4 files changed

+205
-124
lines changed

4 files changed

+205
-124
lines changed

pkg/ingester/ingester.go

Lines changed: 10 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1259,27 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12591259
switch cause := errors.Cause(err); {
12601260
case errors.Is(cause, storage.ErrOutOfBounds):
12611261
sampleOutOfBoundsCount++
1262-
i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, &copiedLabels)
1262+
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash())
12631263
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
12641264

12651265
case errors.Is(cause, storage.ErrOutOfOrderSample):
12661266
sampleOutOfOrderCount++
1267-
i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, &copiedLabels)
1267+
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash())
12681268
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
12691269

12701270
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
12711271
newValueForTimestampCount++
1272-
i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, &copiedLabels)
1272+
go i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash())
12731273
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
12741274

12751275
case errors.Is(cause, storage.ErrTooOldSample):
12761276
sampleTooOldCount++
1277-
i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, &copiedLabels)
1277+
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash())
12781278
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
12791279

12801280
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
12811281
perUserSeriesLimitCount++
1282-
i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, &copiedLabels)
1282+
go i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash())
12831283
updateFirstPartial(func() error {
12841284
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
12851285
})
@@ -1292,14 +1292,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12921292

12931293
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
12941294
perMetricSeriesLimitCount++
1295-
i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, &copiedLabels)
1295+
go i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash())
12961296
updateFirstPartial(func() error {
12971297
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels))
12981298
})
12991299

13001300
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
13011301
perLabelSetSeriesLimitCount++
1302-
i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, &copiedLabels)
1302+
go i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash())
1303+
for _, matchedLabelset := range matchedLabelSetLimits {
1304+
go i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id)
1305+
}
13031306
// We only track per labelset discarded samples for throttling by labelset limit.
13041307
reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit)
13051308
updateFirstPartial(func() error {

pkg/util/discardedseries/tracker.go

Lines changed: 141 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -5,35 +5,41 @@ import (
55
"time"
66

77
"github.com/prometheus/client_golang/prometheus"
8-
"github.com/prometheus/prometheus/model/labels"
98
)
109

1110
const (
12-
vendMetricsInterval = 30 * time.Second
11+
vendMetricsInterval = 30 * time.Second
12+
perLabelsetSeriesLimit = "per_labelset_series_limit"
1313
)
1414

15-
type labelCounterStruct struct {
16-
*sync.RWMutex
17-
*labels.Labels
18-
inCurrentCycle bool
19-
}
20-
2115
type seriesCounterStruct struct {
2216
*sync.RWMutex
23-
seriesCountMap map[uint64]*labelCounterStruct
17+
seriesCountMap map[uint64]struct{}
18+
labelsetId string
2419
}
2520

2621
type userCounterStruct struct {
2722
*sync.RWMutex
2823
userSeriesMap map[string]*seriesCounterStruct
2924
}
3025

26+
type labelsetCounterStruct struct {
27+
*sync.RWMutex
28+
labelsetSeriesMap map[uint64]*seriesCounterStruct
29+
}
30+
3131
type DiscardedSeriesTracker struct {
3232
*sync.RWMutex
3333
reasonUserMap map[string]*userCounterStruct
3434
discardedSeriesGauge *prometheus.GaugeVec
3535
}
3636

37+
type DiscardedSeriesPerLabelsetTracker struct {
38+
*sync.RWMutex
39+
userLabelsetMap map[string]*labelsetCounterStruct
40+
discardedSeriesPerLabelsetGauge *prometheus.GaugeVec
41+
}
42+
3743
func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *DiscardedSeriesTracker {
3844
tracker := &DiscardedSeriesTracker{
3945
RWMutex: &sync.RWMutex{},
@@ -43,8 +49,7 @@ func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *Disca
4349
return tracker
4450
}
4551

46-
func (t *DiscardedSeriesTracker) Track(reason string, user string, labels *labels.Labels) {
47-
series := labels.Hash()
52+
func (t *DiscardedSeriesTracker) Track(reason string, user string, series uint64) {
4853
t.RLock()
4954
userCounter, ok := t.reasonUserMap[reason]
5055
t.RUnlock()
@@ -70,33 +75,24 @@ func (t *DiscardedSeriesTracker) Track(reason string, user string, labels *label
7075
if !ok {
7176
seriesCounter = &seriesCounterStruct{
7277
RWMutex: &sync.RWMutex{},
73-
seriesCountMap: make(map[uint64]*labelCounterStruct),
78+
seriesCountMap: make(map[uint64]struct{}),
7479
}
7580
userCounter.userSeriesMap[user] = seriesCounter
7681
}
7782
userCounter.Unlock()
7883
}
7984

8085
seriesCounter.RLock()
81-
labelCounter, ok := seriesCounter.seriesCountMap[series]
86+
_, ok = seriesCounter.seriesCountMap[series]
8287
seriesCounter.RUnlock()
8388
if !ok {
8489
seriesCounter.Lock()
85-
labelCounter, ok = seriesCounter.seriesCountMap[series]
90+
_, ok = seriesCounter.seriesCountMap[series]
8691
if !ok {
87-
labelCounter = &labelCounterStruct{
88-
Labels: labels,
89-
RWMutex: &sync.RWMutex{},
90-
inCurrentCycle: true,
91-
}
92-
seriesCounter.seriesCountMap[series] = labelCounter
92+
seriesCounter.seriesCountMap[series] = struct{}{}
9393
}
9494
seriesCounter.Unlock()
9595
}
96-
97-
labelCounter.Lock()
98-
labelCounter.inCurrentCycle = true
99-
labelCounter.Unlock()
10096
}
10197

10298
func (t *DiscardedSeriesTracker) UpdateMetrics() {
@@ -106,31 +102,25 @@ func (t *DiscardedSeriesTracker) UpdateMetrics() {
106102
userCounter.RLock()
107103
for user, seriesCounter := range userCounter.userSeriesMap {
108104
seriesCounter.Lock()
109-
for hash, labelCounter := range seriesCounter.seriesCountMap {
110-
labelCounter.Lock()
111-
if labelCounter.inCurrentCycle {
112-
t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.String()).Set(1.0)
113-
labelCounter.inCurrentCycle = false
114-
} else {
115-
t.discardedSeriesGauge.DeleteLabelValues(reason, user, labelCounter.String())
116-
delete(seriesCounter.seriesCountMap, hash)
117-
}
118-
labelCounter.Unlock()
119-
}
120-
if len(seriesCounter.seriesCountMap) == 0 {
105+
count := len(seriesCounter.seriesCountMap)
106+
t.discardedSeriesGauge.WithLabelValues(reason, user).Set(float64(count))
107+
clear(seriesCounter.seriesCountMap)
108+
if count == 0 {
121109
usersToDelete = append(usersToDelete, user)
122110
}
123111
seriesCounter.Unlock()
124112
}
125113
userCounter.RUnlock()
126-
userCounter.Lock()
127-
for _, user := range usersToDelete {
128-
_, ok := userCounter.userSeriesMap[user]
129-
if ok && userCounter.userSeriesMap[user].seriesCountMap != nil {
130-
delete(userCounter.userSeriesMap, user)
114+
if len(usersToDelete) > 0 {
115+
userCounter.Lock()
116+
for _, user := range usersToDelete {
117+
if _, ok := userCounter.userSeriesMap[user]; ok {
118+
t.discardedSeriesGauge.DeleteLabelValues(reason, user)
119+
delete(userCounter.userSeriesMap, user)
120+
}
131121
}
122+
userCounter.Unlock()
132123
}
133-
userCounter.Unlock()
134124
}
135125
t.RUnlock()
136126
}
@@ -144,17 +134,118 @@ func (t *DiscardedSeriesTracker) StartVendDiscardedSeriesMetricGoroutine() {
144134
}()
145135
}
146136

147-
// only used in testing
137+
func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker {
138+
tracker := &DiscardedSeriesPerLabelsetTracker{
139+
RWMutex: &sync.RWMutex{},
140+
userLabelsetMap: make(map[string]*labelsetCounterStruct),
141+
discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge,
142+
}
143+
return tracker
144+
}
145+
146+
func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) {
147+
t.RLock()
148+
labelsetCounter, ok := t.userLabelsetMap[user]
149+
t.RUnlock()
150+
if !ok {
151+
t.Lock()
152+
labelsetCounter, ok = t.userLabelsetMap[user]
153+
if !ok {
154+
labelsetCounter = &labelsetCounterStruct{
155+
RWMutex: &sync.RWMutex{},
156+
labelsetSeriesMap: make(map[uint64]*seriesCounterStruct),
157+
}
158+
t.userLabelsetMap[user] = labelsetCounter
159+
}
160+
t.Unlock()
161+
}
162+
163+
labelsetCounter.RLock()
164+
seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
165+
labelsetCounter.RUnlock()
166+
if !ok {
167+
labelsetCounter.Lock()
168+
seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
169+
if !ok {
170+
seriesCounter = &seriesCounterStruct{
171+
RWMutex: &sync.RWMutex{},
172+
seriesCountMap: make(map[uint64]struct{}),
173+
labelsetId: matchedLabelsetId,
174+
}
175+
labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter
176+
}
177+
labelsetCounter.Unlock()
178+
}
179+
180+
seriesCounter.RLock()
181+
_, ok = seriesCounter.seriesCountMap[series]
182+
seriesCounter.RUnlock()
183+
if !ok {
184+
seriesCounter.Lock()
185+
_, ok = seriesCounter.seriesCountMap[series]
186+
if !ok {
187+
seriesCounter.seriesCountMap[series] = struct{}{}
188+
}
189+
seriesCounter.Unlock()
190+
}
191+
}
192+
193+
func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() {
194+
usersToDelete := make([]string, 0)
195+
labelsetsToDelete := make([]uint64, 0)
196+
t.RLock()
197+
for user, labelsetCounter := range t.userLabelsetMap {
198+
labelsetCounter.RLock()
199+
if len(labelsetCounter.labelsetSeriesMap) == 0 {
200+
usersToDelete = append(usersToDelete, user)
201+
}
202+
for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap {
203+
seriesCounter.Lock()
204+
count := len(seriesCounter.seriesCountMap)
205+
t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count))
206+
clear(seriesCounter.seriesCountMap)
207+
if count == 0 {
208+
labelsetsToDelete = append(labelsetsToDelete, labelsetHash)
209+
}
210+
seriesCounter.Unlock()
211+
}
212+
labelsetCounter.RUnlock()
213+
if len(labelsetsToDelete) > 0 {
214+
labelsetCounter.Lock()
215+
for _, labelsetHash := range labelsetsToDelete {
216+
if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok {
217+
labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId
218+
t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId)
219+
delete(labelsetCounter.labelsetSeriesMap, labelsetHash)
220+
}
221+
}
222+
labelsetCounter.Unlock()
223+
}
224+
}
225+
t.RUnlock()
226+
if len(usersToDelete) > 0 {
227+
t.Lock()
228+
for _, user := range usersToDelete {
229+
delete(t.userLabelsetMap, user)
230+
}
231+
t.Unlock()
232+
}
233+
}
234+
235+
func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() {
236+
go func() {
237+
ticker := time.NewTicker(vendMetricsInterval)
238+
for range ticker.C {
239+
t.UpdateMetrics()
240+
}
241+
}()
242+
}
243+
148244
func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int {
149-
count := 0
150245
if userCounter, ok := t.reasonUserMap[reason]; ok {
151246
if seriesCounter, ok := userCounter.userSeriesMap[user]; ok {
152-
for _, label := range seriesCounter.seriesCountMap {
153-
if label.inCurrentCycle {
154-
count++
155-
}
156-
}
247+
return len(seriesCounter.seriesCountMap)
157248
}
158249
}
159-
return count
250+
return 0
160251
}

0 commit comments

Comments
 (0)