Skip to content

Commit c4cfec4

Browse files
committed
Add source (API vs ruler) to requests and propagate it through services. Replace the request source implementation on QFE when the ruler calls QFE, to unify the experience across services
Signed-off-by: Erlan Zholdubai uulu <[email protected]>
1 parent 3d92801 commit c4cfec4

File tree

14 files changed

+96
-61
lines changed

14 files changed

+96
-61
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
* [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895
7474
* [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950
7575
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
76+
* [ENHANCEMENT] Add source metadata (API vs ruler) to requests and use it in resource-based throttling to reject only ad hoc queries. #6947
7677
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
7778
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
7879
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/api/middlewares.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (h HTTPHeaderMiddleware) injectRequestContext(r *http.Request) *http.Reques
3737
reqId = uuid.NewString()
3838
}
3939
requestContextMap[requestmeta.RequestIdKey] = reqId
40+
requestContextMap[requestmeta.RequestSourceKey] = requestmeta.SourceAPI
4041

4142
ctx := requestmeta.ContextWithRequestMetadataMap(r.Context(), requestContextMap)
4243
return r.WithContext(ctx)

pkg/frontend/transport/handler.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
util_api "github.com/cortexproject/cortex/pkg/util/api"
3333
"github.com/cortexproject/cortex/pkg/util/limiter"
3434
util_log "github.com/cortexproject/cortex/pkg/util/log"
35+
"github.com/cortexproject/cortex/pkg/util/requestmeta"
3536
)
3637

3738
const (
@@ -247,11 +248,11 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
247248
}
248249

249250
userID := tenant.JoinTenantIDs(tenantIDs)
251+
source := tripperware.GetSource(r)
250252

251253
if f.tenantFederationCfg.Enabled {
252254
maxTenant := f.tenantFederationCfg.MaxTenant
253255
if maxTenant > 0 && len(tenantIDs) > maxTenant {
254-
source := tripperware.GetSource(r.Header.Get("User-Agent"))
255256
if f.cfg.QueryStatsEnabled {
256257
f.rejectedQueries.WithLabelValues(reasonTooManyTenants, source, userID).Inc()
257258
}
@@ -291,15 +292,13 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
291292
}
292293
http.Error(w, err.Error(), statusCode)
293294
if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) {
294-
source := tripperware.GetSource(r.Header.Get("User-Agent"))
295295
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, source, userID).Inc()
296296
}
297297
return
298298
}
299299
r.Body = io.NopCloser(&buf)
300300
}
301301

302-
source := tripperware.GetSource(r.Header.Get("User-Agent"))
303302
// Log request
304303
if f.cfg.QueryStatsEnabled {
305304
queryString = f.parseRequestQueryString(r, buf)
@@ -411,7 +410,7 @@ func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values, sourc
411410
logMessage = append(logMessage, "accept_encoding", acceptEncoding)
412411
}
413412

414-
shouldLog := source == tripperware.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == tripperware.SourceRuler)
413+
shouldLog := source == requestmeta.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == requestmeta.SourceRuler)
415414
if shouldLog {
416415
logMessage = append(logMessage, formatQueryString(queryString)...)
417416
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
@@ -547,7 +546,7 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
547546
}
548547
}
549548

