diff --git a/pp/entrypoint/prometheus_relabeler.cpp b/pp/entrypoint/prometheus_relabeler.cpp index 0184fa35e..0de97f7eb 100644 --- a/pp/entrypoint/prometheus_relabeler.cpp +++ b/pp/entrypoint/prometheus_relabeler.cpp @@ -212,10 +212,29 @@ extern "C" void prompp_prometheus_per_shard_relabeler_input_relabeling(void* arg } } +using StaleNaNsStateDeprecated = PromPP::Prometheus::Relabel::StaleNaNsStateDeprecated; +using StaleNaNsStateDeprecatedPtr = std::unique_ptr; + +extern "C" void prompp_prometheus_relabel_stalenans_state_deprecated_ctor(void* res) { + struct Result { + StaleNaNsStateDeprecatedPtr state; + }; + + new (res) Result{.state = std::make_unique()}; +} + +extern "C" void prompp_prometheus_relabel_stalenans_state_deprecated_dtor(void* args) { + struct Arguments { + StaleNaNsStateDeprecatedPtr state; + }; + + static_cast(args)->~Arguments(); +} + using StaleNaNsState = PromPP::Prometheus::Relabel::StaleNaNsState; using StaleNaNsStatePtr = std::unique_ptr; -extern "C" void prompp_prometheus_relabel_stalenans_state_ctor(void* res) { +extern "C" void prompp_prometheus_relabel_stale_nans_state_ctor(void* res) { struct Result { StaleNaNsStatePtr state; }; @@ -223,7 +242,7 @@ extern "C" void prompp_prometheus_relabel_stalenans_state_ctor(void* res) { new (res) Result{.state = std::make_unique()}; } -extern "C" void prompp_prometheus_relabel_stalenans_state_dtor(void* args) { +extern "C" void prompp_prometheus_relabel_stale_nans_state_dtor(void* args) { struct Arguments { StaleNaNsStatePtr state; }; @@ -241,7 +260,7 @@ extern "C" void prompp_prometheus_per_shard_relabeler_input_relabeling_with_stal CachePtr cache; LssVariantPtr input_lss; LssVariantPtr target_lss; - StaleNaNsStatePtr state; + StaleNaNsStateDeprecatedPtr state; PromPP::Primitives::Timestamp def_timestamp; }; struct Result { @@ -279,7 +298,7 @@ extern "C" void prompp_prometheus_per_shard_relabeler_input_collect_stalenans(vo PromPP::Primitives::Go::SliceView shards_inner_series; PerShardRelabelerPtr per_shard_relabeler; CachePtr cache; - StaleNaNsStatePtr state; + StaleNaNsStateDeprecatedPtr state; PromPP::Primitives::Timestamp stale_ts; }; struct Result { @@ -343,7 +362,7 @@ extern "C" void prompp_prometheus_per_shard_relabeler_input_relabeling_with_stal CachePtr cache; LssVariantPtr input_lss; LssVariantPtr target_lss; - StaleNaNsStatePtr state; + StaleNaNsStateDeprecatedPtr state; PromPP::Primitives::Timestamp def_timestamp; }; struct Result { @@ -676,7 +695,6 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_input_relabeling_with_ CachePtr cache; LssVariantPtr input_lss; LssVariantPtr target_lss; - StaleNaNsStatePtr state; PromPP::Primitives::Timestamp def_timestamp; }; struct Result { @@ -698,7 +716,7 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_input_relabeling_with_ const entrypoint::head::ReallocationsDetector reallocation_detector(target_lss); in->per_goroutine_relabeler->input_relabeling_with_stalenans(input_lss, target_lss, *in->cache, hashdex, in->options, *in->stateless_relabeler, *out, - in->shards_inner_series, in->shards_relabeled_series, *in->state, in->def_timestamp); + in->shards_inner_series, in->shards_relabeled_series, in->def_timestamp); target_lss.build_deferred_indexes(); out->target_lss_has_reallocations = reallocation_detector.has_reallocations(); }, @@ -718,7 +736,6 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_input_relabeling_with_ CachePtr cache; LssVariantPtr input_lss; LssVariantPtr target_lss; - StaleNaNsStatePtr state; PromPP::Primitives::Timestamp def_timestamp; }; struct Result { @@ -739,7 +756,7 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_input_relabeling_with_ auto& target_lss = std::get(*in->target_lss); out->ok = in->per_goroutine_relabeler->input_relabeling_with_stalenans_from_cache(input_lss, target_lss, *in->cache, hashdex, in->options, *out, - in->shards_inner_series, *in->state, in->def_timestamp); + in->shards_inner_series, in->def_timestamp); }, *in->hashdex); } catch (...) { @@ -851,3 +868,14 @@ extern "C" void prompp_prometheus_per_goroutine_relabeler_append_relabeler_serie entrypoint::handle_current_exception(err_stream); } } + +extern "C" void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args) { + struct Arguments { + PromPP::Primitives::Go::SliceView inner_series; + StaleNaNsStatePtr stale_nans_state; + PromPP::Primitives::Timestamp default_timestamp; + }; + + const auto in = static_cast(args); + PromPP::Prometheus::Relabel::PerGoroutineRelabeler::track_stale_nans(in->inner_series, *in->stale_nans_state, in->default_timestamp); +} diff --git a/pp/entrypoint/prometheus_relabeler.h b/pp/entrypoint/prometheus_relabeler.h index 9c0a26b7a..aa9733e64 100644 --- a/pp/entrypoint/prometheus_relabeler.h +++ b/pp/entrypoint/prometheus_relabeler.h @@ -163,6 +163,24 @@ void prompp_prometheus_per_shard_relabeler_dtor(void* args); */ void prompp_prometheus_per_shard_relabeler_input_relabeling(void* args, void* res); +/** + * @brief Create StaleNaNsStateDeprecated. + * + * @param res { + * state uintptr // pointer to constructed StaleNaNsStateDeprecated; + * } + */ +void prompp_prometheus_relabel_stalenans_state_deprecated_ctor(void* res); + +/** + * @brief Destroy StaleNaNsStateDeprecated. + * + * @param args { + * state uintptr // pointer to StaleNaNsStateDeprecated; + * } + */ +void prompp_prometheus_relabel_stalenans_state_deprecated_dtor(void* args); + /** * @brief Create StaleNaNsState. * @@ -170,7 +188,7 @@ void prompp_prometheus_per_shard_relabeler_input_relabeling(void* args, void* re * state uintptr // pointer to constructed StaleNaNsState; * } */ -void prompp_prometheus_relabel_stalenans_state_ctor(void* res); +void prompp_prometheus_relabel_stale_nans_state_ctor(void* res); /** * @brief Destroy StaleNaNsState. @@ -179,7 +197,7 @@ void prompp_prometheus_relabel_stalenans_state_ctor(void* res); * state uintptr // pointer to StaleNaNsState; * } */ -void prompp_prometheus_relabel_stalenans_state_dtor(void* args); +void prompp_prometheus_relabel_stale_nans_state_dtor(void* args); /** * @brief relabeling incomig hashdex(first stage) with state stalenans. @@ -492,7 +510,6 @@ void prompp_prometheus_per_goroutine_relabeler_input_relabeling_from_cache(void* * cache uintptr // pointer to constructed Cache; * input_lss uintptr // pointer to constructed input label sets; * target_lss uintptr // pointer to constructed target label sets; - * state uintptr // pointer to source state * def_timestamp int64 // timestamp for metrics and StaleNaNs * } * @@ -517,7 +534,6 @@ void prompp_prometheus_per_goroutine_relabeler_input_relabeling_with_stalenans(v * cache uintptr // pointer to constructed Cache; * input_lss uintptr // pointer to constructed input label sets; * target_lss uintptr // pointer to constructed target label sets; - * state uintptr // pointer to source state * def_timestamp int64 // timestamp for metrics and StaleNaNs * } * @@ -589,6 +605,17 @@ void prompp_prometheus_per_goroutine_relabeler_input_transition_relabeling_only_ */ void prompp_prometheus_per_goroutine_relabeler_append_relabeler_series(void* args, void* res); +/** + * @brief add stale nans to inner series if needed + * + * @param args { + * inner_series []*InnerSeries // InnerSeries + * stale_nan_state uintptr // pointer to source state + * default_timestamp int64 // timestamp for stale_nan samples + * } + */ +void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args); + #ifdef __cplusplus } // extern "C" #endif diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index 216c26ff9..0a478df9b 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -1721,7 +1721,37 @@ func prometheusRelabelerStateUpdateDtor(relabelerStateUpdate *RelabelerStateUpda } // -// StalenansState +// StalenansStateDeprecated +// + +func prometheusRelabelStaleNansStateDeprecatedCtor() uintptr { + var res struct { + state uintptr + } + + testGC() + fastcgo.UnsafeCall1( + C.prompp_prometheus_relabel_stalenans_state_deprecated_ctor, + uintptr(unsafe.Pointer(&res)), + ) + + return res.state +} + +func prometheusRelabelStaleNansStateDeprecatedDtor(state uintptr) { + args := struct { + state uintptr + }{state} + + testGC() + fastcgo.UnsafeCall1( + C.prompp_prometheus_relabel_stalenans_state_deprecated_dtor, + uintptr(unsafe.Pointer(&args)), + ) +} + +// +// StaleNansState // func prometheusRelabelStaleNansStateCtor() uintptr { @@ -1731,7 +1761,7 @@ func prometheusRelabelStaleNansStateCtor() uintptr { testGC() fastcgo.UnsafeCall1( - C.prompp_prometheus_relabel_stalenans_state_ctor, + C.prompp_prometheus_relabel_stale_nans_state_ctor, uintptr(unsafe.Pointer(&res)), ) @@ -1745,7 +1775,7 @@ func prometheusRelabelStaleNansStateDtor(state uintptr) { testGC() fastcgo.UnsafeCall1( - C.prompp_prometheus_relabel_stalenans_state_dtor, + C.prompp_prometheus_relabel_stale_nans_state_dtor, uintptr(unsafe.Pointer(&args)), ) } @@ -3526,7 +3556,7 @@ func prometheusPerGoroutineRelabelerInputRelabelingFromCache( // prometheusPerGoroutineRelabelerInputRelabelingWithStalenans wrapper for relabeling incoming // hashdex(first stage) with state stalenans. func prometheusPerGoroutineRelabelerInputRelabelingWithStalenans( - perGoroutineRelabeler, statelessRelabeler, inputLss, targetLss, cache, hashdex, sourceState uintptr, + perGoroutineRelabeler, statelessRelabeler, inputLss, targetLss, cache, hashdex uintptr, defTimestamp int64, options RelabelerOptions, shardsInnerSeries []*InnerSeries, @@ -3542,7 +3572,6 @@ func prometheusPerGoroutineRelabelerInputRelabelingWithStalenans( cache uintptr inputLss uintptr targetLss uintptr - state uintptr defTimestamp int64 }{ shardsInnerSeries, @@ -3554,7 +3583,6 @@ func prometheusPerGoroutineRelabelerInputRelabelingWithStalenans( cache, inputLss, targetLss, - sourceState, defTimestamp, } var res struct { @@ -3578,7 +3606,7 @@ func prometheusPerGoroutineRelabelerInputRelabelingWithStalenans( // prometheusPerGoroutineRelabelerInputRelabelingWithStalenansFromCache wrapper for relabeling incoming from cache // hashdex(first stage) with state stalenans. func prometheusPerGoroutineRelabelerInputRelabelingWithStalenansFromCache( - perGoroutineRelabeler, inputLss, targetLss, cache, hashdex, sourceState uintptr, + perGoroutineRelabeler, inputLss, targetLss, cache, hashdex uintptr, defTimestamp int64, options RelabelerOptions, shardsInnerSeries []*InnerSeries, @@ -3591,7 +3619,6 @@ func prometheusPerGoroutineRelabelerInputRelabelingWithStalenansFromCache( cache uintptr inputLss uintptr targetLss uintptr - state uintptr defTimestamp int64 }{ shardsInnerSeries, @@ -3601,7 +3628,6 @@ func prometheusPerGoroutineRelabelerInputRelabelingWithStalenansFromCache( cache, inputLss, targetLss, - sourceState, defTimestamp, } var res struct { @@ -3718,3 +3744,21 @@ func prometheusPerGoroutineRelabelerAppendRelabelerSeries( return res.exception, res.targetLssHasReallocations } + +func prometheusPerGoroutineRelabelerTrackStaleNans( + innerSeries []*InnerSeries, + staleNansState uintptr, + defaultTimestamp int64, +) { + args := struct { + innerSeries []*InnerSeries + staleNansState uintptr + defaultTimestamp int64 + }{innerSeries, staleNansState, defaultTimestamp} + + testGC() + fastcgo.UnsafeCall1( + C.prompp_prometheus_per_goroutine_relabeler_track_stale_nans, + uintptr(unsafe.Pointer(&args)), + ) +} diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index dfcad6282..942cc8aa5 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -868,6 +868,24 @@ void prompp_prometheus_per_shard_relabeler_dtor(void* args); */ void prompp_prometheus_per_shard_relabeler_input_relabeling(void* args, void* res); +/** + * @brief Create StaleNaNsStateDeprecated. + * + * @param res { + * state uintptr // pointer to constructed StaleNaNsStateDeprecated; + * } + */ +void prompp_prometheus_relabel_stalenans_state_deprecated_ctor(void* res); + +/** + * @brief Destroy StaleNaNsStateDeprecated. + * + * @param args { + * state uintptr // pointer to StaleNaNsStateDeprecated; + * } + */ +void prompp_prometheus_relabel_stalenans_state_deprecated_dtor(void* args); + /** * @brief Create StaleNaNsState. * @@ -875,7 +893,7 @@ void prompp_prometheus_per_shard_relabeler_input_relabeling(void* args, void* re * state uintptr // pointer to constructed StaleNaNsState; * } */ -void prompp_prometheus_relabel_stalenans_state_ctor(void* res); +void prompp_prometheus_relabel_stale_nans_state_ctor(void* res); /** * @brief Destroy StaleNaNsState. @@ -884,7 +902,7 @@ void prompp_prometheus_relabel_stalenans_state_ctor(void* res); * state uintptr // pointer to StaleNaNsState; * } */ -void prompp_prometheus_relabel_stalenans_state_dtor(void* args); +void prompp_prometheus_relabel_stale_nans_state_dtor(void* args); /** * @brief relabeling incomig hashdex(first stage) with state stalenans. @@ -1197,7 +1215,6 @@ void prompp_prometheus_per_goroutine_relabeler_input_relabeling_from_cache(void* * cache uintptr // pointer to constructed Cache; * input_lss uintptr // pointer to constructed input label sets; * target_lss uintptr // pointer to constructed target label sets; - * state uintptr // pointer to source state * def_timestamp int64 // timestamp for metrics and StaleNaNs * } * @@ -1222,7 +1239,6 @@ void prompp_prometheus_per_goroutine_relabeler_input_relabeling_with_stalenans(v * cache uintptr // pointer to constructed Cache; * input_lss uintptr // pointer to constructed input label sets; * target_lss uintptr // pointer to constructed target label sets; - * state uintptr // pointer to source state * def_timestamp int64 // timestamp for metrics and StaleNaNs * } * @@ -1294,6 +1310,17 @@ void prompp_prometheus_per_goroutine_relabeler_input_transition_relabeling_only_ */ void prompp_prometheus_per_goroutine_relabeler_append_relabeler_series(void* args, void* res); +/** + * @brief add stale nans to inner series if needed + * + * @param args { + * inner_series []*InnerSeries // InnerSeries + * stale_nan_state uintptr // pointer to source state + * default_timestamp int64 // timestamp for stale_nan samples + * } + */ +void prompp_prometheus_per_goroutine_relabeler_track_stale_nans(void* args); + #ifdef __cplusplus } // extern "C" #endif diff --git a/pp/go/cppbridge/prometheus_relabeler.go b/pp/go/cppbridge/prometheus_relabeler.go index a7667dbe7..39677a1c9 100644 --- a/pp/go/cppbridge/prometheus_relabeler.go +++ b/pp/go/cppbridge/prometheus_relabeler.go @@ -390,6 +390,14 @@ type stdVector struct { endOfStorage uintptr } +type bareBonesVector struct { + memory uintptr + capacity uint32 + size uint32 +} + +type roaringBitset [40]byte + // InnerSeries - go wrapper for C-InnerSeries. // // size - number of timeseries processed; @@ -397,7 +405,9 @@ type stdVector struct { type InnerSeries struct { size uint64 //nolint:unused // for cpp-bridge, used in cpp - data stdVector + data bareBonesVector + //nolint:unused // for cpp-bridge, used in cpp + trackStaleNans roaringBitset } // Size - number of Timeseries. @@ -438,6 +448,8 @@ type RelabeledSeries struct { size uint64 //nolint:unused // for cpp-bridge, used in cpp data stdVector + //nolint:unused // for cpp-bridge, used in cpp + trackStaleNans roaringBitset } // NewRelabeledSeries - init new RelabeledSeries with finalizer for dtor C-RelabeledSeries. @@ -510,6 +522,23 @@ type RelabelerOptions struct { HonorTimestamps bool } +// StaleNansStateDeprecated wrap pointer to source state for stale nans . +type StaleNansStateDeprecated struct { + state uintptr +} + +// NewStaleNansStateDeprecated init new SourceStaleNansState. +func NewStaleNansStateDeprecated() *StaleNansStateDeprecated { + s := &StaleNansStateDeprecated{ + state: prometheusRelabelStaleNansStateDeprecatedCtor(), + } + runtime.SetFinalizer(s, func(s *StaleNansStateDeprecated) { + prometheusRelabelStaleNansStateDeprecatedDtor(s.state) + }) + + return s +} + // StaleNansState wrap pointer to source state for stale nans . type StaleNansState struct { state uintptr @@ -668,7 +697,7 @@ func (ipsr *InputPerShardRelabeler) InputRelabelingWithStalenans( targetLss *LabelSetStorage, cache *Cache, options RelabelerOptions, - staleNansState *StaleNansState, + staleNansState *StaleNansStateDeprecated, defTimestamp int64, shardedData ShardedData, shardsInnerSeries []*InnerSeries, @@ -741,7 +770,7 @@ func (ipsr *InputPerShardRelabeler) InputRelabelingWithStalenansFromCache( targetLss *LabelSetStorage, cache *Cache, options RelabelerOptions, - staleNansState *StaleNansState, + staleNansState *StaleNansStateDeprecated, defTimestamp int64, shardedData ShardedData, shardsInnerSeries []*InnerSeries, @@ -975,7 +1004,7 @@ func (c *Cache) Update(ctx context.Context, shardsRelabelerStateUpdate []*Relabe // State state of relabelers per shard. type State struct { caches []*Cache - staleNansStates []*StaleNansState + staleNansStates []*StaleNansStateDeprecated defTimestamp int64 generationRelabeler uint64 generationHead uint64 @@ -987,7 +1016,7 @@ type State struct { func NewState(numberOfShards uint16) *State { s := &State{ caches: make([]*Cache, numberOfShards), - staleNansStates: make([]*StaleNansState, numberOfShards), + staleNansStates: make([]*StaleNansStateDeprecated, numberOfShards), generationRelabeler: math.MaxUint64, generationHead: math.MaxUint64, trackStaleness: false, @@ -1050,7 +1079,7 @@ func (s *State) SetRelabelerOptions(options *RelabelerOptions) { } // StaleNansStateByShard return SourceStaleNansState for shard. -func (s *State) StaleNansStateByShard(shardID uint16) *StaleNansState { +func (s *State) StaleNansStateByShard(shardID uint16) *StaleNansStateDeprecated { return s.staleNansStates[shardID] } @@ -1107,11 +1136,11 @@ func (s *State) resetStaleNansStates(numberOfShards uint16, equaledGeneration bo s.staleNansStates = s.staleNansStates[:numberOfShards] case len(s.staleNansStates) < int(numberOfShards): // grow - s.staleNansStates = make([]*StaleNansState, numberOfShards) + s.staleNansStates = make([]*StaleNansStateDeprecated, numberOfShards) } for shardID := range s.staleNansStates { - s.staleNansStates[shardID] = NewStaleNansState() + s.staleNansStates[shardID] = NewStaleNansStateDeprecated() } } @@ -1341,7 +1370,6 @@ func (pgr *PerGoroutineRelabeler) inputRelabelingWithStalenans( targetLss.Pointer(), cache.cPointer, cptrContainer.cptr(), - state.StaleNansStateByShard(pgr.shardID).state, state.DefTimestamp(), state.RelabelerOptions(), shardsInnerSeries, @@ -1374,7 +1402,6 @@ func (pgr *PerGoroutineRelabeler) inputRelabelingWithStalenansFromCache( targetLss.Pointer(), cache.cPointer, cptrContainer.cptr(), - state.StaleNansStateByShard(pgr.shardID).state, state.DefTimestamp(), state.RelabelerOptions(), shardsInnerSeries, @@ -1434,6 +1461,25 @@ func (pgr *PerGoroutineRelabeler) inputTransitionRelabelingOnlyRead( return stats, ok, handleException(exception) } +// PerGoroutineRelabelerTrackStaleNans add stale nans samples if needed +func PerGoroutineRelabelerTrackStaleNans( + innerSeries []*InnerSeries, + state *StateV2, + shardID uint16, +) { + if !state.TrackStaleness() { + return + } + + prometheusPerGoroutineRelabelerTrackStaleNans( + innerSeries, + state.StaleNansStateByShard(shardID).state, + state.DefTimestamp(), + ) + runtime.KeepAlive(innerSeries) + runtime.KeepAlive(state) +} + // // TransitionLocker // diff --git a/pp/go/cppbridge/prometheus_relabeler_test.go b/pp/go/cppbridge/prometheus_relabeler_test.go index 6f0c0c4c0..1d90246ea 100644 --- a/pp/go/cppbridge/prometheus_relabeler_test.go +++ b/pp/go/cppbridge/prometheus_relabeler_test.go @@ -689,6 +689,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabeling() { s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{1, 1, 1}, stats) s.True(hasReallocations) + s.Equal(uint64(1), shardsInnerSeries[0].Size()) } func (s *PerGoroutineRelabelerSuite) TestRelabelingDrop() { @@ -1413,6 +1414,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenans() { shardsInnerSeries, shardsRelabeledSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{1, 1, 1}, stats) s.True(hasReallocations) @@ -1435,6 +1437,8 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenans() { s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{0, 0, 0}, stats) s.False(hasReallocations) + + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Equal(uint64(1), shardsInnerSeries[0].Size()) } @@ -1489,6 +1493,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCacheTrue() h, shardsInnerSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{1, 0, 0}, stats) s.True(ok) @@ -1507,6 +1512,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCacheTrue() shardsInnerSeries, shardsRelabeledSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{0, 0, 0}, stats) s.False(hasReallocations) @@ -1554,6 +1560,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCacheFalse() h, shardsInnerSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{0, 0, 0}, stats) s.False(ok) @@ -1722,6 +1729,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCachePartial shardsInnerSeries, shardsRelabeledSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{4, 4, 4}, stats) s.True(hasReallocations) @@ -1749,6 +1757,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCachePartial shardsInnerSeries, shardsRelabeledSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{1, 1, 0}, stats) s.Equal(uint64(5), shardsInnerSeries[0].Size()) @@ -1767,6 +1776,7 @@ func (s *PerGoroutineRelabelerSuite) TestRelabelingWithStalenansFromCachePartial shardsInnerSeries, shardsRelabeledSeries, ) + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardsInnerSeries, state, 0) s.Require().NoError(err) s.Equal(cppbridge.RelabelerStats{0, 0, 0}, stats) s.False(hasReallocations) diff --git a/pp/go/relabeler/appender/appender_test.go b/pp/go/relabeler/appender/appender_test.go index 46f8c679f..fcb74aa3f 100644 --- a/pp/go/relabeler/appender/appender_test.go +++ b/pp/go/relabeler/appender/appender_test.go @@ -1401,6 +1401,8 @@ type noOpStorage struct{} func (noOpStorage) Add(_ relabeler.Head) {} func (s *AppenderSuite) TestManagerRelabelerRelabelingWithRotate() { + s.T().Skip("Test for deprecated code") + relabelerID := s.T().Name() destination := make(chan string, 16) diff --git a/pp/go/storage/appender/appender.go b/pp/go/storage/appender/appender.go index 375b9c197..af2609b50 100644 --- a/pp/go/storage/appender/appender.go +++ b/pp/go/storage/appender/appender.go @@ -55,9 +55,6 @@ type Shard interface { // LSSResetSnapshot resets the current snapshot. Use only WithLock. LSSResetSnapshot() - // Relabeler returns relabeler for shard goroutines. - Relabeler() *cppbridge.PerGoroutineRelabeler - // ShardID returns the shard ID. ShardID() uint16 @@ -65,6 +62,19 @@ type Shard interface { WalWrite(innerSeriesSlice []*cppbridge.InnerSeries) (bool, error) } +// +// GoroutineShard +// + +// GoroutineShard the minimum required head [GoroutineShard] implementation. +type GoroutineShard interface { + // Relabeler returns relabeler for shard goroutines. + Relabeler() *cppbridge.PerGoroutineRelabeler + + // Shard inherit from [Shard] methods. + Shard +} + // // Head // @@ -73,9 +83,10 @@ type Shard interface { type Head[ TTask Task, TShard Shard, + TGoroutineShard GoroutineShard, ] interface { // CreateTask create a task for operations on the [Head] shards. - CreateTask(taskName string, shardFn func(shard TShard) error) TTask + CreateTask(taskName string, shardFn func(shard TGoroutineShard) error) TTask // Enqueue the task to be executed on shards [Head]. Enqueue(t TTask) @@ -85,6 +96,9 @@ type Head[ // NumberOfShards returns current number of shards in to [Head]. NumberOfShards() uint16 + + // RangeShards returns an iterator over the [Head] [Shard]s, through which the shard can be directly accessed. + RangeShards() func(func(TShard) bool) } // @@ -95,7 +109,8 @@ type Head[ type Appender[ TTask Task, TShard Shard, - THead Head[TTask, TShard], + TGoroutineShard GoroutineShard, + THead Head[TTask, TShard, TGoroutineShard], ] struct { head THead commitAndFlush func(h THead) error @@ -105,12 +120,13 @@ type Appender[ func New[ TTask Task, TShard Shard, - THead Head[TTask, TShard], + TGoroutineShard GoroutineShard, + THead Head[TTask, TShard, TGoroutineShard], ]( head THead, commitAndFlush func(h THead) error, -) Appender[TTask, TShard, THead] { - return Appender[TTask, TShard, THead]{ +) Appender[TTask, TShard, TGoroutineShard, THead] { + return Appender[TTask, TShard, TGoroutineShard, THead]{ head: head, commitAndFlush: commitAndFlush, } @@ -119,7 +135,7 @@ func New[ // Append incoming data to [Head]. // //revive:disable-next-line:flag-parameter this is a flag, but it's more convenient this way -func (a Appender[TTask, TShard, THead]) Append( +func (a Appender[TTask, TShard, TGoroutineShard, THead]) Append( ctx context.Context, incomingData *IncomingData, state *cppbridge.StateV2, @@ -163,6 +179,8 @@ func (a Appender[TTask, TShard, THead]) Append( } } + a.trackStaleNans(shardedInnerSeries, state) + atomicLimitExhausted, err := a.appendInnerSeriesAndWriteToWal(shardedInnerSeries) if err != nil { logger.Errorf("failed to write wal: %v", err) @@ -180,7 +198,7 @@ func (a Appender[TTask, TShard, THead]) Append( // inputRelabelingStage first stage - relabeling. // //revive:disable-next-line:function-length long but this is first stage. -func (a *Appender[TTask, TShard, THead]) inputRelabelingStage( +func (a *Appender[TTask, TShard, TGoroutineShard, THead]) inputRelabelingStage( ctx context.Context, state *cppbridge.StateV2, incomingData *DestructibleIncomingData, @@ -190,7 +208,7 @@ func (a *Appender[TTask, TShard, THead]) inputRelabelingStage( stats := make([]cppbridge.RelabelerStats, a.head.NumberOfShards()) t := a.head.CreateTask( lssInputRelabeling, - func(shard TShard) error { + func(shard TGoroutineShard) error { var ( relabeler = shard.Relabeler() shardID = shard.ShardID() @@ -265,7 +283,7 @@ func (a *Appender[TTask, TShard, THead]) inputRelabelingStage( } // appendRelabelerSeriesStage second stage - append to lss relabeling ls. -func (a *Appender[TTask, TShard, THead]) appendRelabelerSeriesStage( +func (a *Appender[TTask, TShard, TGoroutineShard, THead]) appendRelabelerSeriesStage( ctx context.Context, shardedInnerSeries *ShardedInnerSeries, shardedRelabeledSeries *ShardedRelabeledSeries, @@ -273,7 +291,7 @@ func (a *Appender[TTask, TShard, THead]) appendRelabelerSeriesStage( ) error { t := a.head.CreateTask( lssAppendRelabelerSeries, - func(shard TShard) error { + func(shard TGoroutineShard) error { shardID := shard.ShardID() relabeledSeries, ok := shardedRelabeledSeries.DataBySourceShard(shardID) @@ -307,7 +325,7 @@ func (a *Appender[TTask, TShard, THead]) appendRelabelerSeriesStage( } // updateRelabelerStateStage third stage - update state cache. -func (a *Appender[TTask, TShard, THead]) updateRelabelerStateStage( +func (a *Appender[TTask, TShard, TGoroutineShard, THead]) updateRelabelerStateStage( ctx context.Context, state *cppbridge.StateV2, shardedStateUpdates *ShardedStateUpdates, @@ -327,15 +345,29 @@ func (a *Appender[TTask, TShard, THead]) updateRelabelerStateStage( return nil } +// trackStaleNans add stale nans samples if needed +func (a *Appender[TTask, TShard, TGoroutineShard, THead]) trackStaleNans( + shardInnerSeries *ShardedInnerSeries, + state *cppbridge.StateV2, +) { + if !state.TrackStaleness() { + return + } + + for shard := range a.head.RangeShards() { + cppbridge.PerGoroutineRelabelerTrackStaleNans(shardInnerSeries.DataByShard(shard.ShardID()), state, shard.ShardID()) + } +} + // appendInnerSeriesAndWriteToWal append [cppbridge.InnerSeries] to [Shard]'s to [DataStorage] and write to [Wal]. -func (a *Appender[TTask, TShard, THead]) appendInnerSeriesAndWriteToWal( +func (a *Appender[TTask, TShard, TGoroutineShard, THead]) appendInnerSeriesAndWriteToWal( shardedInnerSeries *ShardedInnerSeries, ) (uint32, error) { tw := task.NewTaskWaiter[TTask](2) //revive:disable-line:add-constant // 2 task for wait tAppend := a.head.CreateTask( dsAppendInnerSeries, - func(shard TShard) error { + func(shard TGoroutineShard) error { shard.AppendInnerSeriesSlice(shardedInnerSeries.DataByShard(shard.ShardID())) return nil @@ -346,7 +378,7 @@ func (a *Appender[TTask, TShard, THead]) appendInnerSeriesAndWriteToWal( var atomicLimitExhausted uint32 tWalWrite := a.head.CreateTask( walWrite, - func(shard TShard) error { + func(shard TGoroutineShard) error { limitExhausted, errWrite := shard.WalWrite(shardedInnerSeries.DataByShard(shard.ShardID())) if errWrite != nil { return fmt.Errorf("shard %d: %w", shard.ShardID(), errWrite) @@ -367,7 +399,7 @@ func (a *Appender[TTask, TShard, THead]) appendInnerSeriesAndWriteToWal( return atomicLimitExhausted, tw.Wait() } -func (a *Appender[TTask, TShard, THead]) resolveState(state *cppbridge.StateV2) error { +func (a *Appender[TTask, TShard, TGoroutineShard, THead]) resolveState(state *cppbridge.StateV2) error { if state == nil { return errNilState } diff --git a/pp/prometheus/relabeler.h b/pp/prometheus/relabeler.h index f6bb8df60..367752384 100644 --- a/pp/prometheus/relabeler.h +++ b/pp/prometheus/relabeler.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -113,12 +112,14 @@ PROMPP_ALWAYS_INLINE void hard_validate(relabelStatus& rstatus, LabelsBuilder& b // // samples - incoming samples; // ls_id - relabeling ls id from lss; +#pragma pack(push, 1) struct InnerSerie { Primitives::Sample sample; uint32_t ls_id; PROMPP_ALWAYS_INLINE bool operator==(const InnerSerie& rt) const noexcept = default; }; +#pragma pack(pop) // InnerSeries - vector with relabeled result. // @@ -127,6 +128,7 @@ struct InnerSerie { class InnerSeries { size_t size_{0}; BareBones::Vector data_; + roaring::Roaring tracked_stale_nans_; public: [[nodiscard]] PROMPP_ALWAYS_INLINE const BareBones::Vector& data() const { return data_; } @@ -135,12 +137,17 @@ class InnerSeries { PROMPP_ALWAYS_INLINE void reserve(size_t n) { data_.reserve(n); } - PROMPP_ALWAYS_INLINE void emplace_back(const Primitives::Sample& sample, uint32_t ls_id) { + PROMPP_ALWAYS_INLINE void emplace_back(const Primitives::Sample& sample, uint32_t ls_id, bool track_stale_nans) { data_.emplace_back(sample, ls_id); + + if (track_stale_nans) [[likely]] { + tracked_stale_nans_.add(ls_id); + } + ++size_; } - PROMPP_ALWAYS_INLINE void emplace_back(auto const& samples, uint32_t ls_id) { + PROMPP_ALWAYS_INLINE void emplace_back(auto const& samples, uint32_t ls_id, bool track_stale_nans) { data_.reserve_and_write(samples.size(), [&](InnerSerie* series_buffer, uint32_t series_size) { for (const auto& sample : samples) { std::construct_at(series_buffer, sample, ls_id); @@ -148,14 +155,19 @@ class InnerSeries { } return series_size; }); - size_ += samples.size(); + + if (track_stale_nans) [[likely]] { + tracked_stale_nans_.add(ls_id); + } } PROMPP_ALWAYS_INLINE void clear() noexcept { data_.clear(); size_ = 0; } + + PROMPP_ALWAYS_INLINE roaring::Roaring& tracked_stale_nans() { return tracked_stale_nans_; } }; // RelabeledSerie - element after relabeling with new ls(for next step). @@ -178,27 +190,24 @@ struct RelabeledSerie { class RelabeledSeries { size_t size_{0}; std::vector data_; + roaring::Roaring tracked_stale_nans_; public: [[nodiscard]] PROMPP_ALWAYS_INLINE const std::vector& data() const { return data_; } [[nodiscard]] PROMPP_ALWAYS_INLINE size_t size() const { return size_; } - PROMPP_ALWAYS_INLINE void emplace_back(const Primitives::LabelSet& ls, - const BareBones::Vector& samples, - const size_t hash, - const uint32_t ls_id) { - data_.emplace_back(ls, samples, hash, ls_id); + template + PROMPP_ALWAYS_INLINE void emplace_back(const Primitives::LabelSet& ls, Samples&& samples, const size_t hash, const uint32_t ls_id, bool track_stale_nans) { + data_.emplace_back(ls, std::forward(samples), hash, ls_id); ++size_; - } - PROMPP_ALWAYS_INLINE void emplace_back(const Primitives::LabelSet& ls, - BareBones::Vector&& samples, - const size_t hash, - const uint32_t ls_id) { - data_.emplace_back(ls, std::move(samples), hash, ls_id); - ++size_; + if (track_stale_nans) { + tracked_stale_nans_.add(ls_id); + } } + + [[nodiscard]] PROMPP_ALWAYS_INLINE bool is_stale_nan_tracked(uint32_t ls_id) const { return tracked_stale_nans_.contains(ls_id); } }; // CacheValue - value for cache map. @@ -229,7 +238,7 @@ class NoOpStaleNaNsState { }; // StaleNaNsState state for stale nans. -class StaleNaNsState { +class StaleNaNsStateDeprecated { roaring::Roaring input_bitset_{}; roaring::Roaring target_bitset_{}; roaring::Roaring prev_input_bitset_{}; @@ -258,6 +267,21 @@ class StaleNaNsState { } }; +class StaleNaNsState { + public: + template + PROMPP_ALWAYS_INLINE void swap(roaring::Roaring&& current, Callback callback) { + previous_ -= current; + for (uint32_t ls_id : previous_) { + callback(ls_id); + } + previous_ = std::move(current); + } + + private: + roaring::Roaring previous_; +}; + // Cache stateless cache for relabeler. class Cache { size_t cache_allocated_memory_{0}; @@ -490,7 +514,7 @@ class PerShardRelabeler { stale_nan_state.add_target(ls_id); } for (const Primitives::Sample& sample : samples) { - shards_inner_series[shard_id_]->emplace_back(sample, ls_id); + shards_inner_series[shard_id_]->emplace_back(sample, ls_id, false); } ++stats.series_added; @@ -506,7 +530,7 @@ class PerShardRelabeler { if (o.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { stale_nan_state.add_input(ls_id); } - shards_relabeled_series[new_shard_id]->emplace_back(new_label_set, samples, new_hash, ls_id); + shards_relabeled_series[new_shard_id]->emplace_back(new_label_set, samples, new_hash, ls_id, false); ++stats.series_added; break; @@ -522,7 +546,7 @@ class PerShardRelabeler { stale_nan_state.add_target(check_result.ls_id); } for (const Primitives::Sample& sample : samples) { - shards_inner_series[shard_id_]->emplace_back(sample, check_result.ls_id); + shards_inner_series[shard_id_]->emplace_back(sample, check_result.ls_id, false); } break; @@ -534,7 +558,7 @@ class PerShardRelabeler { stale_nan_state.add_input(check_result.source_ls_id); } for (const Primitives::Sample& sample : samples) { - shards_inner_series[check_result.shard_id]->emplace_back(sample, check_result.ls_id); + shards_inner_series[check_result.shard_id]->emplace_back(sample, check_result.ls_id, false); } break; @@ -559,12 +583,12 @@ class PerShardRelabeler { stale_nan_state.swap( [&](uint32_t ls_id) { if (const auto res = cache.check_input(ls_id); res.status == Cache::CheckResult::kRelabel) { - shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id); + shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id, false); } }, [&](uint32_t ls_id) { if (const auto res = cache.check_target(ls_id); res.status == Cache::CheckResult::kKeep) { - shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id); + shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id, false); } }); cache.optimize(); @@ -610,7 +634,7 @@ class PerShardRelabeler { stale_nan_state.add_target(check_result.ls_id); } for (const Primitives::Sample& sample : samples) { - shards_inner_series[shard_id_]->emplace_back(sample, check_result.ls_id); + shards_inner_series[shard_id_]->emplace_back(sample, check_result.ls_id, false); } break; @@ -622,7 +646,7 @@ class PerShardRelabeler { stale_nan_state.add_input(check_result.source_ls_id); } for (const Primitives::Sample& sample : samples) { - shards_inner_series[check_result.shard_id]->emplace_back(sample, check_result.ls_id); + shards_inner_series[check_result.shard_id]->emplace_back(sample, check_result.ls_id, false); } break; @@ -647,12 +671,12 @@ class PerShardRelabeler { stale_nan_state.swap( [&](uint32_t ls_id) { if (const auto res = cache.check_input(ls_id); res.status == Cache::CheckResult::kRelabel) { - shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id); + shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id, false); } }, [&](uint32_t ls_id) { if (const auto res = cache.check_target(ls_id); res.status == Cache::CheckResult::kKeep) { - shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id); + shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id, false); } }); @@ -765,7 +789,7 @@ class PerShardRelabeler { Stats& stats, Primitives::Go::SliceView& shards_inner_series, Primitives::Go::SliceView& shards_relabeled_series, - StaleNaNsState& state, + StaleNaNsStateDeprecated& state, Primitives::Timestamp def_timestamp) { input_relabeling_internal(input_lss, target_lss, cache, hashdex, o, stats, shards_inner_series, shards_relabeled_series, state, def_timestamp); } @@ -790,25 +814,25 @@ class PerShardRelabeler { const RelabelerOptions& o, Stats& stats, Primitives::Go::SliceView& shards_inner_series, - StaleNaNsState& state, + StaleNaNsStateDeprecated& state, Primitives::Timestamp def_timestamp) { return input_relabeling_from_cache_internal(input_lss, target_lss, cache, hashdex, o, stats, shards_inner_series, state, def_timestamp); } PROMPP_ALWAYS_INLINE void input_collect_stalenans(Cache& cache, Primitives::Go::SliceView& shards_inner_series, - StaleNaNsState& state, + StaleNaNsStateDeprecated& state, Primitives::Timestamp stale_ts) const { const Primitives::Sample smpl{stale_ts, kStaleNan}; state.swap( [&](uint32_t ls_id) { if (const auto res = cache.check_input(ls_id); res.status == Cache::CheckResult::kRelabel) { - shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id); + shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id, false); } }, [&](uint32_t ls_id) { if (const auto res = cache.check_target(ls_id); res.status == Cache::CheckResult::kKeep) { - shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id); + shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id, false); } }); cache.optimize(); @@ -830,7 +854,7 @@ class PerShardRelabeler { uint32_t ls_id = lss.find_or_emplace(relabeled_serie.ls, relabeled_serie.hash); for (const Primitives::Sample& sample : relabeled_serie.samples) { - inner_series->emplace_back(sample, ls_id); + inner_series->emplace_back(sample, ls_id, false); } relabeler_state_update->emplace_back(relabeled_serie.ls_id, ls_id); } @@ -862,7 +886,7 @@ class PerShardRelabeler { } if (res.status == Cache::CheckResult::kRelabel) { - encoders_inner_series[res.shard_id]->emplace_back(inner_serie.sample, res.ls_id); + encoders_inner_series[res.shard_id]->emplace_back(inner_serie.sample, res.ls_id, false); return; } @@ -880,7 +904,7 @@ class PerShardRelabeler { } const auto& new_label_set = builder_.label_set(); - relabeled_series->emplace_back(new_label_set, BareBones::Vector{inner_serie.sample}, hash_value(new_label_set), inner_serie.ls_id); + relabeled_series->emplace_back(new_label_set, BareBones::Vector{inner_serie.sample}, hash_value(new_label_set), inner_serie.ls_id, false); }); } @@ -963,7 +987,7 @@ class PerGoroutineRelabeler { return changed; } - template + template PROMPP_ALWAYS_INLINE bool input_relabeling_from_cache_internal(InputLSS& input_lss, TargetLSS& target_lss, Cache& cache, @@ -971,12 +995,11 @@ class PerGoroutineRelabeler { const RelabelerOptions& options, Stats& stats, Primitives::Go::SliceView& shards_inner_series, - StNaNsState& stale_nan_state, Primitives::Timestamp def_timestamp) { bool result{true}; size_t samples_count{}; fill_inner_series(hashdex, hashdex.begin(), shards_inner_series, [&](auto& item) { - Cache::CheckResult check_result = cache.check(input_lss, target_lss, timeseries_buf_.label_set(), item.hash()); + const auto check_result = cache.check(input_lss, target_lss, timeseries_buf_.label_set(), item.hash()); switch (check_result.status) { case Cache::CheckResult::kNotFound: { result = false; @@ -986,22 +1009,15 @@ class PerGoroutineRelabeler { case Cache::CheckResult::kKeep: { auto& samples = timeseries_buf_.samples(); const bool all_samples_reseted_to_scrape_ts = resolve_timestamps(def_timestamp, samples, options); - if (options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { - stale_nan_state.add_target(check_result.ls_id); - } - - shards_inner_series[shard_id_]->emplace_back(samples, check_result.ls_id); + shards_inner_series[shard_id_]->emplace_back(samples, check_result.ls_id, options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts); break; } case Cache::CheckResult::kRelabel: { auto& samples = timeseries_buf_.samples(); const bool all_samples_reseted_to_scrape_ts = resolve_timestamps(def_timestamp, samples, options); - if (options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { - stale_nan_state.add_input(check_result.source_ls_id); - } - - shards_inner_series[check_result.shard_id]->emplace_back(samples, check_result.ls_id); + shards_inner_series[check_result.shard_id]->emplace_back(samples, check_result.ls_id, + options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts); break; } @@ -1022,14 +1038,10 @@ class PerGoroutineRelabeler { return true; }); - if (result) { - add_stale_nans(cache, shards_inner_series, stale_nan_state, def_timestamp); - } - return result; } - template + template PROMPP_ALWAYS_INLINE void input_relabeling_internal(InputLSS& input_lss, TargetLSS& target_lss, Cache& cache, @@ -1039,11 +1051,10 @@ class PerGoroutineRelabeler { Stats& stats, Primitives::Go::SliceView& shards_inner_series, Primitives::Go::SliceView& shards_relabeled_series, - StNaNsState& stale_nan_state, Primitives::Timestamp def_timestamp) { size_t samples_count{}; fill_inner_series(hashdex, skip_shard_inner_series(hashdex, shards_inner_series[shard_id_]->size()), shards_inner_series, [&](auto& item) { - Cache::CheckResult check_result = cache.check(input_lss, target_lss, timeseries_buf_.label_set(), item.hash()); + const auto check_result = cache.check(input_lss, target_lss, timeseries_buf_.label_set(), item.hash()); switch (check_result.status) { case Cache::CheckResult::kNotFound: { builder_.reset(timeseries_buf_.label_set()); @@ -1060,11 +1071,7 @@ class PerGoroutineRelabeler { cache.add_keep(ls_id); auto& samples = timeseries_buf_.samples(); const bool all_samples_reseted_to_scrape_ts = resolve_timestamps(def_timestamp, samples, options); - if (options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { - stale_nan_state.add_target(ls_id); - } - - shards_inner_series[shard_id_]->emplace_back(samples, ls_id); + shards_inner_series[shard_id_]->emplace_back(samples, ls_id, options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts); ++stats.series_added; break; @@ -1077,11 +1084,8 @@ class PerGoroutineRelabeler { const size_t new_shard_id = new_hash % number_of_shards_; auto& samples = timeseries_buf_.samples(); const bool all_samples_reseted_to_scrape_ts = resolve_timestamps(def_timestamp, samples, options); - if (options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { - stale_nan_state.add_input(ls_id); - } - - shards_relabeled_series[new_shard_id]->emplace_back(new_label_set, samples, new_hash, ls_id); + shards_relabeled_series[new_shard_id]->emplace_back(new_label_set, samples, new_hash, ls_id, + options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts); ++stats.series_added; break; @@ -1094,22 +1098,15 @@ class PerGoroutineRelabeler { case Cache::CheckResult::kKeep: { auto& samples = timeseries_buf_.samples(); const bool all_samples_reseted_to_scrape_ts = resolve_timestamps(def_timestamp, samples, options); - if (options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { - stale_nan_state.add_target(check_result.ls_id); - } - - shards_inner_series[shard_id_]->emplace_back(samples, check_result.ls_id); + shards_inner_series[shard_id_]->emplace_back(samples, check_result.ls_id, options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts); break; } case Cache::CheckResult::kRelabel: { auto& samples = timeseries_buf_.samples(); const bool all_samples_reseted_to_scrape_ts = resolve_timestamps(def_timestamp, samples, options); - if (options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts) { - stale_nan_state.add_input(check_result.source_ls_id); - } - - shards_inner_series[check_result.shard_id]->emplace_back(samples, check_result.ls_id); + shards_inner_series[check_result.shard_id]->emplace_back(samples, check_result.ls_id, + options.track_timestamps_staleness || all_samples_reseted_to_scrape_ts); break; } @@ -1130,7 +1127,6 @@ class PerGoroutineRelabeler { return true; }); - add_stale_nans(cache, shards_inner_series, stale_nan_state, def_timestamp); cache.optimize(); } @@ -1217,8 +1213,7 @@ class PerGoroutineRelabeler { Stats& stats, Primitives::Go::SliceView& shards_inner_series, Primitives::Go::SliceView& shards_relabeled_series) { - NoOpStaleNaNsState state{}; - input_relabeling_internal(input_lss, target_lss, cache, hashdex, options, stateless_relabeler, stats, shards_inner_series, shards_relabeled_series, state, + input_relabeling_internal(input_lss, target_lss, cache, hashdex, options, stateless_relabeler, stats, shards_inner_series, shards_relabeled_series, Primitives::kNullTimestamp); } @@ -1230,8 +1225,7 @@ class PerGoroutineRelabeler { const RelabelerOptions& options, Stats& stats, Primitives::Go::SliceView& shards_inner_series) { - NoOpStaleNaNsState state{}; - return input_relabeling_from_cache_internal(input_lss, target_lss, cache, hashdex, options, stats, shards_inner_series, state, Primitives::kNullTimestamp); + return input_relabeling_from_cache_internal(input_lss, target_lss, cache, hashdex, options, stats, shards_inner_series, Primitives::kNullTimestamp); } template @@ -1244,9 +1238,8 @@ class PerGoroutineRelabeler { Stats& stats, Primitives::Go::SliceView& shards_inner_series, Primitives::Go::SliceView& shards_relabeled_series, - StaleNaNsState& state, Primitives::Timestamp def_timestamp) { - input_relabeling_internal(input_lss, target_lss, cache, hashdex, options, stateless_relabeler, stats, shards_inner_series, shards_relabeled_series, state, + input_relabeling_internal(input_lss, target_lss, cache, hashdex, options, stateless_relabeler, stats, shards_inner_series, shards_relabeled_series, def_timestamp); } @@ -1258,9 +1251,8 @@ class PerGoroutineRelabeler { const RelabelerOptions& options, Stats& stats, Primitives::Go::SliceView& shards_inner_series, - StaleNaNsState& state, Primitives::Timestamp def_timestamp) { - return input_relabeling_from_cache_internal(input_lss, target_lss, cache, hashdex, options, stats, shards_inner_series, state, def_timestamp); + return input_relabeling_from_cache_internal(input_lss, target_lss, cache, hashdex, options, stats, shards_inner_series, def_timestamp); } // input_transition_relabeling transparent relabeling. @@ -1272,7 +1264,7 @@ class PerGoroutineRelabeler { fill_inner_series(hashdex, skip_shard_inner_series(hashdex, shards_inner_series[shard_id_]->size()), shards_inner_series, [&](auto& item) { const auto previous_size = target_lss.size(); auto ls_id = target_lss.find_or_emplace(timeseries_buf_.label_set(), item.hash()); - shards_inner_series[shard_id_]->emplace_back(timeseries_buf_.samples(), ls_id); + shards_inner_series[shard_id_]->emplace_back(timeseries_buf_.samples(), ls_id, false); if (target_lss.size() > previous_size) { ++stats.series_added; @@ -1291,7 +1283,7 @@ class PerGoroutineRelabeler { bool result = true; fill_inner_series(hashdex, hashdex.begin(), shards_inner_series, [&](auto& item) { if (auto ls_id = target_lss.find(timeseries_buf_.label_set(), item.hash()); ls_id.has_value()) { - shards_inner_series[shard_id_]->emplace_back(timeseries_buf_.samples(), *ls_id); + shards_inner_series[shard_id_]->emplace_back(timeseries_buf_.samples(), *ls_id, false); stats.samples_added += timeseries_buf_.samples().size(); return true; } @@ -1318,7 +1310,7 @@ class PerGoroutineRelabeler { for (const auto& relabeled_serie : relabeled_series->data()) { uint32_t ls_id = target_lss.find_or_emplace(relabeled_serie.ls, relabeled_serie.hash); - inner_series->emplace_back(relabeled_serie.samples, ls_id); + inner_series->emplace_back(relabeled_serie.samples, ls_id, relabeled_series->is_stale_nan_tracked(relabeled_serie.ls_id)); relabeler_state_update->emplace_back(relabeled_serie.ls_id, ls_id); } } @@ -1346,23 +1338,17 @@ class PerGoroutineRelabeler { } } - template - PROMPP_ALWAYS_INLINE void add_stale_nans(Cache& cache, - Primitives::Go::SliceView& shards_inner_series, - StNaNsState& stale_nan_state, - Primitives::Timestamp def_timestamp) { - const Primitives::Sample smpl{def_timestamp, kStaleNan}; - stale_nan_state.swap( - [&](uint32_t ls_id) { - if (const auto res = cache.check_input(ls_id); res.status == Cache::CheckResult::kRelabel) { - shards_inner_series[res.shard_id]->emplace_back(smpl, res.ls_id); - } - }, - [&](uint32_t ls_id) { - if (const auto res = cache.check_target(ls_id); res.status == Cache::CheckResult::kKeep) { - shards_inner_series[shard_id_]->emplace_back(smpl, res.ls_id); - } - }); + static void track_stale_nans(Primitives::Go::SliceView& inner_series, StaleNaNsState& state, Primitives::Timestamp def_timestamp) { + roaring::Roaring current_state; + + for (const auto& series : inner_series) { + if (series != nullptr && series->size() > 0) { + current_state |= series->tracked_stale_nans(); + } + } + + const Primitives::Sample sample{def_timestamp, kStaleNan}; + state.swap(std::move(current_state), [&](uint32_t ls_id) { inner_series[0]->emplace_back(sample, ls_id, false); }); } }; diff --git a/pp/prometheus/tests/relabeler_tests.cpp b/pp/prometheus/tests/relabeler_tests.cpp index 591fd4dff..fe806f1ea 100644 --- a/pp/prometheus/tests/relabeler_tests.cpp +++ b/pp/prometheus/tests/relabeler_tests.cpp @@ -30,6 +30,7 @@ using PromPP::Prometheus::Relabel::PerShardRelabeler; using PromPP::Prometheus::Relabel::RelabelerStateUpdate; using PromPP::Prometheus::Relabel::relabelStatus; using PromPP::Prometheus::Relabel::StaleNaNsState; +using PromPP::Prometheus::Relabel::StaleNaNsStateDeprecated; using PromPP::Prometheus::Relabel::StatelessRelabeler; using enum PromPP::Prometheus::Relabel::rAction; using enum relabelStatus; @@ -301,7 +302,7 @@ TEST_F(PerGoroutineRelabelerFixture, ReplaceToNewLS2) { // Act relabeler.input_relabeling(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); // Assert EXPECT_EQ(relabeled_results_[0]->size(), 0); @@ -323,7 +324,7 @@ TEST_F(PerGoroutineRelabelerFixture, ReplaceToNewLS3) { // Act relabeler.input_relabeling(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[0], relabeled_results_[0], &update_data); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[0], relabeled_results_[0], &update_data); PerShardRelabeler::update_relabeler_state(cache_, &update_data, 1); // Assert @@ -346,18 +347,20 @@ TEST_F(PerGoroutineRelabelerFixture, InputRelabelingWithStalenans_Default) { RelabelerStateUpdate update_data; // Act - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, 1000); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, - 2000); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 1000); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 1000); + + // Act + relabeler.input_relabeling(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_); + reset(); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 2000); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 2000); // Assert - EXPECT_EQ(relabeled_results_[0]->size(), 0); - EXPECT_EQ(relabeled_results_[1]->size(), 0); - EXPECT_EQ(shards_inner_series_[0]->size(), 0); - EXPECT_EQ(shards_inner_series_[1]->size(), 2); - EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(1000, 0.1), .ls_id = 0}, {.sample = Sample(2000, kStaleNan), .ls_id = 0}}, - shards_inner_series_[1]->data())); + EXPECT_EQ(shards_inner_series_[0]->size(), 1); + EXPECT_EQ(shards_inner_series_[1]->size(), 0); + EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(2000, kStaleNan), .ls_id = 0}}, shards_inner_series_[0]->data())); EXPECT_EQ((Stats{.samples_added = 1, .series_added = 1}), stats_); EXPECT_EQ(update_data.size(), 0); } @@ -373,18 +376,18 @@ TEST_F(PerGoroutineRelabelerFixture, InputRelabelingWithStalenans_DefaultHonorTi o_.honor_timestamps = true; // Act - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, 1000); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, - 2000); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 1000); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 1000); + + reset(); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 2000); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 2000); // Assert - EXPECT_EQ(relabeled_results_[0]->size(), 0); - EXPECT_EQ(relabeled_results_[1]->size(), 0); - EXPECT_EQ(shards_inner_series_[0]->size(), 0); - EXPECT_EQ(shards_inner_series_[1]->size(), 2); - EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(1000, 0.1), .ls_id = 0}, {.sample = Sample(2000, kStaleNan), .ls_id = 0}}, - shards_inner_series_[1]->data())); + EXPECT_EQ(shards_inner_series_[0]->size(), 1); + EXPECT_EQ(shards_inner_series_[1]->size(), 0); + EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(2000, kStaleNan), .ls_id = 0}}, shards_inner_series_[0]->data())); EXPECT_EQ((Stats{.samples_added = 1, .series_added = 1}), stats_); EXPECT_EQ(update_data.size(), 0); } @@ -399,18 +402,18 @@ TEST_F(PerGoroutineRelabelerFixture, InputRelabelingWithStalenans_WithMetricTime RelabelerStateUpdate update_data; // Act - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, 1000); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, - 2000); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 1000); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 1000); + + reset(); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 2000); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 2000); // Assert - EXPECT_EQ(relabeled_results_[0]->size(), 0); - EXPECT_EQ(relabeled_results_[1]->size(), 0); - EXPECT_EQ(shards_inner_series_[0]->size(), 0); - EXPECT_EQ(shards_inner_series_[1]->size(), 2); - EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(1000, 0.1), .ls_id = 0}, {.sample = Sample(2000, kStaleNan), .ls_id = 0}}, - shards_inner_series_[1]->data())); + EXPECT_EQ(shards_inner_series_[0]->size(), 1); + EXPECT_EQ(shards_inner_series_[1]->size(), 0); + EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(2000, kStaleNan), .ls_id = 0}}, shards_inner_series_[0]->data())); EXPECT_EQ((Stats{.samples_added = 1, .series_added = 1}), stats_); EXPECT_EQ(update_data.size(), 0); } @@ -426,10 +429,12 @@ TEST_F(PerGoroutineRelabelerFixture, InputRelabelingWithStalenans_HonorTimestamp RelabelerStateUpdate update_data; // Act - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, 1000); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, - 2000); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 1000); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 1000); + + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 2000); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 2000); // Assert EXPECT_EQ(relabeled_results_[0]->size(), 0); @@ -453,18 +458,18 @@ TEST_F(PerGoroutineRelabelerFixture, InputRelabelingWithStalenans_HonorTimestamp RelabelerStateUpdate update_data; // Act - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, 1000); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); - relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, state, - 2000); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 1000); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 1000); + + reset(); + relabeler.input_relabeling_with_stalenans(lss_, lss_, cache_, HashdexTest{}, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_, 2000); + PerGoroutineRelabeler::track_stale_nans(shards_inner_series_, state, 2000); // Assert - EXPECT_EQ(relabeled_results_[0]->size(), 0); - EXPECT_EQ(relabeled_results_[1]->size(), 0); - EXPECT_EQ(shards_inner_series_[0]->size(), 0); - EXPECT_EQ(shards_inner_series_[1]->size(), 2); - EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(1500, 0.1), .ls_id = 0}, {.sample = Sample(2000, kStaleNan), .ls_id = 0}}, - shards_inner_series_[1]->data())); + EXPECT_EQ(shards_inner_series_[0]->size(), 1); + EXPECT_EQ(shards_inner_series_[1]->size(), 0); + EXPECT_TRUE(std::ranges::equal(std::vector{{.sample = Sample(2000, kStaleNan), .ls_id = 0}}, shards_inner_series_[0]->data())); EXPECT_EQ((Stats{.samples_added = 1, .series_added = 1}), stats_); EXPECT_EQ(update_data.size(), 0); } @@ -480,7 +485,7 @@ TEST_F(PerGoroutineRelabelerFixture, TargetLabels_HappyPath) { // Act relabeler.input_relabeling(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[1], relabeled_results_[1], &update_data); PerShardRelabeler::update_relabeler_state(cache_, &update_data, 1); // Assert @@ -505,7 +510,7 @@ TEST_F(PerGoroutineRelabelerFixture, TargetLabels_ExportedLabel) { // Act relabeler.input_relabeling(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[0], relabeled_results_[0], &update_data); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[0], relabeled_results_[0], &update_data); PerShardRelabeler::update_relabeler_state(cache_, &update_data, 1); // Assert @@ -531,7 +536,7 @@ TEST_F(PerGoroutineRelabelerFixture, TargetLabels_ExportedLabel_Honor) { // Act relabeler.input_relabeling(lss_, lss_, cache_, hx_, o_, stateless_relabeler, stats_, shards_inner_series_, relabeled_results_); - PerShardRelabeler::append_relabeler_series(lss_, shards_inner_series_[0], relabeled_results_[0], &update_data); + PerGoroutineRelabeler::append_relabeler_series(lss_, shards_inner_series_[0], relabeled_results_[0], &update_data); PerShardRelabeler::update_relabeler_state(cache_, &update_data, 1); // Assert @@ -815,13 +820,13 @@ INSTANTIATE_TEST_SUITE_P(Invalid, ValidateCase{.value = "a\xc5z", .expected = false}, ValidateCase{.value = "\x80\x8F\x90\x9FzxcasdAA:", .expected = false})); -class StaleNaNsStateFixture : public testing::Test {}; +class StaleNaNsStateDeprecatedFixture : public testing::Test {}; -TEST_F(StaleNaNsStateFixture, Swap) { +TEST_F(StaleNaNsStateDeprecatedFixture, Swap) { // Arrange static constexpr uint32_t kCurrentLsId = 42; - StaleNaNsState result; + StaleNaNsStateDeprecated result; result.add_input(kCurrentLsId); std::vector input_ls_ids; diff --git a/pp/wal/decoder.h b/pp/wal/decoder.h index eeff67233..e205be515 100644 --- a/pp/wal/decoder.h +++ b/pp/wal/decoder.h @@ -103,9 +103,7 @@ class GenericDecoder { decoder_.process_segment([&last_ls_id, &samples, &container](uint32_t ls_id, int64_t ts, double v) PROMPP_LAMBDA_INLINE { if (ls_id != last_ls_id) { if (!samples.empty()) { - for (const PromPP::Primitives::Sample& sample : samples) { - container.emplace_back(sample, last_ls_id); - } + container.emplace_back(samples, last_ls_id, false); } samples.clear(); last_ls_id = ls_id; @@ -114,9 +112,7 @@ class GenericDecoder { }); if (!samples.empty()) { - for (const PromPP::Primitives::Sample& sample : samples) { - container.emplace_back(sample, last_ls_id); - } + container.emplace_back(samples, last_ls_id, false); } }