|
9 | 9 | "entgo.io/ent/dialect/sql" |
10 | 10 | "github.com/pkg/errors" |
11 | 11 |
|
12 | | - "github.com/crowdsecurity/go-cs-lib/slicetools" |
13 | | - |
14 | 12 | "github.com/crowdsecurity/crowdsec/pkg/csnet" |
15 | 13 | "github.com/crowdsecurity/crowdsec/pkg/database/ent" |
16 | 14 | "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision" |
@@ -298,68 +296,79 @@ func decisionIDs(decisions []*ent.Decision) []int { |
298 | 296 | return ids |
299 | 297 | } |
300 | 298 |
|
301 | | -// ExpireDecisions sets the expiration of a list of decisions to now() |
302 | | -// It returns the number of impacted decisions for the CAPI/PAPI |
303 | | -func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { |
304 | | - if len(decisions) <= decisionDeleteBulkSize { |
305 | | - ids := decisionIDs(decisions) |
| 299 | +// expireDecisionBatch expires the decisions as a single operation. |
| 300 | +func (c *Client) expireDecisionBatch(ctx context.Context, batch []*ent.Decision, now time.Time) (int, error) { |
| 301 | + ids := decisionIDs(batch) |
306 | 302 |
|
307 | | - rows, err := c.Ent.Decision.Update().Where( |
308 | | - decision.IDIn(ids...), |
309 | | - ).SetUntil(time.Now().UTC()).Save(ctx) |
310 | | - if err != nil { |
311 | | - return 0, fmt.Errorf("expire decisions with provided filter: %w", err) |
312 | | - } |
| 303 | + rows, err := c.Ent.Decision. |
| 304 | + Update(). |
| 305 | + Where(decision.IDIn(ids...)). |
| 306 | + SetUntil(now). |
| 307 | + Save(ctx) |
| 308 | + if err != nil { |
| 309 | + return 0, fmt.Errorf("expire decisions with provided filter: %w", err) |
| 310 | + } |
313 | 311 |
|
314 | | - return rows, nil |
| 312 | + return rows, nil |
| 313 | +} |
| 314 | + |
| 315 | +// ExpireDecisions sets the expiration of a list of decisions to now(), |
| 316 | +// in multiple operations if len(decisions) > decisionDeleteBulkSize. |
| 317 | +// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error. |
| 318 | +func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { |
| 319 | + if len(decisions) == 0 { |
| 320 | + return 0, nil |
315 | 321 | } |
316 | 322 |
|
317 | | - // big batch, let's split it and recurse |
| 323 | + now := time.Now().UTC() |
318 | 324 |
|
319 | 325 | total := 0 |
320 | | - |
321 | | - for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) { |
322 | | - rows, err := c.ExpireDecisions(ctx, chunk) |
| 326 | + err := Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error { |
| 327 | + rows, err := c.expireDecisionBatch(ctx, batch, now) |
323 | 328 | if err != nil { |
324 | | - return total, err |
| 329 | + return err |
325 | 330 | } |
326 | | - |
327 | 331 | total += rows |
328 | | - } |
| 332 | + return nil |
| 333 | + }) |
329 | 334 |
|
330 | | - return total, nil |
| 335 | + return total, err |
331 | 336 | } |
332 | 337 |
|
333 | | -// DeleteDecisions removes a list of decisions from the database |
334 | | -// It returns the number of impacted decisions for the CAPI/PAPI |
335 | | -func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { |
336 | | - if len(decisions) < decisionDeleteBulkSize { |
337 | | - ids := decisionIDs(decisions) |
338 | | - |
339 | | - rows, err := c.Ent.Decision.Delete().Where( |
340 | | - decision.IDIn(ids...), |
341 | | - ).Exec(ctx) |
342 | | - if err != nil { |
343 | | - return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err) |
344 | | - } |
| 338 | +// deleteDecisionBatch removes the decisions as a single operation. |
| 339 | +func (c *Client) deleteDecisionBatch(ctx context.Context, batch []*ent.Decision) (int, error) { |
| 340 | + ids := decisionIDs(batch) |
345 | 341 |
|
346 | | - return rows, nil |
| 342 | + rows, err := c.Ent.Decision. |
| 343 | + Delete(). |
| 344 | + Where(decision.IDIn(ids...)). |
| 345 | + Exec(ctx) |
| 346 | + if err != nil { |
| 347 | + return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err) |
347 | 348 | } |
348 | 349 |
|
349 | | - // big batch, let's split it and recurse |
| 350 | + return rows, nil |
| 351 | +} |
350 | 352 |
|
351 | | - tot := 0 |
| 353 | +// DeleteDecisions removes a list of decisions from the database, |
| 354 | +// in multiple operations if len(decisions) > decisionDeleteBulkSize. |
| 355 | +// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error. |
| 356 | +func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { |
| 357 | + if len(decisions) == 0 { |
| 358 | + return 0, nil |
| 359 | + } |
352 | 360 |
|
353 | | - for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) { |
354 | | - rows, err := c.DeleteDecisions(ctx, chunk) |
| 361 | + total := 0 |
| 362 | + err := Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error { |
| 363 | + rows, err := c.deleteDecisionBatch(ctx, batch) |
355 | 364 | if err != nil { |
356 | | - return tot, err |
| 365 | + return err |
357 | 366 | } |
| 367 | + total += rows |
| 368 | + return nil |
| 369 | + }) |
358 | 370 |
|
359 | | - tot += rows |
360 | | - } |
361 | | - |
362 | | - return tot, nil |
| 371 | + return total, err |
363 | 372 | } |
364 | 373 |
|
365 | 374 | // ExpireDecisionByID set the expiration of a decision to now() |
|
0 commit comments