Skip to content
Merged
Show file tree
Hide file tree
Changes from 128 commits
Commits
Show all changes
139 commits
Select commit Hold shift + click to select a range
3943d07
for save
u-veles-a Jul 29, 2025
0bd578c
for save
u-veles-a Aug 4, 2025
574c9f8
for save
u-veles-a Aug 5, 2025
63dc680
for save
u-veles-a Aug 5, 2025
6fddb82
for save
u-veles-a Aug 5, 2025
841f247
for save
u-veles-a Aug 7, 2025
a4f4640
for save
u-veles-a Aug 7, 2025
1c6b565
for save
u-veles-a Aug 7, 2025
a74334b
Merge branch 'pp' into rebuild_heads_container
u-veles-a Aug 7, 2025
9c4fa48
fix after merge
u-veles-a Aug 7, 2025
e36f2eb
rebuild querier, shard
u-veles-a Aug 7, 2025
641e852
for save
u-veles-a Aug 8, 2025
97c26b2
fix head
u-veles-a Aug 11, 2025
10deda4
Merge branch 'pp' into rebuild_heads_container
u-veles-a Aug 11, 2025
d0c43ce
add PerGoroutineRelabeler
u-veles-a Aug 12, 2025
1720ab6
Merge branch 'pp' into rebuild_heads_container
u-veles-a Aug 12, 2025
7c7896b
refactoring PerGoroutineRelabeler
u-veles-a Aug 14, 2025
4eb55a1
state and StatelessRelabeler
u-veles-a Aug 14, 2025
696c6f5
Merge branch 'pp' into rebuild_heads_container
u-veles-a Aug 22, 2025
85b2b0d
add catalog appender
u-veles-a Aug 26, 2025
141d22d
Merge branch 'pp' into rebuild_heads_container
u-veles-a Aug 26, 2025
2df6e10
add ready notifier
u-veles-a Aug 26, 2025
40b0e44
add builder
u-veles-a Aug 26, 2025
250828e
for save
u-veles-a Aug 26, 2025
da66d5d
for save
u-veles-a Aug 27, 2025
7f37a19
for save
u-veles-a Aug 28, 2025
3f99fe3
add chunkqurier
u-veles-a Aug 28, 2025
1c3c8db
rebuild
u-veles-a Aug 28, 2025
3fa04d5
fix fatal
u-veles-a Aug 28, 2025
f6c21c4
for save
u-veles-a Aug 29, 2025
76d1420
for save
u-veles-a Sep 1, 2025
098ff7d
for save
u-veles-a Sep 2, 2025
72bb59e
for save
u-veles-a Sep 4, 2025
4bca70a
for save
u-veles-a Sep 8, 2025
fbaca43
Merge branch 'pp' into rebuild_heads_container
u-veles-a Sep 8, 2025
42192e8
fix merge
u-veles-a Sep 8, 2025
df733d2
manager, mediator and minor edits
u-veles-a Sep 9, 2025
b1ea87f
adapter
u-veles-a Sep 10, 2025
d010bd6
adapter
u-veles-a Sep 11, 2025
d9e0fac
fix walreader
u-veles-a Sep 11, 2025
64d12f0
rebuild sreader
u-veles-a Sep 11, 2025
a57a26e
for save
u-veles-a Sep 11, 2025
7a7e259
for save
u-veles-a Sep 16, 2025
6f48484
rebuild state
u-veles-a Sep 18, 2025
e82d6a0
some fix
u-veles-a Sep 19, 2025
407c001
add test
u-veles-a Sep 22, 2025
552e885
add test
u-veles-a Sep 23, 2025
363bbf1
add test
u-veles-a Sep 23, 2025
9ced693
add wal reader test
u-veles-a Sep 24, 2025
a2247d9
for save
u-veles-a Sep 24, 2025
820ec16
Head keeper (#154)
cherep58 Sep 24, 2025
20f5c2f
Merge branch 'rebuild_heads_container' of github.com:deckhouse/prompp…
u-veles-a Sep 24, 2025
fb9a2e9
prototype, for save
u-veles-a Sep 24, 2025
1e816ce
debug
u-veles-a Sep 25, 2025
c8eeeaa
fixed roaring include error in CLion
cherep58 Sep 25, 2025
5e786dc
fixed linter warnings
cherep58 Sep 25, 2025
041b5bf
add log, letrics
u-veles-a Sep 25, 2025
6c0f8c4
smal fix
u-veles-a Sep 25, 2025
f2cf40d
Merge branch 'pp' into rebuild_heads_container
u-veles-a Sep 25, 2025
2a4d346
small fix
u-veles-a Sep 25, 2025
0527236
small fix
u-veles-a Sep 25, 2025
bb81c0a
some fix
u-veles-a Sep 26, 2025
02d56d7
fix metrics
u-veles-a Sep 26, 2025
cca12a7
fix state
u-veles-a Sep 26, 2025
55a2b5f
smal fix
u-veles-a Sep 29, 2025
e824b12
Merge branch 'pp' into rebuild_heads_container
u-veles-a Sep 29, 2025
00581e0
for save
u-veles-a Sep 29, 2025
2b46965
some fix
u-veles-a Sep 29, 2025
b4c1f68
some rebuild
u-veles-a Sep 29, 2025
a780b29
for save
u-veles-a Sep 29, 2025
5a804a1
some fix
u-veles-a Sep 30, 2025
8989ed9
refactored tests for relabeler
cherep58 Sep 30, 2025
061fa32
for save
u-veles-a Sep 30, 2025
ca54456
add notify to catalogGC
u-veles-a Sep 30, 2025
0039251
for save
u-veles-a Sep 30, 2025
6ae6659
added unit test for skip empty block
cherep58 Sep 30, 2025
7c321ca
Merge branch 'rebuild_heads_container' of github.com:deckhouse/prompp…
u-veles-a Oct 1, 2025
855d891
refactored stateless relabeler
cherep58 Oct 1, 2025
4e7d1ba
renamed LabelsBuilderStateMap to LabelsBuilder
cherep58 Oct 1, 2025
e79a043
refactored relabeler tests
cherep58 Oct 2, 2025
ea774f2
rewritten test for PerGoroutineRelabelerFixture
cherep58 Oct 2, 2025
087fbbb
refactored PerGoroutineRelabeler
cherep58 Oct 2, 2025
e13b156
fixed clang-tidy warning
cherep58 Oct 2, 2025
42c25d9
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 2, 2025
ea7a819
used std::string instead of std::ostringstream
cherep58 Oct 2, 2025
a6ffc25
optimized hard_validate function
cherep58 Oct 2, 2025
d421a5a
Rebuild head container lss snapshot copy (#161)
u-veles-a Oct 3, 2025
316d4d0
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 3, 2025
0f13ae0
add test
u-veles-a Oct 3, 2025
9ec7715
fixed bug
cherep58 Oct 6, 2025
9c4a4bb
fix test
u-veles-a Oct 6, 2025
3645096
fix test
u-veles-a Oct 6, 2025
6a256c8
used reserve_and_write
cherep58 Oct 6, 2025
322cf7b
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 6, 2025
d64b42f
add test state
u-veles-a Oct 6, 2025
c70b0e0
add test head wal
u-veles-a Oct 6, 2025
cfb0f6e
add test PerGoroutineRelabeler
u-veles-a Oct 6, 2025
8cdd96a
add twst wal
u-veles-a Oct 7, 2025
cbce844
add descriptions
u-veles-a Oct 7, 2025
3e91ef1
extracted stale nan tracking into separate method from relabeling pro…
cherep58 Oct 7, 2025
90eba40
add test querier
u-veles-a Oct 7, 2025
e8d9411
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 7, 2025
90080cc
Merge branch 'stalenan_inner_series' of https://github.com/deckhouse/…
cherep58 Oct 7, 2025
aed6be6
for save
u-veles-a Oct 7, 2025
fce6395
add test head
u-veles-a Oct 8, 2025
21f42c6
fixed bug in RelabelConfig with mutable variable
cherep58 Oct 8, 2025
bef44a3
Merge branch 'stalenan_inner_series' of https://github.com/deckhouse/…
cherep58 Oct 8, 2025
bc3bf1d
for save
u-veles-a Oct 8, 2025
d745cf7
fix test mediator
u-veles-a Oct 8, 2025
42aa0db
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 8, 2025
144e5b1
fix types
u-veles-a Oct 8, 2025
cca2324
Merge branch 'stalenan_inner_series' of https://github.com/deckhouse/…
cherep58 Oct 8, 2025
1f4b045
fixed unit tests
cherep58 Oct 8, 2025
a40214a
add commiter test
u-veles-a Oct 8, 2025
3a2c3a1
add merger test
u-veles-a Oct 9, 2025
bd6b4a3
for save
u-veles-a Oct 9, 2025
e7ff43c
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 9, 2025
9b952b1
Merge branch 'stalenan_inner_series' of https://github.com/deckhouse/…
cherep58 Oct 9, 2025
fde36f8
Lss snapshot copy (#145)
gshigin Oct 9, 2025
1f1acf5
Refactored relabeler classes (#162)
cherep58 Oct 9, 2025
b457a93
Merge branch 'pp' into rebuild_heads_container
u-veles-a Oct 9, 2025
ee32511
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 9, 2025
0c8b359
fix after merge
u-veles-a Oct 9, 2025
367f691
fixed state destruction at GC
cherep58 Oct 9, 2025
86a3d5b
add rotator test
u-veles-a Oct 10, 2025
03fde5a
some fix
u-veles-a Oct 10, 2025
06342f8
fix test
u-veles-a Oct 10, 2025
2d40a3e
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 10, 2025
bf3f4de
fix review
u-veles-a Oct 13, 2025
97f2f5d
add wal metrics
u-veles-a Oct 14, 2025
6242519
fix wal notifier
u-veles-a Oct 14, 2025
762a6fd
fix test
u-veles-a Oct 14, 2025
010493c
fix after review
u-veles-a Oct 15, 2025
c1eb20c
Merge branch 'rebuild_heads_container' into stalenan_inner_series_impl
cherep58 Oct 15, 2025
a7440c4
fix review
u-veles-a Oct 16, 2025
2fff699
for save
u-veles-a Oct 22, 2025
ee017e1
fix after review
u-veles-a Oct 22, 2025
389690a
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Oct 22, 2025
323f37c
Merge branch 'pp' into stalenan_inner_series_impl
vporoshok Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 116 additions & 80 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ import (
"syscall"
"time"

pphandler "github.com/prometheus/prometheus/pp-pkg/handler"
rwprocessor "github.com/prometheus/prometheus/pp-pkg/handler/processor"
pptsdb "github.com/prometheus/prometheus/pp-pkg/tsdb"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/alecthomas/kingpin/v2"
"github.com/alecthomas/units"
Expand All @@ -61,15 +57,21 @@ import (
"k8s.io/klog"
klogv2 "k8s.io/klog/v2"

"github.com/prometheus/prometheus/pp-pkg/receiver" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp-pkg/remote" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp-pkg/scrape" // PP_CHANGES.md: rebuild on cpp
pp_pkg_storage "github.com/prometheus/prometheus/pp-pkg/storage" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/appender" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head/catalog" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/head/ready" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/relabeler/remotewriter" // PP_CHANGES.md: rebuild on cpp
pp_pkg_handler "github.com/prometheus/prometheus/pp-pkg/handler" // PP_CHANGES.md: rebuild on cpp
rwprocessor "github.com/prometheus/prometheus/pp-pkg/handler/processor" // PP_CHANGES.md: rebuild on cpp
pp_pkg_logger "github.com/prometheus/prometheus/pp-pkg/logger" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp-pkg/remote" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp-pkg/scrape" // PP_CHANGES.md: rebuild on cpp
pp_pkg_storage "github.com/prometheus/prometheus/pp-pkg/storage" // PP_CHANGES.md: rebuild on cpp
pp_pkg_tsdb "github.com/prometheus/prometheus/pp-pkg/tsdb" // PP_CHANGES.md: rebuild on cpp

pp_storage "github.com/prometheus/prometheus/pp/go/storage" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/catalog" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/head/head" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/head/services" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/querier" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/ready" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/remotewriter" // PP_CHANGES.md: rebuild on cpp

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
Expand Down Expand Up @@ -720,9 +722,10 @@ func main() {
os.Exit(1)
}

reloadBlocksTriggerNotifier := receiver.NewReloadBlocksTriggerNotifier()
pp_pkg_logger.InitLogHandler(log.With(logger, "component", "pp"))

reloadBlocksTriggerNotifier := pp_storage.NewTriggerNotifier()
cfg.tsdb.ReloadBlocksExternalTrigger = reloadBlocksTriggerNotifier
ctxReceiver, cancelReceiver := context.WithCancel(context.Background())

dataDir, err := filepath.Abs(localStoragePath)
if err != nil {
Expand All @@ -742,44 +745,59 @@ func main() {
}

clock := clockwork.NewRealClock()
headCatalog, err := catalog.New(clock, fileLog, catalog.DefaultIDGenerator{}, int(catalogMaxLogFileSize), prometheus.DefaultRegisterer)
headCatalog, err := catalog.New(
clock,
fileLog,
catalog.DefaultIDGenerator{},
int(catalogMaxLogFileSize),
prometheus.DefaultRegisterer,
)
if err != nil {
level.Error(logger).Log("msg", "failed to create head catalog", "err", err)
os.Exit(1)
}

receiverReadyNotifier := ready.NewNotifiableNotifier()
// create receiver
receiver, err := receiver.NewReceiver(
ctxReceiver,
log.With(logger, "component", "receiver"),
prometheus.DefaultRegisterer,
receiverConfig,
localStoragePath,
cfgFile.RemoteWriteConfigs,
localStoragePath,
receiver.RotationInfo{
BlockDuration: time.Duration(cfg.tsdb.MinBlockDuration),
Seed: cfgFile.GlobalConfig.ExternalLabels.Hash(),
removedHeadTriggerNotifier := pp_storage.NewTriggerNotifier()
hManagerReadyNotifier := ready.NewNotifiableNotifier()
hManager, err := pp_storage.NewManager(
&pp_storage.Options{
Seed: cfgFile.GlobalConfig.ExternalLabels.Hash(),
BlockDuration: time.Duration(cfg.tsdb.MinBlockDuration),
CommitInterval: time.Duration(cfg.WalCommitInterval),
MaxRetentionPeriod: time.Duration(cfg.tsdb.RetentionDuration),
HeadRetentionPeriod: time.Duration(cfg.HeadRetentionTimeout),
KeeperCapacity: 2,
DataDir: localStoragePath,
MaxSegmentSize: cfg.WalMaxSamplesPerSegment,
NumberOfShards: receiverConfig.NumberOfShards,
},
clock,
headCatalog,
reloadBlocksTriggerNotifier,
receiverReadyNotifier,
time.Duration(cfg.WalCommitInterval),
time.Duration(cfg.tsdb.RetentionDuration),
time.Duration(cfg.HeadRetentionTimeout),
// x3 ScrapeInterval timeout for write block
time.Duration(cfgFile.GlobalConfig.ScrapeInterval*3),
cfg.WalMaxSamplesPerSegment,
appender.UnloadDataStorage,
removedHeadTriggerNotifier,
hManagerReadyNotifier,
prometheus.DefaultRegisterer,
)
if err != nil {
level.Error(logger).Log("msg", "failed to create a receiver", "err", err)
level.Error(logger).Log("msg", "failed to create a head manager", "err", err)
os.Exit(1)
}

remoteWriterReadyNotifier := ready.NewNotifiableNotifier()
remoteWriter := remotewriter.New(dataDir, headCatalog, clock, remoteWriterReadyNotifier, prometheus.DefaultRegisterer)
remoteWriter := remotewriter.New(
dataDir,
headCatalog,
clock,
remoteWriterReadyNotifier,
prometheus.DefaultRegisterer,
)

adapter := pp_pkg_storage.NewAdapter(
clock,
hManager.Proxy(),
hManager.MergeOutOfOrderChunks,
prometheus.DefaultRegisterer,
)

// PP_CHANGES.md: rebuild on cpp end

Expand All @@ -790,14 +808,11 @@ func main() {
// PP_CHANGES.md: rebuild on cpp start
remoteRead = pp_pkg_storage.NewRemoteRead(
log.With(logger, "component", "remote"),
prometheus.DefaultRegisterer,
localStorage.StartTime,
localStoragePath,
time.Duration(cfg.RemoteFlushDeadline),
)
fanoutStorage = storage.NewFanout(
logger,
pp_pkg_storage.NewQueryableStorage(receiver),
adapter,
localStorage,
remoteRead,
)
Expand Down Expand Up @@ -875,7 +890,7 @@ func main() {
scrapeManager, err := scrape.NewManager(
&cfg.scrape,
log.With(logger, "component", "scrape manager"),
receiver,
adapter,
prometheus.DefaultRegisterer,
)
if err != nil {
Expand Down Expand Up @@ -934,8 +949,8 @@ func main() {
queryEngine = promql.NewEngine(opts)

ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: receiver, // PP_CHANGES.md: rebuild on cpp
Queryable: receiver, // PP_CHANGES.md: rebuild on cpp
Appendable: adapter, // PP_CHANGES.md: rebuild on cpp
Queryable: adapter, // PP_CHANGES.md: rebuild on cpp
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
Expand Down Expand Up @@ -992,7 +1007,7 @@ func main() {
}

// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web, receiver) // PP_CHANGES.md: rebuild on cpp
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web, adapter) // PP_CHANGES.md: rebuild on cpp

// Monitor outgoing connections on default transport with conntrack.
http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc(
Expand All @@ -1004,8 +1019,8 @@ func main() {

reloaders := []reloader{
{ // PP_CHANGES.md: rebuild on cpp start
name: "receiver",
reloader: receiver.ApplyConfig,
name: "head_manager",
reloader: hManager.ApplyConfig,
}, { // PP_CHANGES.md: rebuild on cpp end
name: "db_storage",
reloader: localStorage.ApplyConfig,
Expand Down Expand Up @@ -1141,8 +1156,19 @@ func main() {
os.Exit(1)
}

multiNotifiable := ready.New().With(receiverReadyNotifier).With(remoteWriterReadyNotifier).Build()
opGC := catalog.NewGC(dataDir, headCatalog, multiNotifiable)
multiNotifiable := ready.NewMultiNotifiableBuilder().Add(
hManagerReadyNotifier,
).Add(
remoteWriterReadyNotifier,
).Build()
opGC := catalog.NewGC(
dataDir,
headCatalog,
clock,
multiNotifiable,
removedHeadTriggerNotifier,
time.Duration(cfg.tsdb.RetentionDuration),
)

var g run.Group
{
Expand Down Expand Up @@ -1340,7 +1366,7 @@ func main() {
return fmt.Errorf("opening storage failed: %w", err)
}

tsdb.DBSetBlocksToDelete(db, pptsdb.PPBlocksToDelete(db, dataDir, headCatalog))
tsdb.DBSetBlocksToDelete(db, pp_pkg_tsdb.PPBlocksToDelete(db, dataDir, headCatalog))
switch fsType := prom_runtime.Statfs(localStoragePath); fsType {
case "NFS_SUPER_MAGIC":
level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
Expand Down Expand Up @@ -1389,7 +1415,7 @@ func main() {
db, err := agent.Open(
logger,
prometheus.DefaultRegisterer,
receiver, // PP_CHANGES.md: rebuild on cpp
adapter, // PP_CHANGES.md: rebuild on cpp
localStoragePath,
&opts,
)
Expand Down Expand Up @@ -1444,37 +1470,42 @@ func main() {
)
}
{ // PP_CHANGES.md: rebuild on cpp start
// run receiver.
// run head manager.
cancel := make(chan struct{})
g.Add(
func() error {
<-dbOpen
return receiver.Run(ctxReceiver)
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
return nil
}

return hManager.Run()
},
func(err error) {
receiverCancelCtx, receiverCancelCtxCancel := context.WithCancel(ctxReceiver)
defer receiverCancelCtxCancel()

level.Info(logger).Log("msg", "Stopping Receiver...")
if err := receiver.Shutdown(receiverCancelCtx); err != nil {
level.Error(logger).Log("msg", "Receiver shutdown failed", "err", err)
level.Info(logger).Log("msg", "Stopping head manager...", "msg", err)
close(cancel)
if err := hManager.Shutdown(context.Background()); err != nil {
level.Error(logger).Log("msg", "Head manager shutdown failed", "err", err)
}
cancelReceiver()
level.Info(logger).Log("msg", "Head manager stopped.")
},
)
} // PP_CHANGES.md: rebuild on cpp end
{ // PP_CHANGES.md: rebuild on cpp start
g.Add(
func() error { return <-head.UnrecoverableErrorChan },
func() error {
return <-querier.UnrecoverableErrorChan
},
func(err error) {
select {
case head.UnrecoverableErrorChan <- nil:
// stop execute func if need
default:
}
// stop execute func if need
querier.SendUnrecoverableError(nil)

if errors.Is(err, head.UnrecoverableError{}) {
if errors.Is(err, querier.UnrecoverableError{}) {
level.Error(logger).Log("msg", "Received unrecoverable error", "err", err)
}
level.Info(logger).Log("msg", "Unrecoverable Error Handler stopped.")
},
)
} // PP_CHANGES.md: rebuild on cpp end
Expand Down Expand Up @@ -1526,6 +1557,10 @@ func main() {
}

// PP_CHANGES.md: rebuild on cpp start the engine is really no longer in use before calling this to avoid
if err := fileLog.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close file log", "err", err)
}

if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
Expand Down Expand Up @@ -1969,6 +2004,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
EnableDelayedCompaction: opts.EnableDelayedCompaction,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
ReloadBlocksExternalTrigger: opts.ReloadBlocksExternalTrigger,
}
}

Expand Down Expand Up @@ -2056,7 +2092,7 @@ func readPromPPFeatures(logger log.Logger) {
fname, fvalue, _ := strings.Cut(feature, "=")
switch strings.TrimSpace(fname) {
case "head_copy_series_on_rotate":
appender.CopySeriesOnRotate = true
services.CopySeriesOnRotate = true
level.Info(logger).Log(
"msg",
"[FEATURE] Copying active series from current head to new head during rotation is enabled.",
Expand All @@ -2076,7 +2112,7 @@ func readPromPPFeatures(logger log.Logger) {
}
}

head.ExtraReadConcurrency = v
head.ExtraWorkers = v
level.Info(logger).Log(
"msg",
"[FEATURE] Concurrency reading is enabled.",
Expand All @@ -2089,7 +2125,7 @@ func readPromPPFeatures(logger log.Logger) {
if fvalue == "" {
level.Error(logger).Log(
"msg", "[FEATURE] The default number of shards is empty, no changes.",
"default_number_of_shards", receiver.DefaultNumberOfShards,
"default_number_of_shards", pp_storage.DefaultNumberOfShards,
)

continue
Expand All @@ -2100,36 +2136,36 @@ func readPromPPFeatures(logger log.Logger) {
case err != nil:
level.Error(logger).Log(
"msg", "[FEATURE] Error parsing head_numbehead_default_number_of_shardsr_of_shards value",
"default_number_of_shards", receiver.DefaultNumberOfShards,
"default_number_of_shards", pp_storage.DefaultNumberOfShards,
"err", err,
)

case v > math.MaxUint16:
level.Error(logger).Log(
"msg", "[FEATURE] The default number of shards is overflow(max 65535), no changes.",
"default_number_of_shards", receiver.DefaultNumberOfShards,
"default_number_of_shards", pp_storage.DefaultNumberOfShards,
)

case v < 1:
level.Error(logger).Log(
"msg", "[FEATURE] The default number of shards is incorrect(min 1), no changes.",
"default_number_of_shards", receiver.DefaultNumberOfShards,
"default_number_of_shards", pp_storage.DefaultNumberOfShards,
)

default:
receiver.DefaultNumberOfShards = uint16(v)
pp_storage.DefaultNumberOfShards = uint16(v)
level.Info(logger).Log(
"msg", "[FEATURE] Changed default number of shards.",
"default_number_of_shards", receiver.DefaultNumberOfShards,
"default_number_of_shards", pp_storage.DefaultNumberOfShards,
)
}

case "disable_commits_on_remote_write":
rwprocessor.AlwaysCommit = false
pphandler.OTLPAlwaysCommit = false
pp_pkg_handler.OTLPAlwaysCommit = false

case "unload_data_storage":
appender.UnloadDataStorage = true
pp_storage.UnloadDataStorage = true
_ = level.Info(logger).Log("msg", "[FEATURE] Data storage unloading is enabled.")
}
}
Expand Down
Loading
Loading