Skip to content

Commit 70ae054

Browse files
committed
Add per labelset tests
Signed-off-by: Essam Eldaly <[email protected]>
1 parent 7f40b8c commit 70ae054

File tree

4 files changed

+261
-121
lines changed

4 files changed

+261
-121
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package discardedseries
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
9+
10+
const (
11+
perLabelsetSeriesLimit = "per_labelset_series_limit"
12+
)
13+
14+
type labelsetCounterStruct struct {
15+
*sync.RWMutex
16+
labelsetSeriesMap map[uint64]*seriesCounterStruct
17+
}
18+
19+
type DiscardedSeriesPerLabelsetTracker struct {
20+
*sync.RWMutex
21+
userLabelsetMap map[string]*labelsetCounterStruct
22+
discardedSeriesPerLabelsetGauge *prometheus.GaugeVec
23+
}
24+
25+
func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker {
26+
tracker := &DiscardedSeriesPerLabelsetTracker{
27+
RWMutex: &sync.RWMutex{},
28+
userLabelsetMap: make(map[string]*labelsetCounterStruct),
29+
discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge,
30+
}
31+
return tracker
32+
}
33+
34+
func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) {
35+
t.RLock()
36+
labelsetCounter, ok := t.userLabelsetMap[user]
37+
t.RUnlock()
38+
if !ok {
39+
t.Lock()
40+
labelsetCounter, ok = t.userLabelsetMap[user]
41+
if !ok {
42+
labelsetCounter = &labelsetCounterStruct{
43+
RWMutex: &sync.RWMutex{},
44+
labelsetSeriesMap: make(map[uint64]*seriesCounterStruct),
45+
}
46+
t.userLabelsetMap[user] = labelsetCounter
47+
}
48+
t.Unlock()
49+
}
50+
51+
labelsetCounter.RLock()
52+
seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
53+
labelsetCounter.RUnlock()
54+
if !ok {
55+
labelsetCounter.Lock()
56+
seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
57+
if !ok {
58+
seriesCounter = &seriesCounterStruct{
59+
RWMutex: &sync.RWMutex{},
60+
seriesCountMap: make(map[uint64]struct{}),
61+
labelsetId: matchedLabelsetId,
62+
}
63+
labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter
64+
}
65+
labelsetCounter.Unlock()
66+
}
67+
68+
seriesCounter.RLock()
69+
_, ok = seriesCounter.seriesCountMap[series]
70+
seriesCounter.RUnlock()
71+
if !ok {
72+
seriesCounter.Lock()
73+
_, ok = seriesCounter.seriesCountMap[series]
74+
if !ok {
75+
seriesCounter.seriesCountMap[series] = struct{}{}
76+
}
77+
seriesCounter.Unlock()
78+
}
79+
}
80+
81+
func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() {
82+
usersToDelete := make([]string, 0)
83+
labelsetsToDelete := make([]uint64, 0)
84+
t.RLock()
85+
for user, labelsetCounter := range t.userLabelsetMap {
86+
labelsetCounter.RLock()
87+
if len(labelsetCounter.labelsetSeriesMap) == 0 {
88+
usersToDelete = append(usersToDelete, user)
89+
}
90+
for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap {
91+
seriesCounter.Lock()
92+
count := len(seriesCounter.seriesCountMap)
93+
t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count))
94+
clear(seriesCounter.seriesCountMap)
95+
if count == 0 {
96+
labelsetsToDelete = append(labelsetsToDelete, labelsetHash)
97+
}
98+
seriesCounter.Unlock()
99+
}
100+
labelsetCounter.RUnlock()
101+
if len(labelsetsToDelete) > 0 {
102+
labelsetCounter.Lock()
103+
for _, labelsetHash := range labelsetsToDelete {
104+
if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok {
105+
labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId
106+
t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId)
107+
delete(labelsetCounter.labelsetSeriesMap, labelsetHash)
108+
}
109+
}
110+
labelsetCounter.Unlock()
111+
}
112+
}
113+
t.RUnlock()
114+
if len(usersToDelete) > 0 {
115+
t.Lock()
116+
for _, user := range usersToDelete {
117+
delete(t.userLabelsetMap, user)
118+
}
119+
t.Unlock()
120+
}
121+
}
122+
123+
func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() {
124+
go func() {
125+
ticker := time.NewTicker(vendMetricsInterval)
126+
for range ticker.C {
127+
t.UpdateMetrics()
128+
}
129+
}()
130+
}
131+
132+
// only used in testing
133+
func (t *DiscardedSeriesPerLabelsetTracker) getSeriesCount(user string, labelsetLimitHash uint64) int {
134+
if labelsetCounter, ok := t.userLabelsetMap[user]; ok {
135+
if seriesCounter, ok := labelsetCounter.labelsetSeriesMap[labelsetLimitHash]; ok {
136+
return len(seriesCounter.seriesCountMap)
137+
}
138+
}
139+
return 0
140+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package discardedseries
2+
3+
import (
4+
"testing"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/testutil"
8+
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestPerLabelsetDiscardedSeriesTracker(t *testing.T) {
13+
gauge := prometheus.NewGaugeVec(
14+
prometheus.GaugeOpts{
15+
Name: "cortex_discarded_series_per_labelset",
16+
Help: "The number of series that include discarded samples for each labelset.",
17+
},
18+
[]string{"reason", "user", "labelset"},
19+
)
20+
21+
tracker := NewDiscardedSeriesPerLabelsetTracker(gauge)
22+
user1 := "user1"
23+
user2 := "user2"
24+
series1 := labels.FromStrings("__name__", "1")
25+
series2 := labels.FromStrings("__name__", "2")
26+
labelset1 := uint64(10)
27+
labelset2 := uint64(20)
28+
labelset3 := uint64(30)
29+
labelsetId1 := "ten"
30+
labelsetId2 := "twenty"
31+
labelsetId3 := "thirty"
32+
33+
tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
34+
tracker.Track(user1, series1.Hash(), labelset2, labelsetId2)
35+
36+
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
37+
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
38+
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
39+
tracker.Track(user2, series2.Hash(), labelset1, labelsetId1)
40+
41+
require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
42+
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 1)
43+
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)
44+
45+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
46+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
47+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)
48+
49+
require.Equal(t, tracker.getSeriesCount(user2, labelset1), 2)
50+
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
51+
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)
52+
53+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
54+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
55+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
56+
57+
tracker.UpdateMetrics()
58+
59+
tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
60+
tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
61+
62+
require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
63+
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
64+
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)
65+
66+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
67+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 1)
68+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)
69+
70+
require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
71+
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
72+
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)
73+
74+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 2)
75+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
76+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
77+
78+
tracker.UpdateMetrics()
79+
80+
require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
81+
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
82+
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)
83+
84+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
85+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
86+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)
87+
88+
require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
89+
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
90+
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)
91+
92+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
93+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
94+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
95+
96+
tracker.UpdateMetrics()
97+
98+
require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
99+
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
100+
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)
101+
102+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
103+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
104+
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)
105+
106+
require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
107+
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
108+
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)
109+
110+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
111+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
112+
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
113+
}
114+
115+
func comparePerLabelsetSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, user string, labelsetLimitId string, val int) {
116+
gauge, _ := gaugeVec.GetMetricWithLabelValues("per_labelset_series_limit", user, labelsetLimitId)
117+
require.Equal(t, testutil.ToFloat64(gauge), float64(val))
118+
}

0 commit comments

Comments
 (0)