Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions pp/go/cppbridge/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,15 @@ type Sample struct {

// HeadDataStorage is Go wrapper around series_data::Data_storage.
type HeadDataStorage struct {
dataStorage uintptr
gcDestroyDetector *uint64
timeInterval atomic.Pointer[TimeInterval]
dataStorage uintptr
timeInterval atomic.Pointer[TimeInterval]
}

// NewHeadDataStorage - constructor.
func NewHeadDataStorage() *HeadDataStorage {
ds := &HeadDataStorage{
dataStorage: seriesDataDataStorageCtor(),
gcDestroyDetector: &gcDestroyDetector,
timeInterval: atomic.Pointer[TimeInterval]{},
dataStorage: seriesDataDataStorageCtor(),
timeInterval: atomic.Pointer[TimeInterval]{},
}
ds.timeInterval.Store(newInvalidTimeIntervalPtr())

Expand Down
89 changes: 89 additions & 0 deletions rules/concurrency_executer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package rules

import (
"sync"
)

// ConcurrencyExecuter executes eval rules in parallel in pre-launched goroutines.
type ConcurrencyExecuter interface {
// Execute eval rules in parallel in pre-launched goroutines via queue.
Execute(fn func())

// Run worker goroutines.
Run()

// Stop send signal for stop launched goroutines and waits until all goroutines stop.
Stop()
}

// ConcurrentRuleEvalExecuter executes eval rules in parallel in pre-launched goroutines,
// if there are no free goroutines, then it is executed on the calling goroutine.
type ConcurrentRuleEvalExecuter struct {
queue chan func()
stop chan struct{}
wg sync.WaitGroup
maxConcurrency int
}

// NewConcurrentRuleEvalExecuter init new [ConcurrentRuleEvalExecuter].
func NewConcurrentRuleEvalExecuter(maxConcurrency int) *ConcurrentRuleEvalExecuter {
return &ConcurrentRuleEvalExecuter{
queue: make(chan func()),
stop: make(chan struct{}),
wg: sync.WaitGroup{},
maxConcurrency: maxConcurrency,
}
}

// Execute eval rules in parallel in pre-launched goroutines via queue.
func (e *ConcurrentRuleEvalExecuter) Execute(fn func()) {
select {
case e.queue <- fn:
default:
fn()
}
}

// Run worker goroutines.
func (e *ConcurrentRuleEvalExecuter) Run() {
if e.isStopped() {
return
}

e.wg.Add(e.maxConcurrency)
for range e.maxConcurrency {
go e.workerLoop()
}
}

// Stop send signal for stop launched goroutines and waits until all goroutines stop.
func (e *ConcurrentRuleEvalExecuter) Stop() {
close(e.stop)
e.wg.Wait()
}

// isStopped check goroutines is stopped.
func (e *ConcurrentRuleEvalExecuter) isStopped() bool {
select {
case <-e.stop:
return true

default:
return false
}
}

// workerLoop main workers goroutines.
func (e *ConcurrentRuleEvalExecuter) workerLoop() {
defer e.wg.Done()

for {
select {
case <-e.stop:
return

case fn := <-e.queue:
fn()
}
}
}
70 changes: 35 additions & 35 deletions rules/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ type Group struct {
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc

// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
// // concurrencyController controls the rules evaluation concurrency.
// concurrencyController RuleConcurrencyController

// concurrencyExecuter controls the rules evaluation concurrency.
concurrencyExecuter ConcurrencyExecuter
}

// GroupEvalIterationFunc is used to implement and extend rule group
Expand Down Expand Up @@ -120,29 +123,30 @@ func NewGroup(o GroupOptions) *Group {
evalIterationFunc = DefaultEvalIterationFunc
}

concurrencyController := o.Opts.RuleConcurrencyController
if concurrencyController == nil {
concurrencyController = sequentialRuleEvalController{}
}
// concurrencyController := o.Opts.RuleConcurrencyController
// if concurrencyController == nil {
// concurrencyController = sequentialRuleEvalController{}
// }

return &Group{
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
seriesInCurrentEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
concurrencyController: concurrencyController,
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
seriesInCurrentEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
// concurrencyController: concurrencyController,
concurrencyExecuter: o.Opts.ConcurrencyExecuter,
}
}

Expand Down Expand Up @@ -495,7 +499,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
bs = g.opts.Batcher.BatchStorage()
)

if g.concurrencyController.IsConcurrent() {
if g.opts.ConcurrencyExecuter != nil {
samplesTotal = g.concurrencyEval(ctx, ts, bs)
} else {
samplesTotal = g.sequentiallyEval(ctx, ts, g.rules, bs)
Expand Down Expand Up @@ -1106,29 +1110,25 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, bs storage.Ba
default:
}

if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
wg.Add(1)

go concurrencyEval(i, rule, func() {
wg.Done()
ctrl.Done(ctx)
})
sequentiallyRules[i] = nil // placeholder for the series
} else {
if !rule.NoDependencyRules() {
sequentiallyRules[i] = rule
continue
}

wg.Add(1)
g.concurrencyExecuter.Execute(func() { concurrencyEval(i, g.rules[i], func() { wg.Done() }) })
sequentiallyRules[i] = nil // placeholder for the series
}

wg.Wait()

if err := concurrencyApp.Commit(); err != nil {
groupKey := GroupKey(g.File(), g.Name())
for i := range g.rules {
if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, g.rules[i]) {
if g.rules[i].NoDependencyRules() {
g.rules[i].SetHealth(HealthBad)
g.rules[i].SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(groupKey).Inc()
ctrl.Done(ctx)
}
}

Expand Down
Loading
Loading