550-
shouldLog := source == tripperware.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == tripperware.SourceRuler)
549+
shouldLog := source == requestmeta.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == requestmeta.SourceRuler)
551550
if shouldLog {
552551
logMessage = append(logMessage, formatQueryString(queryString)...)
553552
if error != nil {

pkg/frontend/transport/handler_test.go

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ import (
2828
"github.com/cortexproject/cortex/pkg/querier"
2929
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
3030
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
31-
"github.com/cortexproject/cortex/pkg/querier/tripperware"
3231
"github.com/cortexproject/cortex/pkg/tenant"
3332
util_api "github.com/cortexproject/cortex/pkg/util/api"
3433
"github.com/cortexproject/cortex/pkg/util/limiter"
3534
util_log "github.com/cortexproject/cortex/pkg/util/log"
35+
"github.com/cortexproject/cortex/pkg/util/requestmeta"
3636
)
3737

3838
type roundTripperFunc func(*http.Request) (*http.Response, error)
@@ -218,7 +218,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
218218
}, nil
219219
}),
220220
additionalMetricsCheckFunc: func(h *Handler) {
221-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, tripperware.SourceAPI, userID))
221+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, requestmeta.SourceAPI, userID))
222222
assert.Equal(t, float64(1), v)
223223
},
224224
expectedStatusCode: http.StatusRequestEntityTooLarge,
@@ -234,7 +234,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
234234
}, nil
235235
}),
236236
additionalMetricsCheckFunc: func(h *Handler) {
237-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, tripperware.SourceAPI, userID))
237+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, requestmeta.SourceAPI, userID))
238238
assert.Equal(t, float64(1), v)
239239
},
240240
expectedStatusCode: http.StatusTooManyRequests,
@@ -250,7 +250,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
250250
}, nil
251251
}),
252252
additionalMetricsCheckFunc: func(h *Handler) {
253-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, tripperware.SourceAPI, userID))
253+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, requestmeta.SourceAPI, userID))
254254
assert.Equal(t, float64(1), v)
255255
},
256256
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -266,7 +266,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
266266
}, nil
267267
}),
268268
additionalMetricsCheckFunc: func(h *Handler) {
269-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, tripperware.SourceAPI, userID))
269+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, requestmeta.SourceAPI, userID))
270270
assert.Equal(t, float64(1), v)
271271
},
272272
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -282,7 +282,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
282282
}, nil
283283
}),
284284
additionalMetricsCheckFunc: func(h *Handler) {
285-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, tripperware.SourceAPI, userID))
285+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, requestmeta.SourceAPI, userID))
286286
assert.Equal(t, float64(1), v)
287287
},
288288
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -298,7 +298,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
298298
}, nil
299299
}),
300300
additionalMetricsCheckFunc: func(h *Handler) {
301-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, tripperware.SourceAPI, userID))
301+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, requestmeta.SourceAPI, userID))
302302
assert.Equal(t, float64(1), v)
303303
},
304304
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -314,7 +314,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
314314
}, nil
315315
}),
316316
additionalMetricsCheckFunc: func(h *Handler) {
317-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, tripperware.SourceAPI, userID))
317+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, requestmeta.SourceAPI, userID))
318318
assert.Equal(t, float64(1), v)
319319
},
320320
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -330,7 +330,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
330330
}, nil
331331
}),
332332
additionalMetricsCheckFunc: func(h *Handler) {
333-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, tripperware.SourceAPI, userID))
333+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, requestmeta.SourceAPI, userID))
334334
assert.Equal(t, float64(1), v)
335335
},
336336
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -346,7 +346,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
346346
}, nil
347347
}),
348348
additionalMetricsCheckFunc: func(h *Handler) {
349-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, tripperware.SourceAPI, userID))
349+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, requestmeta.SourceAPI, userID))
350350
assert.Equal(t, float64(1), v)
351351
},
352352
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -362,7 +362,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
362362
}, nil
363363
}),
364364
additionalMetricsCheckFunc: func(h *Handler) {
365-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, tripperware.SourceAPI, userID))
365+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, requestmeta.SourceAPI, userID))
366366
assert.Equal(t, float64(1), v)
367367
},
368368
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -378,7 +378,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
378378
}, nil
379379
}),
380380
additionalMetricsCheckFunc: func(h *Handler) {
381-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, tripperware.SourceAPI, userID))
381+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, requestmeta.SourceAPI, userID))
382382
assert.Equal(t, float64(1), v)
383383
},
384384
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -395,7 +395,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
395395
}, nil
396396
}),
397397
additionalMetricsCheckFunc: func(h *Handler) {
398-
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResourceExhausted, tripperware.SourceAPI, userID))
398+
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResourceExhausted, requestmeta.SourceAPI, userID))
399399
assert.Equal(t, float64(1), v)
400400
},
401401
expectedStatusCode: http.StatusUnprocessableEntity,
@@ -412,7 +412,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
412412
}, nil
413413
}),
414414
additionalMetricsCheckFunc: func(h *Handler) {
415-
v := promtest.ToFloat64(h.slowQueries.WithLabelValues(tripperware.SourceAPI, userID))
415+
v := promtest.ToFloat64(h.slowQueries.WithLabelValues(requestmeta.SourceAPI, userID))
416416
assert.Equal(t, float64(1), v)
417417
},
418418
expectedStatusCode: http.StatusOK,
@@ -474,12 +474,12 @@ func TestReportQueryStatsFormat(t *testing.T) {
474474
tests := map[string]testCase{
475475
"should not include query and header details if empty": {
476476
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`,
477-
source: tripperware.SourceAPI,
477+
source: requestmeta.SourceAPI,
478478
},
479479
"should include query length and string at the end": {
480480
queryString: url.Values(map[string][]string{"query": {"up"}}),
481481
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 param_query=up`,
482-
source: tripperware.SourceAPI,
482+
source: requestmeta.SourceAPI,
483483
},
484484
"should include query stats": {
485485
queryStats: &querier_stats.QueryStats{
@@ -496,27 +496,27 @@ func TestReportQueryStatsFormat(t *testing.T) {
496496
},
497497
},
498498
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 query_storage_wall_time_seconds=6000`,
499-
source: tripperware.SourceAPI,
499+
source: requestmeta.SourceAPI,
500500
},
501501
"should include user agent": {
502502
header: http.Header{"User-Agent": []string{"Grafana"}},
503503
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 user_agent=Grafana`,
504-
source: tripperware.SourceAPI,
504+
source: requestmeta.SourceAPI,
505505
},
506506
"should include engine type": {
507507
header: http.Header{http.CanonicalHeaderKey(engine.TypeHeader): []string{string(engine.Thanos)}},
508508
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 engine_type=thanos`,
509-
source: tripperware.SourceAPI,
509+
source: requestmeta.SourceAPI,
510510
},
511511
"should include block store type": {
512512
header: http.Header{http.CanonicalHeaderKey(querier.BlockStoreTypeHeader): []string{"parquet"}},
513513
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 block_store_type=parquet`,
514-
source: tripperware.SourceAPI,
514+
source: requestmeta.SourceAPI,
515515
},
516516
"should include response error": {
517517
responseErr: errors.New("foo_err"),
518518
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 error=foo_err`,
519-
source: tripperware.SourceAPI,
519+
source: requestmeta.SourceAPI,
520520
},
521521
"should include query priority": {
522522
queryString: url.Values(map[string][]string{"query": {"up"}}),
@@ -525,7 +525,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
525525
PriorityAssigned: true,
526526
},
527527
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 priority=99 param_query=up`,
528-
source: tripperware.SourceAPI,
528+
source: requestmeta.SourceAPI,
529529
},
530530
"should include data fetch min and max time": {
531531
queryString: url.Values(map[string][]string{"query": {"up"}}),
@@ -534,7 +534,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
534534
DataSelectMinTime: 1704067200000,
535535
},
536536
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
537-
source: tripperware.SourceAPI,
537+
source: requestmeta.SourceAPI,
538538
},
539539
"should include query stats with store gateway stats": {
540540
queryStats: &querier_stats.QueryStats{
@@ -553,16 +553,16 @@ func TestReportQueryStatsFormat(t *testing.T) {
553553
},
554554
},
555555
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
556-
source: tripperware.SourceAPI,
556+
source: requestmeta.SourceAPI,
557557
},
558558
"should not report a log": {
559559
expectedLog: ``,
560-
source: tripperware.SourceRuler,
560+
source: requestmeta.SourceRuler,
561561
enabledRulerQueryStatsLog: false,
562562
},
563563
"should report a log": {
564564
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`,
565-
source: tripperware.SourceRuler,
565+
source: requestmeta.SourceRuler,
566566
enabledRulerQueryStatsLog: true,
567567
},
568568
}
@@ -571,6 +571,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
571571
t.Run(testName, func(t *testing.T) {
572572
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, EnabledRulerQueryStatsLog: testData.enabledRulerQueryStatsLog}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
573573
req.Header = testData.header
574+
req = req.WithContext(requestmeta.ContextWithRequestSource(context.Background(), testData.source))
574575
handler.reportQueryStats(req, testData.source, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
575576
data, err := io.ReadAll(outputBuf)
576577
require.NoError(t, err)
@@ -718,7 +719,7 @@ func Test_TenantFederation_MaxTenant(t *testing.T) {
718719
require.Contains(t, string(body), test.expectedErrMsg)
719720

720721
if strings.Contains(test.expectedErrMsg, "too many tenants") {
721-
v := promtest.ToFloat64(handler.rejectedQueries.WithLabelValues(reasonTooManyTenants, tripperware.SourceAPI, test.orgId))
722+
v := promtest.ToFloat64(handler.rejectedQueries.WithLabelValues(reasonTooManyTenants, requestmeta.SourceAPI, test.orgId))
722723
assert.Equal(t, float64(1), v)
723724
}
724725
}

0 commit comments

Comments
 (0)