Skip to content

Commit 0e264fb

Browse files
committed
Move Rule evaluation to Source
This is a first version to move the rule evaluation from Icinga Notifications into the source, starting with Icinga DB. For an end user or another computer, the /process-event API endpoint was slightly modified. Two new HTTP request headers were introduced to tell which rules match and to share the state. Another new /event-rules API endpoint allows querying all rules.
1 parent 266f62d commit 0e264fb

File tree

5 files changed

+137
-52
lines changed

5 files changed

+137
-52
lines changed

internal/config/rule.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ func (r *RuntimeConfig) applyPendingRules() {
3939
}
4040

4141
// ObjectFilter{,Expr} are being initialized by config.IncrementalConfigurableInitAndValidatable.
42-
curElement.ObjectFilter = update.ObjectFilter
4342
curElement.ObjectFilterExpr = update.ObjectFilterExpr
4443

4544
return nil

internal/config/verify.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,6 @@ func (r *RuntimeConfig) debugVerifyRule(id int64, rule *rule.Rule) error {
248248
}
249249
}
250250

251-
if rule.ObjectFilterExpr.Valid && rule.ObjectFilter == nil {
252-
return fmt.Errorf("rule has a ObjectFilterExpr but ObjectFilter is nil")
253-
}
254-
255251
for escalationID, escalation := range rule.Escalations {
256252
if escalation == nil {
257253
return fmt.Errorf("rule.Escalations[%d] is nil", escalationID)

internal/incident/incident.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"sync"
8-
"time"
9-
107
"github.com/icinga/icinga-go-library/database"
118
"github.com/icinga/icinga-go-library/types"
129
"github.com/icinga/icinga-notifications/internal/config"
@@ -18,6 +15,10 @@ import (
1815
"github.com/icinga/icinga-notifications/internal/rule"
1916
"github.com/jmoiron/sqlx"
2017
"go.uber.org/zap"
18+
"strconv"
19+
"strings"
20+
"sync"
21+
"time"
2122
)
2223

2324
type ruleID = int64
@@ -178,7 +179,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
178179

179180
// Check if any (additional) rules match this object. Filters of rules that already have a state don't have
180181
// to be checked again, these rules already matched and stay effective for the ongoing incident.
181-
err = i.evaluateRules(ctx, tx, ev.ID)
182+
err = i.evaluateRules(ctx, tx, ev)
182183
if err != nil {
183184
return err
184185
}
@@ -398,26 +399,36 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event.
398399
// evaluateRules evaluates all the configured rules for this *incident.Object and
399400
// generates history entries for each matched rule.
400401
// Returns error on database failure.
401-
func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64) error {
402+
func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
402403
if i.Rules == nil {
403404
i.Rules = make(map[int64]struct{})
404405
}
405406

406-
for _, r := range i.runtimeConfig.Rules {
407-
if _, ok := i.Rules[r.ID]; !ok {
408-
matched, err := r.Eval(i.Object)
409-
if err != nil {
410-
i.logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err))
411-
}
407+
ruleIdsStr, ok := ev.ExtraTags["rules"]
408+
if !ok {
409+
return errors.New("event has no rules extra tag marker")
410+
}
411+
ruleIdsArr := strings.Split(ruleIdsStr, ",")
412412

413-
if err != nil || !matched {
414-
continue
415-
}
413+
ruleIds := make(map[ruleID]struct{})
414+
for _, idStr := range ruleIdsArr {
415+
idInt, err := strconv.ParseInt(idStr, 10, 64)
416+
if err != nil {
417+
return fmt.Errorf("cannot parse %q from rules as int64", idStr)
418+
}
419+
ruleIds[idInt] = struct{}{}
420+
}
416421

422+
for _, r := range i.runtimeConfig.Rules {
423+
if _, ok := ruleIds[r.ID]; !ok {
424+
continue
425+
}
426+
427+
if _, ok := i.Rules[r.ID]; !ok {
417428
i.Rules[r.ID] = struct{}{}
418429
i.logger.Infow("Rule matches", zap.Object("rule", r))
419430

420-
err = i.AddRuleMatched(ctx, tx, r)
431+
err := i.AddRuleMatched(ctx, tx, r)
421432
if err != nil {
422433
i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err))
423434
return err
@@ -426,7 +437,7 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64
426437
hr := &HistoryRow{
427438
IncidentID: i.Id,
428439
Time: types.UnixMilli(time.Now()),
429-
EventID: types.MakeInt(eventID, types.TransformZeroIntToNull),
440+
EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull),
430441
RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull),
431442
Type: RuleMatched,
432443
}

internal/listener/listener.go

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/icinga/icinga-notifications/internal/daemon"
1414
"github.com/icinga/icinga-notifications/internal/event"
1515
"github.com/icinga/icinga-notifications/internal/incident"
16+
"github.com/icinga/icinga-notifications/internal/rule"
1617
"go.uber.org/zap"
1718
"net/http"
1819
"time"
@@ -39,9 +40,11 @@ func NewListener(db *database.DB, runtimeConfig *config.RuntimeConfig, logs *log
3940
debugMux.HandleFunc("/dump-config", l.DumpConfig)
4041
debugMux.HandleFunc("/dump-incidents", l.DumpIncidents)
4142
debugMux.HandleFunc("/dump-schedules", l.DumpSchedules)
43+
debugMux.HandleFunc("/dump-rules", l.DumpRules)
4244

4345
l.mux.Handle("/debug/", http.StripPrefix("/debug", l.requireDebugAuth(debugMux)))
4446
l.mux.HandleFunc("/process-event", l.ProcessEvent)
47+
l.mux.HandleFunc("/event-rules", l.RulesForFilters)
4548
return l
4649
}
4750

@@ -82,7 +85,45 @@ func (l *Listener) Run(ctx context.Context) error {
8285
}
8386
}
8487

