Skip to content

Commit e6fcb19

Browse files
committed
use Batch helper for alerts / 2
1 parent b055bcd commit e6fcb19

File tree

1 file changed

+20
-23
lines changed

1 file changed

+20
-23
lines changed

pkg/database/alerts.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
log "github.com/sirupsen/logrus"
1515

1616
"github.com/crowdsecurity/go-cs-lib/cstime"
17-
"github.com/crowdsecurity/go-cs-lib/slicetools"
1817

1918
"github.com/crowdsecurity/crowdsec/pkg/csnet"
2019
"github.com/crowdsecurity/crowdsec/pkg/database/ent"
@@ -373,7 +372,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
373372
return alertRef.ID, inserted, deleted, nil
374373
}
375374

376-
func (c *Client) createDecisionChunk(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) {
375+
func (c *Client) createDecisionBatch(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) {
377376
decisionCreate := []*ent.DecisionCreate{}
378377

379378
for _, decisionItem := range decisions {
@@ -550,15 +549,15 @@ func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.C
550549

551550
func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) {
552551
decisions := []*ent.Decision{}
553-
554-
decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize)
555-
for _, decisionChunk := range decisionChunks {
556-
decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
552+
if err := Batch(ctx, alertItem.Decisions, client.decisionBulkSize, func(ctx context.Context, part []*models.Decision) error {
553+
ret, err := client.createDecisionBatch(ctx, *alertItem.Simulated, stopAtTime, part)
557554
if err != nil {
558-
return nil, 0, fmt.Errorf("creating alert decisions: %w", err)
555+
return fmt.Errorf("creating alert decisions: %w", err)
559556
}
560-
561-
decisions = append(decisions, decisionRet...)
557+
decisions = append(decisions, ret...)
558+
return nil
559+
}); err != nil {
560+
return nil, 0, err
562561
}
563562

564563
discarded := len(alertItem.Decisions) - len(decisions)
@@ -626,15 +625,13 @@ func saveAlerts(ctx context.Context, c *Client, batch []alertCreatePlan) ([]stri
626625
continue
627626
}
628627

629-
decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)
630-
631-
for _, d2 := range decisionsChunk {
632-
if err := retryOnBusy(func() error {
628+
if err := Batch(ctx, d, c.decisionBulkSize, func(ctx context.Context, d2 []*ent.Decision) error {
629+
return retryOnBusy(func() error {
633630
_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
634631
return err
635-
}); err != nil {
636-
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
637-
}
632+
})
633+
}); err != nil {
634+
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
638635
}
639636
}
640637

@@ -646,7 +643,7 @@ type alertCreatePlan struct {
646643
decisions []*ent.Decision
647644
}
648645

649-
func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
646+
func (c *Client) createAlertBatch(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
650647
batch := make([]alertCreatePlan, 0, len(alerts))
651648

652649
for _, alertItem := range alerts {
@@ -746,16 +743,16 @@ func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []
746743

747744
c.Log.Debugf("writing %d items", len(alertList))
748745

749-
alertChunks := slicetools.Chunks(alertList, alertCreateBulkSize)
750746
alertIDs := []string{}
751-
752-
for _, alertChunk := range alertChunks {
753-
ids, err := c.createAlertChunk(ctx, machineID, owner, alertChunk)
747+
if err := Batch(ctx, alertList, alertCreateBulkSize, func(ctx context.Context, part []*models.Alert) error {
748+
ids, err := c.createAlertBatch(ctx, machineID, owner, part)
754749
if err != nil {
755-
return nil, fmt.Errorf("machine '%s': %w", machineID, err)
750+
return fmt.Errorf("machine %q: %w", machineID, err)
756751
}
757-
758752
alertIDs = append(alertIDs, ids...)
753+
return nil
754+
}); err != nil {
755+
return nil, err
759756
}
760757

761758
if owner != nil {

0 commit comments

Comments
 (0)