@@ -54,6 +54,7 @@ import (
 	"github.com/grafana/pyroscope/pkg/tenant"
 	"github.com/grafana/pyroscope/pkg/usagestats"
 	"github.com/grafana/pyroscope/pkg/util"
+	"github.com/grafana/pyroscope/pkg/util/spanlogger"
 	"github.com/grafana/pyroscope/pkg/validation"
 )
@@ -327,6 +328,7 @@ func (d *Distributor) PushBatch(ctx context.Context, req *distributormodel.PushR
 		itErr := util.RecoverPanic(func() error {
 			return d.pushSeries(ctx, s, req.RawProfileType, tenantID)
 		})()
+
 		if itErr != nil {
 			itErr = fmt.Errorf("push series with index %d and id %s failed: %w", index, s.ID, itErr)
 		}
@@ -339,27 +341,93 @@ func (d *Distributor) PushBatch(ctx context.Context, req *distributormodel.PushR
 	return res.Err()
 }

+type lazyUsageGroups func() []validation.UsageGroupMatchName
+
+func (l lazyUsageGroups) String() string {
+	groups := l()
+	result := make([]string, len(groups))
+	for pos := range groups {
+		result[pos] = groups[pos].ResolvedName
+	}
+	return fmt.Sprintf("%v", result)
+}
+
+type pushLog struct {
+	fields []any
+	lvl    func(log.Logger) log.Logger
+	msg    string
+}
+
+func newPushLog(capacity int) *pushLog {
+	fields := make([]any, 2, (capacity+1)*2)
+	fields[0] = "msg"
+	return &pushLog{
+		fields: fields,
+	}
+}
+
+func (p *pushLog) addFields(fields ...any) {
+	p.fields = append(p.fields, fields...)
+}
+
+func (p *pushLog) log(logger log.Logger, err error) {
+	// determine log level
+	if p.lvl == nil {
+		if err != nil {
+			p.lvl = level.Warn
+		} else {
+			p.lvl = level.Debug
+		}
+	}
+
+	if err != nil {
+		p.addFields("err", err)
+	}
+
+	// update message
+	if p.msg == "" {
+		if err != nil {
+			p.msg = "profile rejected"
+		} else {
+			p.msg = "profile accepted"
+		}
+	}
+	p.fields[1] = p.msg
+	p.lvl(logger).Log(p.fields...)
+}
+
 func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.ProfileSeries, origin distributormodel.RawProfileType, tenantID string) (err error) {
 	if req.Profile == nil {
 		return noNewProfilesReceivedError()
 	}
 	now := model.Now()

-	logger := log.With(d.logger, "tenant", tenantID)
+	logger := spanlogger.FromContext(ctx, log.With(d.logger, "tenant", tenantID))
+	finalLog := newPushLog(10)
+	defer func() {
+		finalLog.log(logger, err)
+	}()

 	req.TenantID = tenantID
 	serviceName := phlaremodel.Labels(req.Labels).Get(phlaremodel.LabelNameServiceName)
 	if serviceName == "" {
 		req.Labels = append(req.Labels, &typesv1.LabelPair{Name: phlaremodel.LabelNameServiceName, Value: phlaremodel.AttrServiceNameFallback})
+	} else {
+		finalLog.addFields("service_name", serviceName)
 	}
 	sort.Sort(phlaremodel.Labels(req.Labels))

+	if req.ID != "" {
+		finalLog.addFields("profile_id", req.ID)
+	}
+
 	req.TotalProfiles = 1
 	req.TotalBytesUncompressed = calculateRequestSize(req)
 	d.metrics.observeProfileSize(tenantID, StageReceived, req.TotalBytesUncompressed)

 	if err := d.checkIngestLimit(req); err != nil {
-		level.Debug(logger).Log("msg", "rejecting push request due to global ingest limit", "tenant", tenantID)
+		finalLog.msg = "rejecting profile due to global ingest limit"
+		finalLog.lvl = level.Debug
 		validation.DiscardedProfiles.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalProfiles))
 		validation.DiscardedBytes.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalBytesUncompressed))
 		return err
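A minimal usage sketch of the deferred pushLog pattern introduced in the hunk above (not part of the PR): it assumes the pushLog/newPushLog helpers are in scope in the same package, uses go-kit/log (which Pyroscope already depends on), and the package, function and field values are placeholders for illustration only.

// Sketch only: shows how the deferred pushLog from this PR is meant to behave.
// Fields accumulate during the push, and exactly one log line is emitted on
// return: level.Warn with msg="profile rejected" when err != nil, otherwise
// level.Debug with msg="profile accepted", unless msg/lvl were set explicitly.
package distributor

import (
	"errors"

	"github.com/go-kit/log"
)

func examplePush(logger log.Logger, fail bool) (err error) {
	pl := newPushLog(4) // capacity is only a hint for the fields slice
	defer func() { pl.log(logger, err) }()

	pl.addFields("service_name", "checkout", "profile_type", "cpu") // placeholder values

	if fail {
		return errors.New("per-tenant limit reached") // logged once, at warn
	}
	return nil // logged once, at debug
}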
@@ -372,10 +440,13 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
 	usageGroups := d.limits.DistributorUsageGroups(tenantID)

 	profName := phlaremodel.Labels(req.Labels).Get(ProfileName)
+	finalLog.addFields("profile_type", profName)

 	groups := d.usageGroupEvaluator.GetMatch(tenantID, usageGroups, req.Labels)
+	finalLog.addFields("matched_usage_groups", lazyUsageGroups(groups.Names))
 	if err := d.checkUsageGroupsIngestLimit(req, groups.Names()); err != nil {
-		level.Debug(logger).Log("msg", "rejecting push request due to usage group ingest limit", "tenant", tenantID)
+		finalLog.msg = "rejecting profile due to usage group ingest limit"
+		finalLog.lvl = level.Debug
 		validation.DiscardedProfiles.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalProfiles))
 		validation.DiscardedBytes.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalBytesUncompressed))
 		groups.CountDiscardedBytes(string(validation.IngestLimitReached), req.TotalBytesUncompressed)
@@ -384,12 +455,11 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof

 	willSample, samplingSource := d.shouldSample(tenantID, groups.Names())
 	if !willSample {
-		level.Debug(logger).Log(
-			"msg", "skipping push request due to sampling",
-			"tenant", tenantID,
+		finalLog.addFields(
 			"usage_group", samplingSource.UsageGroup,
 			"probability", samplingSource.Probability,
 		)
+		finalLog.msg = "skipping profile due to sampling"
 		validation.DiscardedProfiles.WithLabelValues(string(validation.SkippedBySamplingRules), tenantID).Add(float64(req.TotalProfiles))
 		validation.DiscardedBytes.WithLabelValues(string(validation.SkippedBySamplingRules), tenantID).Add(float64(req.TotalBytesUncompressed))
 		groups.CountDiscardedBytes(string(validation.SkippedBySamplingRules), req.TotalBytesUncompressed)
@@ -402,6 +472,9 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
 	}

 	profLanguage := d.GetProfileLanguage(req)
+	if profLanguage != "" {
+		finalLog.addFields("detected_language", profLanguage)
+	}

 	usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1)
 	d.profileReceivedStats.Inc(1, profLanguage)
@@ -410,16 +483,23 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
 	}
 	p := req.Profile
 	decompressedSize := p.SizeVT()
-	d.metrics.observeProfileSize(tenantID, StageSampled, int64(decompressedSize)) //todo use req.TotalBytesUncompressed to include labels size
+	profTime := model.TimeFromUnixNano(p.TimeNanos).Time()
+	finalLog.addFields(
+		"profile_time", profTime,
+		"ingestion_delay", now.Time().Sub(profTime),
+		"decompressed_size", decompressedSize,
+		"sample_count", len(p.Sample),
+	)
+	d.metrics.observeProfileSize(tenantID, StageSampled, int64(decompressedSize)) //todo use req.TotalBytesUncompressed to include labels size
 	d.metrics.receivedDecompressedBytes.WithLabelValues(profName, tenantID).Observe(float64(decompressedSize)) // deprecated TODO remove
 	d.metrics.receivedSamples.WithLabelValues(profName, tenantID).Observe(float64(len(p.Sample)))
 	d.profileSizeStats.Record(float64(decompressedSize), profLanguage)
 	groups.CountReceivedBytes(profName, int64(decompressedSize))

 	validated, err := validation.ValidateProfile(d.limits, tenantID, p, decompressedSize, req.Labels, now)
 	if err != nil {
-		_ = level.Debug(logger).Log("msg", "invalid profile", "err", err)
 		reason := string(validation.ReasonOf(err))
+		finalLog.addFields("reason", reason)
 		validation.DiscardedProfiles.WithLabelValues(reason, tenantID).Add(float64(req.TotalProfiles))
 		validation.DiscardedBytes.WithLabelValues(reason, tenantID).Add(float64(req.TotalBytesUncompressed))
 		groups.CountDiscardedBytes(reason, req.TotalBytesUncompressed)