85-
func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
88+
// getRuleVersion returns the latest rule version.
89+
//
90+
// Technically, the rule version is an encoded string representation of the latest changed_at value from each rule.
91+
// Being an implementation detail, it might change over time. For the moment, a simple equality check is enough.
92+
func (l *Listener) getRuleVersion() string {
93+
l.runtimeConfig.RLock()
94+
defer l.runtimeConfig.RUnlock()
95+
96+
var latest time.Time
97+
for _, r := range l.runtimeConfig.Rules {
98+
if t := r.ChangedAt.Time(); t.After(latest) {
99+
latest = t
100+
}
101+
}
102+
103+
if latest.IsZero() {
104+
return "NA"
105+
}
106+
107+
return fmt.Sprintf("%x", latest.UnixNano())
108+
}
109+
110+
// sourceFromAuthOrAbort extracts a *config.Source from the HTTP Basic Auth. If the credentials are wrong, (nil, false) is
111+
// returned and 401 was written back to the response writer.
112+
func (l *Listener) sourceFromAuthOrAbort(w http.ResponseWriter, r *http.Request) (*config.Source, bool) {
113+
if authUser, authPass, authOk := r.BasicAuth(); authOk {
114+
src := l.runtimeConfig.GetSourceFromCredentials(authUser, authPass, l.logger)
115+
if src != nil {
116+
return src, true
117+
}
118+
}
119+
120+
w.Header().Set("WWW-Authenticate", `Basic realm="icinga-notifications"`)
121+
w.WriteHeader(http.StatusUnauthorized)
122+
_, _ = fmt.Fprintln(w, "please provide the debug-password as basic auth credentials (user is ignored)")
123+
return nil, false
124+
}
125+
126+
func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) {
86127
// abort the current connection by sending the status code and an error both to the log and back to the client.
87128
abort := func(statusCode int, ev *event.Event, format string, a ...any) {
88129
msg := format
@@ -99,24 +140,37 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
99140
logger.Debugw("Abort listener submitted event processing")
100141
}
101142

102-
if req.Method != http.MethodPost {
143+
if r.Method != http.MethodPost {
103144
abort(http.StatusMethodNotAllowed, nil, "POST required")
104145
return
105146
}
106147

107-
var source *config.Source
108-
if authUser, authPass, authOk := req.BasicAuth(); authOk {
109-
source = l.runtimeConfig.GetSourceFromCredentials(authUser, authPass, l.logger)
148+
source, validAuth := l.sourceFromAuthOrAbort(w, r)
149+
if !validAuth {
150+
return
110151
}
111-
if source == nil {
112-
w.Header().Set("WWW-Authenticate", `Basic realm="icinga-notifications"`)
113-
abort(http.StatusUnauthorized, nil, "HTTP authorization required")
152+
153+
ruleIdsStr := r.Header.Get("X-Rule-Ids")
154+
ruleVersion := r.Header.Get("X-Rule-Version")
155+
156+
if latestRuleVersion := l.getRuleVersion(); ruleVersion != latestRuleVersion {
157+
abort(http.StatusFailedDependency,
158+
nil,
159+
"X-Rule-Version %q does not match %q, refetch rules",
160+
ruleVersion,
161+
latestRuleVersion)
162+
return
163+
}
164+
165+
// TODO: parse and verify ruleIdsStr
166+
if ruleIdsStr == "" {
167+
// Case for empty X-Rule-Ids where the event was just sent to check if new rules are available (previous if).
168+
abort(http.StatusNoContent, nil, "dismissed event due to empty X-Rule-Ids")
114169
return
115170
}
116171

117172
var ev event.Event
118-
err := json.NewDecoder(req.Body).Decode(&ev)
119-
if err != nil {
173+
if err := json.NewDecoder(r.Body).Decode(&ev); err != nil {
120174
abort(http.StatusBadRequest, nil, "cannot parse JSON body: %v", err)
121175
return
122176
}
@@ -136,8 +190,11 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
136190
return
137191
}
138192

193+
ev.ExtraTags = make(map[string]string)
194+
ev.ExtraTags["rules"] = ruleIdsStr
195+
139196
l.logger.Infow("Processing event", zap.String("event", ev.String()))
140-
err = incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev)
197+
err := incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev)
141198
if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) {
142199
abort(http.StatusNotAcceptable, &ev, "%v", err)
143200
return
@@ -149,11 +206,20 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
149206

150207
l.logger.Infow("Successfully processed event", zap.String("event", ev.String()))
151208

152-
w.WriteHeader(http.StatusOK)
209+
w.WriteHeader(http.StatusAccepted)
153210
_, _ = fmt.Fprintln(w, "event processed successfully")
154211
_, _ = fmt.Fprintln(w)
155212
}
156213

214+
func (l *Listener) RulesForFilters(w http.ResponseWriter, r *http.Request) {
215+
_, validAuth := l.sourceFromAuthOrAbort(w, r)
216+
if !validAuth {
217+
return
218+
}
219+
220+
l.DumpRules(w, r)
221+
}
222+
157223
// requireDebugAuth is a middleware that checks if the valid debug password was provided. If there is no password
158224
// configured or the supplied password is incorrect, it sends an error code and does not redirect the request.
159225
func (l *Listener) requireDebugAuth(next http.Handler) http.Handler {
@@ -256,3 +322,28 @@ func (l *Listener) DumpSchedules(w http.ResponseWriter, r *http.Request) {
256322
_, _ = fmt.Fprintln(w)
257323
}
258324
}
325+
326+
// DumpRules is used as /debug prefixed endpoint to dump all rules. The authorization has to be done beforehand.
327+
func (l *Listener) DumpRules(w http.ResponseWriter, r *http.Request) {
328+
if r.Method != http.MethodGet {
329+
w.WriteHeader(http.StatusMethodNotAllowed)
330+
_, _ = fmt.Fprintln(w, "GET required")
331+
return
332+
}
333+
334+
type Response struct {
335+
Version string
336+
Rules map[int64]*rule.Rule
337+
}
338+
339+
var resp Response
340+
resp.Version = l.getRuleVersion()
341+
342+
l.runtimeConfig.RLock()
343+
resp.Rules = l.runtimeConfig.Rules
344+
l.runtimeConfig.RUnlock()
345+
346+
enc := json.NewEncoder(w)
347+
enc.SetIndent("", " ")
348+
_ = enc.Encode(resp)
349+
}

internal/rule/rule.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rule
33
import (
44
"github.com/icinga/icinga-go-library/types"
55
"github.com/icinga/icinga-notifications/internal/config/baseconf"
6-
"github.com/icinga/icinga-notifications/internal/filter"
76
"github.com/icinga/icinga-notifications/internal/recipient"
87
"github.com/icinga/icinga-notifications/internal/timeperiod"
98
"go.uber.org/zap/zapcore"
@@ -16,21 +15,20 @@ type Rule struct {
1615
Name string `db:"name"`
1716
TimePeriod *timeperiod.TimePeriod `db:"-"`
1817
TimePeriodID types.Int `db:"timeperiod_id"`
19-
ObjectFilter filter.Filter `db:"-"`
2018
ObjectFilterExpr types.String `db:"object_filter"`
2119
Escalations map[int64]*Escalation `db:"-"`
2220
}
2321

2422
// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface.
2523
func (r *Rule) IncrementalInitAndValidate() error {
26-
if r.ObjectFilterExpr.Valid {
27-
f, err := filter.Parse(r.ObjectFilterExpr.String)
28-
if err != nil {
29-
return err
30-
}
24+
// if r.ObjectFilterExpr.Valid {
25+
// f, err := filter.Parse(r.ObjectFilterExpr.String)
26+
// if err != nil {
27+
// return err
28+
// }
3129

32-
r.ObjectFilter = f
33-
}
30+
// r.ObjectFilter = f
31+
// }
3432

3533
return nil
3634
}
@@ -50,16 +48,6 @@ func (r *Rule) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
5048
return nil
5149
}
5250

53-
// Eval evaluates the configured object filter for the provided filterable.
54-
// Returns always true if the current rule doesn't have a configured object filter.
55-
func (r *Rule) Eval(filterable filter.Filterable) (bool, error) {
56-
if r.ObjectFilter == nil {
57-
return true, nil
58-
}
59-
60-
return r.ObjectFilter.Eval(filterable)
61-
}
62-
6351
// ContactChannels stores a set of channel IDs for each set of individual contacts.
6452
type ContactChannels map[*recipient.Contact]map[int64]bool
6553

0 commit comments

Comments
 (0)