From ffbe9390d08f89849bd400214b6681ee452d5ca5 Mon Sep 17 00:00:00 2001 From: "v.kavlakan" Date: Wed, 11 Jun 2025 10:52:28 +0200 Subject: [PATCH 1/2] catalog v3 --- cmd/prometheus/main.go | 16 +++- pp/go/relabeler/head/catalog/catalog.go | 79 +++++++++++++--- pp/go/relabeler/head/catalog/gc.go | 93 ++++++++++++++----- pp/go/relabeler/head/catalog/record.go | 19 ++-- pp/go/relabeler/head/head.go | 16 +++- pp/go/relabeler/head/load.go | 17 ++-- pp/go/relabeler/head/load_test.go | 10 +- pp/go/relabeler/head/manager/manager.go | 28 +++--- .../head/manager/time_bound_calculator.go | 51 ++++++++++ pp/go/relabeler/head/wal_writer.go | 6 +- pp/go/relabeler/remotewriter/datasource.go | 8 +- pp/go/relabeler/remotewriter/writeloop.go | 8 +- 12 files changed, 263 insertions(+), 88 deletions(-) create mode 100644 pp/go/relabeler/head/manager/time_bound_calculator.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 99ae9779b5..f38ad1ca59 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/prometheus/pp/go/relabeler/head/manager" "math" "math/bits" "net" @@ -692,7 +693,7 @@ func main() { os.Exit(1) } - fileLog, err := catalog.NewFileLogV2(filepath.Join(dataDir, "head.log")) + fileLog, err := catalog.NewFileLogV3(filepath.Join(dataDir, "head.log")) if err != nil { level.Error(logger).Log("msg", "failed to create file log", "err", err) os.Exit(1) @@ -1090,7 +1091,18 @@ func main() { } multiNotifiable := ready.New().With(receiverReadyNotifier).With(remoteWriterReadyNotifier).Build() - opGC := catalog.NewGC(dataDir, headCatalog, multiNotifiable) + opGC := catalog.NewGC( + clock, + dataDir, + headCatalog, + multiNotifiable, + time.Duration(cfg.tsdb.RetentionDuration), + manager.NewTimeBoundCalculator( + dataDir, + headCatalog, + prometheus.DefaultRegisterer, + ), + ) var g run.Group { diff --git a/pp/go/relabeler/head/catalog/catalog.go b/pp/go/relabeler/head/catalog/catalog.go index 204658acd6..24e0439f95 100644 --- a/pp/go/relabeler/head/catalog/catalog.go +++ b/pp/go/relabeler/head/catalog/catalog.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "math" "sort" "sync" @@ -41,7 +42,7 @@ type Catalog struct { log Log idGenerator IDGenerator records map[string]*Record - maxLogFileSize int + maxLogFileSize int corruptedHead prometheus.Counter activeHeadCreatedAt prometheus.Gauge } @@ -112,6 +113,8 @@ func (c *Catalog) Create(numberOfShards uint16) (r *Record, err error) { updatedAt: now, deletedAt: 0, referenceCount: 0, + mint: math.MaxInt64, + maxt: math.MaxInt64, status: StatusNew, } @@ -143,35 +146,89 @@ func (c *Catalog) SetStatus(id string, status Status) (_ *Record, err error) { return nil, fmt.Errorf("compact: %w", err) } - r, ok := c.records[id] + record, ok := c.records[id] + if !ok { + return nil, fmt.Errorf("not found: %s", id) + } + + return c.setStatus(record, status, record.mint, record.maxt) +} + +func (c *Catalog) SetStatusWithTimeBounds(id string, status Status, mint, maxt int64) (record *Record, err error) { + c.mtx.Lock() + defer c.mtx.Unlock() + + if err = c.compactIfNeeded(); err != nil { + return nil, fmt.Errorf("compact: %w", err) + } + + record, ok := c.records[id] if !ok { return nil, fmt.Errorf("not found: %s", id) } - if r.status == status { + return c.setStatus(record, status, mint, maxt) +} + +func (c *Catalog) setStatus(record *Record, status Status, mint, maxt int64) (_ *Record, err error) { + if record.status == status && record.mint == mint && record.maxt == maxt { if status == StatusActive { - c.activeHeadCreatedAt.Set(float64(r.createdAt)) + c.activeHeadCreatedAt.Set(float64(record.createdAt)) } - return r, nil + return record, nil } - changed := createRecordCopy(r) + changed := createRecordCopy(record) changed.status = status + changed.mint = mint + changed.maxt = maxt changed.updatedAt = c.clock.Now().UnixMilli() if err = c.log.Write(changed); err != nil { - return r, fmt.Errorf("log write: %w", err) + return record, fmt.Errorf("log write: %w", err) } - applyRecordChanges(r, changed) - c.records[id] = r + applyRecordChanges(record, changed) + c.records[record.id.String()] = record if status == StatusActive { - c.activeHeadCreatedAt.Set(float64(r.createdAt)) + c.activeHeadCreatedAt.Set(float64(record.createdAt)) } - return r, nil + return record, nil +} + +func (c *Catalog) SetTimeBounds(id string, mint, maxt int64) (_ *Record, err error) { + c.mtx.Lock() + defer c.mtx.Unlock() + + if err = c.compactIfNeeded(); err != nil { + return nil, fmt.Errorf("compact: %w", err) + } + + record, ok := c.records[id] + if !ok { + return nil, fmt.Errorf("not found: %s", id) + } + + if record.mint == mint && record.maxt == maxt { + return record, nil + } + + changed := createRecordCopy(record) + changed.mint = mint + changed.maxt = maxt + changed.updatedAt = c.clock.Now().UnixMilli() + + if err = c.log.Write(changed); err != nil { + return record, fmt.Errorf("log write: %w", err) + } + + applyRecordChanges(record, changed) + c.records[record.id.String()] = record + + return record, nil } func (c *Catalog) SetCorrupted(id string) (_ *Record, err error) { diff --git a/pp/go/relabeler/head/catalog/gc.go b/pp/go/relabeler/head/catalog/gc.go index 5b5e89b2f4..1029c05dd9 100644 --- a/pp/go/relabeler/head/catalog/gc.go +++ b/pp/go/relabeler/head/catalog/gc.go @@ -3,6 +3,8 @@ package catalog import ( "context" "errors" + "fmt" + "github.com/jonboulle/clockwork" "os" "path/filepath" "time" @@ -11,21 +13,31 @@ import ( "github.com/prometheus/prometheus/pp/go/relabeler/logger" ) +type TimeBoundCalculator interface { + CalculateTimeBounds(ctx context.Context, headRecord *Record) (mint int64, maxt int64, err error) +} + type GC struct { - dataDir string - catalog *Catalog - readyNotifiable ready.Notifiable - stop chan struct{} - stopped chan struct{} + clock clockwork.Clock + dataDir string + catalog *Catalog + readyNotifiable ready.Notifiable + retentionDuration time.Duration + timeBoundCalculator TimeBoundCalculator + stop chan struct{} + stopped chan struct{} } -func NewGC(dataDir string, catalog *Catalog, readyNotifiable ready.Notifiable) *GC { +func NewGC(clock clockwork.Clock, dataDir string, catalog *Catalog, readyNotifiable ready.Notifiable, retentionDuration time.Duration, timeBoundCalculator TimeBoundCalculator) *GC { return &GC{ - dataDir: dataDir, - catalog: catalog, - readyNotifiable: readyNotifiable, - stop: make(chan struct{}), - stopped: make(chan struct{}), + clock: clock, + dataDir: dataDir, + catalog: catalog, + readyNotifiable: readyNotifiable, + retentionDuration: retentionDuration, + timeBoundCalculator: timeBoundCalculator, + stop: make(chan struct{}), + stopped: make(chan struct{}), } } @@ -44,14 +56,14 @@ func (gc *GC) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-time.After(time.Minute): - gc.Iterate() + gc.Iterate(ctx) case <-gc.stop: return errors.New("stopped") } } } -func (gc *GC) Iterate() { +func (gc *GC) Iterate(ctx context.Context) { logger.Debugf("catalog gc iteration: head started") defer func() { logger.Debugf("catalog gc iteration: head ended") @@ -66,36 +78,71 @@ func (gc *GC) Iterate() { }, ) if err != nil { - logger.Debugf("catalog gc failed", err) + logger.Debugf("catalog gc failed: %v", err) return } + var retentionDurationIsExceeded bool + var headTimeBoundsIsCalculated bool for _, record := range records { if record.deletedAt != 0 { continue } + logger.Debugf("catalog gc iteration: head: %s", record.ID()) if record.ReferenceCount() > 0 { return } - if record.Corrupted() { - logger.Debugf("catalog gc iteration: head: %s: %s", record.ID(), "corrupted") - continue + retentionDurationIsExceeded, headTimeBoundsIsCalculated, err = gc.retentionDurationIsExceeded(ctx, record) + if err != nil { + logger.Errorf("calculate retention duration excess: %v", err) + return } - if err = os.RemoveAll(filepath.Join(gc.dataDir, record.Dir())); err != nil { - logger.Errorf("failed to remote head dir: %w", err) - return + if retentionDurationIsExceeded || (record.status == StatusPersisted && !record.Corrupted()) { + if err = gc.deleteRecord(ctx, record); err != nil { + logger.Errorf("delete record: %v", err) + return + } } - if err = gc.catalog.Delete(record.ID()); err != nil { - logger.Errorf("failed to delete head record: %w", err) + // avoid multiple calculations in one iteration + if headTimeBoundsIsCalculated { return } + } +} + +func (gc *GC) retentionDurationIsExceeded(ctx context.Context, record *Record) (_ bool, calculated bool, err error) { + mint, maxt := record.mint, record.maxt + if mint > maxt { + // recalculate mint & maxt + mint, maxt, err = gc.timeBoundCalculator.CalculateTimeBounds(ctx, record) + if err != nil { + return false, false, fmt.Errorf("calculate time bounds: %w", err) + } + calculated = true + } + + retentionDeadline := gc.clock.Now().Add(-gc.retentionDuration).UnixMilli() + retentionDurationIsExceeded := retentionDeadline > maxt + + return retentionDurationIsExceeded, calculated, nil +} - logger.Debugf("catalog gc iteration: head: %s: %s", record.ID(), "removed") +func (gc *GC) deleteRecord(_ context.Context, record *Record) (err error) { + if err = os.RemoveAll(filepath.Join(gc.dataDir, record.Dir())); err != nil { + return fmt.Errorf("remove head dir: %w", err) } + + if err = gc.catalog.Delete(record.ID()); err != nil { + return fmt.Errorf("delete head record: %w", err) + } + + logger.Debugf("catalog gc iteration: head: %s: %s", record.ID(), "removed") + + return nil } func (gc *GC) Stop() { diff --git a/pp/go/relabeler/head/catalog/record.go b/pp/go/relabeler/head/catalog/record.go index aabf4f220b..3ebabbec4f 100644 --- a/pp/go/relabeler/head/catalog/record.go +++ b/pp/go/relabeler/head/catalog/record.go @@ -96,16 +96,21 @@ func (r *Record) Acquire() func() { } } -func (r *Record) LastAppendedSegmentID() *uint32 { - return r.lastAppendedSegmentID.RawValue() -} - -func (r *Record) SetLastAppendedSegmentID(segmentID uint32) { - r.lastAppendedSegmentID.Set(segmentID) -} +//func (r *Record) LastAppendedSegmentID() *uint32 { +// return r.lastAppendedSegmentID.RawValue() +//} +// +//func (r *Record) SetLastAppendedSegmentID(segmentID uint32) { +// r.lastAppendedSegmentID.Set(segmentID) +//} func (r *Record) SetNumberOfSegments(numberOfSegments uint32) { r.numberOfSegments = numberOfSegments + if numberOfSegments > 0 { + r.lastAppendedSegmentID.Set(numberOfSegments) + } else { + r.lastAppendedSegmentID = optional.Optional[uint32]{} + } } func NewRecordWithData(id uuid.UUID, diff --git a/pp/go/relabeler/head/head.go b/pp/go/relabeler/head/head.go index 48497b937b..970528749d 100644 --- a/pp/go/relabeler/head/head.go +++ b/pp/go/relabeler/head/head.go @@ -154,13 +154,21 @@ func (rd *RelabelerData) updateOrCreateStatelessRelabeler( return sr, nil } -type LastAppendedSegmentIDSetter interface { - SetLastAppendedSegmentID(segmentID uint32) +//type LastAppendedSegmentIDSetter interface { +// SetLastAppendedSegmentID(segmentID uint32) +//} +// +//type NoOpLastAppendedSegmentIDSetter struct{} +// +//func (NoOpLastAppendedSegmentIDSetter) SetLastAppendedSegmentID(segmentID uint32) {} + +type NumberOfSegmentsSetter interface { + SetNumberOfSegments(numberOfSegments uint32) } -type NoOpLastAppendedSegmentIDSetter struct{} +type NoOpNumberOfSegmentsSetter struct{} -func (NoOpLastAppendedSegmentIDSetter) SetLastAppendedSegmentID(segmentID uint32) {} +func (NoOpNumberOfSegmentsSetter) SetNumberOfSegments(uint32) {} type Head struct { id string diff --git a/pp/go/relabeler/head/load.go b/pp/go/relabeler/head/load.go index 4727f06361..bdcf877558 100644 --- a/pp/go/relabeler/head/load.go +++ b/pp/go/relabeler/head/load.go @@ -27,7 +27,7 @@ func Create( configs []*config.InputRelabelerConfig, numberOfShards uint16, maxSegmentSize uint32, - lastAppendedSegmentIDSetter LastAppendedSegmentIDSetter, + numberOfSegmentsSetter NumberOfSegmentsSetter, registerer prometheus.Registerer, ) (_ *Head, err error) { lsses := make([]*LSS, numberOfShards) @@ -45,7 +45,7 @@ func Create( } }() - swn := newSegmentWriteNotifier(numberOfShards, lastAppendedSegmentIDSetter) + swn := newSegmentWriteNotifier(numberOfShards, numberOfSegmentsSetter) for shardID := uint16(0); shardID < numberOfShards; shardID++ { lsses[shardID], wals[shardID], dataStorages[shardID], err = createShard(dir, shardID, swn, maxSegmentSize) @@ -110,12 +110,12 @@ func Load( configs []*config.InputRelabelerConfig, numberOfShards uint16, maxSegmentSize uint32, - lastAppendedSegmentIDSetter LastAppendedSegmentIDSetter, + numberOfSegmentsSetter NumberOfSegmentsSetter, registerer prometheus.Registerer, ) (_ *Head, corrupted bool, numberOfSegments uint32, err error) { shardLoadResults := make([]ShardLoadResult, numberOfShards) wg := &sync.WaitGroup{} - swn := newSegmentWriteNotifier(numberOfShards, lastAppendedSegmentIDSetter) + swn := newSegmentWriteNotifier(numberOfShards, numberOfSegmentsSetter) for shardID := uint16(0); shardID < numberOfShards; shardID++ { wg.Add(1) shardWalFilePath := filepath.Join(dir, fmt.Sprintf("shard_%d.wal", shardID)) @@ -227,7 +227,7 @@ func (l *ShardLoader) Load() (result ShardLoadResult) { } decoder := cppbridge.NewHeadWalDecoder(targetLss, encoderVersion) - lastReadSegmentID := -1 + var numberOfSegments uint32 = 0 var bytesRead int for { @@ -248,18 +248,17 @@ func (l *ShardLoader) Load() (result ShardLoadResult) { } offset += bytesRead - lastReadSegmentID++ + numberOfSegments++ } - numberOfSegments := lastReadSegmentID + 1 - result.NumberOfSegments = uint32(numberOfSegments) // #nosec G115 // no overflow + result.NumberOfSegments = numberOfSegments sw, err := newSegmentWriter(l.shardID, shardWalFile, l.notifier) if err != nil { result.Err = err return } - l.notifier.Set(l.shardID, uint32(numberOfSegments)) // #nosec G115 // no overflow + l.notifier.Set(l.shardID, numberOfSegments) result.Wal = newShardWal(decoder.CreateEncoder(), l.maxSegmentSize, sw) if result.Err == nil { result.Corrupted = false diff --git a/pp/go/relabeler/head/load_test.go b/pp/go/relabeler/head/load_test.go index a120280d48..0d1eb72722 100644 --- a/pp/go/relabeler/head/load_test.go +++ b/pp/go/relabeler/head/load_test.go @@ -20,9 +20,9 @@ import ( const maxSegmentSize uint32 = 1024 -type noOpLastAppendedSegmentIDSetter struct{} +type noOpNumberOfSegmentsSetter struct{} -func (noOpLastAppendedSegmentIDSetter) SetLastAppendedSegmentID(_ uint32) {} +func (noOpNumberOfSegmentsSetter) SetNumberOfSegments(uint32) {} func TestLoad(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) @@ -49,7 +49,7 @@ func TestLoad(t *testing.T) { var numberOfShards uint16 = 2 headID := "test_head_id" - h, err := head.Create(headID, 0, tmpDir, cfgs, numberOfShards, maxSegmentSize, noOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + h, err := head.Create(headID, 0, tmpDir, cfgs, numberOfShards, maxSegmentSize, noOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) require.NoError(t, err) ls := model.NewLabelSetBuilder().Set("__name__", "wal_metric").Set("job", "test").Build() @@ -122,7 +122,7 @@ func TestLoad(t *testing.T) { require.NoError(t, h.Flush()) require.NoError(t, h.Close()) var corrupted bool - h, corrupted, _, err = head.Load(headID, 0, tmpDir, cfgs, numberOfShards, maxSegmentSize, noOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + h, corrupted, _, err = head.Load(headID, 0, tmpDir, cfgs, numberOfShards, maxSegmentSize, noOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) require.NoError(t, err) require.False(t, corrupted) @@ -224,7 +224,7 @@ func TestLoad(t *testing.T) { h.Stop() require.NoError(t, h.Close()) - h, corrupted, _, err = head.Load(headID, 0, tmpDir, cfgs, numberOfShards, maxSegmentSize, noOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + h, corrupted, _, err = head.Load(headID, 0, tmpDir, cfgs, numberOfShards, maxSegmentSize, noOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) require.NoError(t, err) require.False(t, corrupted) diff --git a/pp/go/relabeler/head/manager/manager.go b/pp/go/relabeler/head/manager/manager.go index fe69bac892..a10d60fbd3 100644 --- a/pp/go/relabeler/head/manager/manager.go +++ b/pp/go/relabeler/head/manager/manager.go @@ -30,6 +30,8 @@ type Catalog interface { sortLess func(lhs, rhs *catalog.Record) bool, ) ([]*catalog.Record, error) Create(numberOfShards uint16) (*catalog.Record, error) + SetStatusWithTimeBounds(id string, status catalog.Status, mint, maxt int64) (*catalog.Record, error) + SetTimeBounds(id string, mint, maxt int64) (*catalog.Record, error) SetStatus(id string, status catalog.Status) (*catalog.Record, error) SetCorrupted(id string) (*catalog.Record, error) } @@ -46,12 +48,12 @@ type Manager struct { registerer prometheus.Registerer } -// SetLastAppendedSegmentIDFn function to set the last added segment id. -type SetLastAppendedSegmentIDFn func(segmentID uint32) +// SetNumberOfSegmentsFunc sets number of segments. +type SetNumberOfSegmentsFunc func(numberOfSegments uint32) -// SetLastAppendedSegmentID to set the last added segment id. -func (fn SetLastAppendedSegmentIDFn) SetLastAppendedSegmentID(segmentID uint32) { - fn(segmentID) +// SetNumberOfSegments sets number of segments. +func (fn SetNumberOfSegmentsFunc) SetNumberOfSegments(numberOfSegments uint32) { + fn(numberOfSegments) } // New init new Manager. @@ -184,10 +186,7 @@ func (m *Manager) Restore(blockDuration time.Duration) (active relabeler.Head, r } func isNumberOfSegmentsMismatched(record *catalog.Record, loadedSegments uint32) bool { - if record.LastAppendedSegmentID() == nil { - return loadedSegments != 0 - } - return *record.LastAppendedSegmentID()+1 != loadedSegments + return record.NumberOfSegments() != loadedSegments } func (m *Manager) loadHead( @@ -208,7 +207,7 @@ func (m *Manager) loadHead( inputRelabelerConfigs, headRecord.NumberOfShards(), m.maxSegmentSize, - SetLastAppendedSegmentIDFn(func(segmentID uint32) { headRecord.SetLastAppendedSegmentID(segmentID) }), + SetNumberOfSegmentsFunc(func(numberOfSegments uint32) { headRecord.SetNumberOfSegments(numberOfSegments) }), m.registerer, ) if err != nil { @@ -221,13 +220,13 @@ func (m *Manager) loadHead( case headRecord.Status() == catalog.StatusActive: // numberOfSegments here is actual number of segments. if numberOfSegments > 0 { - headRecord.SetLastAppendedSegmentID(numberOfSegments - 1) + headRecord.SetNumberOfSegments(numberOfSegments) } case isNumberOfSegmentsMismatched(headRecord, numberOfSegments): corrupted = true // numberOfSegments here is actual number of segments. if numberOfSegments > 0 { - headRecord.SetLastAppendedSegmentID(numberOfSegments - 1) + headRecord.SetNumberOfSegments(numberOfSegments) } logger.Errorf("head: %s number of segments mismatched", headRecord.ID()) } @@ -282,7 +281,7 @@ func (m *Manager) BuildWithConfig( inputRelabelerConfigs, numberOfShards, m.maxSegmentSize, - SetLastAppendedSegmentIDFn(func(segmentID uint32) { headRecord.SetLastAppendedSegmentID(segmentID) }), + SetNumberOfSegmentsFunc(func(numberOfSegments uint32) { headRecord.SetNumberOfSegments(numberOfSegments) }), m.registerer, ) if err != nil { @@ -300,7 +299,8 @@ func (m *Manager) createDiscardableRotatableHead(h relabeler.Head, releaseHeadFn return NewDiscardableRotatableHead( h, func(id string, err error) error { - if _, rotateErr := m.catalog.SetStatus(id, catalog.StatusRotated); rotateErr != nil { + stats := h.Status(1) + if _, rotateErr := m.catalog.SetStatusWithTimeBounds(id, catalog.StatusRotated, stats.HeadStats.MinTime, stats.HeadStats.MaxTime); rotateErr != nil { return errors.Join(err, rotateErr) } m.counter.With(prometheus.Labels{"type": "rotated"}).Inc() diff --git a/pp/go/relabeler/head/manager/time_bound_calculator.go b/pp/go/relabeler/head/manager/time_bound_calculator.go new file mode 100644 index 0000000000..5f4c7478ea --- /dev/null +++ b/pp/go/relabeler/head/manager/time_bound_calculator.go @@ -0,0 +1,51 @@ +package manager + +import ( + "context" + "errors" + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pp/go/relabeler/head" + "github.com/prometheus/prometheus/pp/go/relabeler/head/catalog" + "path/filepath" +) + +type TimeBoundCalculator struct { + dir string + catalog Catalog + registerer prometheus.Registerer +} + +func NewTimeBoundCalculator(dir string, catalog Catalog, registerer prometheus.Registerer) *TimeBoundCalculator { + return &TimeBoundCalculator{dir: dir, catalog: catalog, registerer: registerer} +} + +func (c *TimeBoundCalculator) CalculateTimeBounds(_ context.Context, record *catalog.Record) (mint int64, maxt int64, err error) { + h, _, _, err := head.Load( + record.ID(), + 0, + filepath.Join(c.dir, record.Dir()), + nil, + record.NumberOfShards(), + 0, + head.NoOpNumberOfSegmentsSetter{}, + c.registerer, + ) + if err != nil { + return 0, 0, err + } + + stats := h.Status(1) + mint, maxt = stats.HeadStats.MinTime, stats.HeadStats.MaxTime + h.Stop() + + if err = errors.Join(h.Close(), h.Discard()); err != nil { + return mint, maxt, err + } + + if _, err = c.catalog.SetTimeBounds(record.ID(), mint, maxt); err != nil { + return mint, maxt, fmt.Errorf("set time bounds in record: %w", err) + } + + return mint, maxt, nil +} diff --git a/pp/go/relabeler/head/wal_writer.go b/pp/go/relabeler/head/wal_writer.go index 459e0e9483..0ef97aa14f 100644 --- a/pp/go/relabeler/head/wal_writer.go +++ b/pp/go/relabeler/head/wal_writer.go @@ -129,10 +129,10 @@ func (w *segmentWriter) Close() error { type segmentWriteNotifier struct { mtx sync.Mutex shards []uint32 - setter LastAppendedSegmentIDSetter + setter NumberOfSegmentsSetter } -func newSegmentWriteNotifier(numberOfShards uint16, setter LastAppendedSegmentIDSetter) *segmentWriteNotifier { +func newSegmentWriteNotifier(numberOfShards uint16, setter NumberOfSegmentsSetter) *segmentWriteNotifier { return &segmentWriteNotifier{ shards: make([]uint32, numberOfShards), setter: setter, @@ -145,7 +145,7 @@ func (swn *segmentWriteNotifier) NotifySegmentIsWritten(shardID uint16) { swn.shards[shardID]++ minNumberOfSegments := slices.Min(swn.shards) if minNumberOfSegments > 0 { - swn.setter.SetLastAppendedSegmentID(minNumberOfSegments - 1) + swn.setter.SetNumberOfSegments(minNumberOfSegments) } } diff --git a/pp/go/relabeler/remotewriter/datasource.go b/pp/go/relabeler/remotewriter/datasource.go index 183898087d..c19cee91ad 100644 --- a/pp/go/relabeler/remotewriter/datasource.go +++ b/pp/go/relabeler/remotewriter/datasource.go @@ -5,18 +5,18 @@ import ( "context" "errors" "fmt" - "github.com/prometheus/prometheus/pp/go/util/optional" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pp/go/util/optional" "io" "os" "path/filepath" "sync" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/pp/go/cppbridge" "github.com/prometheus/prometheus/pp/go/relabeler/head/catalog" "github.com/prometheus/prometheus/pp/go/relabeler/logger" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" ) type CorruptMarker interface { @@ -212,7 +212,7 @@ func newSegmentReadyChecker(headRecord *catalog.Record) *segmentReadyChecker { } func (src *segmentReadyChecker) SegmentIsReady(segmentID uint32) (ready bool, outOfRange bool) { - ready = src.headRecord.LastAppendedSegmentID() != nil && *src.headRecord.LastAppendedSegmentID() >= segmentID + ready = segmentID < src.headRecord.NumberOfSegments() outOfRange = (src.headRecord.Status() != catalog.StatusNew && src.headRecord.Status() != catalog.StatusActive) && !ready return ready, outOfRange } diff --git a/pp/go/relabeler/remotewriter/writeloop.go b/pp/go/relabeler/remotewriter/writeloop.go index a794388cc5..4d0473a0d0 100644 --- a/pp/go/relabeler/remotewriter/writeloop.go +++ b/pp/go/relabeler/remotewriter/writeloop.go @@ -203,11 +203,7 @@ func (wl *writeLoop) nextIterator(ctx context.Context, protobufWriter ProtobufWr var targetSegmentID uint32 if cleanStart { - if nextHeadRecord.LastAppendedSegmentID() != nil { - targetSegmentID = *nextHeadRecord.LastAppendedSegmentID() - } else { - targetSegmentID = crw.GetTargetSegmentID() - } + targetSegmentID = nextHeadRecord.NumberOfSegments() } else { targetSegmentID = crw.GetTargetSegmentID() } @@ -298,7 +294,7 @@ func nextHead(ctx context.Context, dataDir string, headCatalog Catalog, headID s return nil, fmt.Errorf("nextHead: no new heads: appropriate head not found") } -func validateHead(ctx context.Context, headDir string) error { +func validateHead(_ context.Context, headDir string) error { dir, err := os.Open(headDir) if err != nil { return err From 07a7196d6dae86f404d44aead9af0c49ff316216 Mon Sep 17 00:00:00 2001 From: "v.kavlakan" Date: Tue, 24 Jun 2025 12:47:55 +0200 Subject: [PATCH 2/2] test fix --- pp/go/relabeler/appender/appender_test.go | 30 +++++++++++------------ 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pp/go/relabeler/appender/appender_test.go b/pp/go/relabeler/appender/appender_test.go index 3c470fe684..1621e58ece 100644 --- a/pp/go/relabeler/appender/appender_test.go +++ b/pp/go/relabeler/appender/appender_test.go @@ -119,7 +119,7 @@ func (s *AppenderSuite) TestManagerRelabelerKeep() { }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() @@ -264,7 +264,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabeling() { }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -419,7 +419,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingAddNewLabel() { }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -578,7 +578,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithExternalLabelsEnd() { }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.Require().NoError(err) @@ -737,7 +737,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithExternalLabelsRelabel( }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -900,7 +900,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithTargetLabels() { }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -1093,7 +1093,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithTargetLabels_Conflicti }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -1287,7 +1287,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithTargetLabels_Conflicti }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -1487,7 +1487,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithRotate() { var generation uint64 headID := "head_id" - hd, err := head.Create(headID, generation, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, generation, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) headsToClose = append(headsToClose, hd) @@ -1510,7 +1510,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithRotate() { inputRelabelerConfigs, numberOfShards, 0, - head.NoOpLastAppendedSegmentIDSetter{}, + head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer, ) s.Require().NoError(buildErr) @@ -1977,7 +1977,7 @@ func (s *AppenderSuite) TestManagerRelabelerKeepWithStaleNans() { }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -2112,7 +2112,7 @@ func (s *AppenderSuite) TestManagerRelabelerKeepWithStaleNans_WithNullTimestamp( }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -2244,7 +2244,7 @@ func (s *AppenderSuite) TestManagerRelabelerKeepWithStaleNans_HonorTimestamps() }() headID := "head_id" - hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, 0, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) defer func() { _ = hd.Close() }() s.T().Log("make appender") @@ -2403,7 +2403,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithRotateWithStaleNans() var generation uint64 headID := fmt.Sprintf("head_id_%d", generation) - hd, err := head.Create(headID, generation, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + hd, err := head.Create(headID, generation, tmpDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(err) headsToClose = append(headsToClose, hd) @@ -2419,7 +2419,7 @@ func (s *AppenderSuite) TestManagerRelabelerRelabelingWithRotateWithStaleNans() tmpDirsToRemove = append(tmpDirsToRemove, newDir) generation++ newHeadID := fmt.Sprintf("head_id_%d", generation) - newHead, buildErr := head.Create(newHeadID, generation, newDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpLastAppendedSegmentIDSetter{}, prometheus.DefaultRegisterer) + newHead, buildErr := head.Create(newHeadID, generation, newDir, inputRelabelerConfigs, numberOfShards, 0, head.NoOpNumberOfSegmentsSetter{}, prometheus.DefaultRegisterer) s.Require().NoError(buildErr) headsToClose = append(headsToClose, newHead) return newHead, nil