Skip to content

Commit 3776d31

Browse files
committed
add ttl for retention policy
1 parent 2e9286b commit 3776d31

File tree

11 files changed

+617
-29
lines changed

11 files changed

+617
-29
lines changed

internal/logscommon/const.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,6 @@ const (
3434
LogType = "log_type"
3535

3636
LogBackpressureModeKey = "backpressure_mode"
37+
38+
RetentionPolicyTTLFileName = "Amazon_CloudWatch_RetentionPolicyTTL"
3739
)

plugins/inputs/logfile/logfile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ func (t *LogFile) cleanupStateFolder() {
373373
continue
374374
}
375375

376-
if strings.Contains(file, logscommon.WindowsEventLogPrefix) {
376+
if strings.Contains(file, logscommon.WindowsEventLogPrefix) || strings.Contains(file, logscommon.RetentionPolicyTTLFileName) {
377377
continue
378378
}
379379

plugins/outputs/cloudwatchlogs/cloudwatchlogs.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ type CloudWatchLogs struct {
6363
Token string `toml:"token"`
6464

6565
//log group and stream names
66-
LogStreamName string `toml:"log_stream_name"`
67-
LogGroupName string `toml:"log_group_name"`
66+
LogStreamName string `toml:"log_stream_name"`
67+
LogGroupName string `toml:"log_group_name"`
68+
FileStateFolder string `toml:"file_state_folder"`
6869

6970
// Retention for log group
7071
RetentionInDays int `toml:"retention_in_days"`
@@ -97,6 +98,9 @@ func (c *CloudWatchLogs) Close() error {
9798
if c.workerPool != nil {
9899
c.workerPool.Stop()
99100
}
101+
if c.targetManager != nil {
102+
c.targetManager.Stop()
103+
}
100104

101105
return nil
102106
}
@@ -144,7 +148,7 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
144148
if c.Concurrency > 0 {
145149
c.workerPool = pusher.NewWorkerPool(c.Concurrency)
146150
}
147-
c.targetManager = pusher.NewTargetManager(c.Log, client)
151+
c.targetManager = pusher.NewTargetManager(c.Log, client, c.FileStateFolder)
148152
})
149153
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
150154
cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}

plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package pusher
66
import (
77
"errors"
88
"fmt"
9+
"os"
10+
"path/filepath"
911
"strings"
1012
"sync"
1113
"sync/atomic"
@@ -17,6 +19,7 @@ import (
1719
"github.com/influxdata/telegraf"
1820
"github.com/stretchr/testify/require"
1921

22+
"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
2023
"github.com/aws/amazon-cloudwatch-agent/logs"
2124
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
2225
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
@@ -674,7 +677,9 @@ func testPreparationWithLogger(
674677
) (chan struct{}, *queue) {
675678
t.Helper()
676679
stop := make(chan struct{})
677-
tm := NewTargetManager(logger, service)
680+
tempDir := t.TempDir()
681+
os.Create(filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName))
682+
tm := NewTargetManager(logger, service, tempDir)
678683
s := newSender(logger, service, tm, retryDuration, stop)
679684
q := newQueue(
680685
logger,
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package pusher
5+
6+
import (
7+
"bufio"
8+
"bytes"
9+
"os"
10+
"path/filepath"
11+
"strconv"
12+
"strings"
13+
"sync"
14+
"time"
15+
16+
"github.com/influxdata/telegraf"
17+
18+
"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
19+
)
20+
21+
const (
22+
ttlTime = 5 * time.Minute
23+
)
24+
25+
type payload struct {
26+
group string
27+
timestamp time.Time
28+
}
29+
30+
type retentionPolicyTTL struct {
31+
logger telegraf.Logger
32+
stateFilePath string
33+
// oldTimestamps come from the TTL file on agent start. Key is escaped group name
34+
oldTimestamps map[string]time.Time
35+
// newTimestamps are the new TTLs that will be saved periodically and when the agent is done. Key is escaped group name
36+
newTimestamps map[string]time.Time
37+
mu sync.RWMutex
38+
ch chan payload
39+
done chan struct{}
40+
}
41+
42+
func NewRetentionPolicyTTL(logger telegraf.Logger, fileStatePath string) *retentionPolicyTTL {
43+
r := &retentionPolicyTTL{
44+
logger: logger,
45+
stateFilePath: filepath.Join(fileStatePath, logscommon.RetentionPolicyTTLFileName),
46+
oldTimestamps: make(map[string]time.Time),
47+
newTimestamps: make(map[string]time.Time),
48+
ch: make(chan payload, retentionChannelSize),
49+
done: make(chan struct{}),
50+
}
51+
52+
r.loadTTLState()
53+
go r.process()
54+
return r
55+
}
56+
57+
// Update will update the newTimestamps to the current time that will later be persisted to disk.
58+
func (r *retentionPolicyTTL) Update(group string) {
59+
r.ch <- payload{
60+
group: group,
61+
timestamp: time.Now(),
62+
}
63+
}
64+
65+
func (r *retentionPolicyTTL) Done() {
66+
close(r.done)
67+
}
68+
69+
// IsExpired checks from the timestamps in the read state file at the agent start.
70+
func (r *retentionPolicyTTL) IsExpired(group string) bool {
71+
if ts, ok := r.oldTimestamps[escapeLogGroup(group)]; ok {
72+
return ts.Add(ttlTime).Before(time.Now())
73+
}
74+
// Log group was not in state file -- default to expired
75+
return true
76+
}
77+
78+
// UpdateFromFile updates the newTimestamps cache using the timestamp from the loaded state file.
79+
func (r *retentionPolicyTTL) UpdateFromFile(group string) {
80+
if oldTs, ok := r.oldTimestamps[escapeLogGroup(group)]; ok {
81+
r.ch <- payload{
82+
group: group,
83+
timestamp: oldTs,
84+
}
85+
}
86+
}
87+
88+
func (r *retentionPolicyTTL) loadTTLState() {
89+
if _, err := os.Stat(r.stateFilePath); err != nil {
90+
r.logger.Debug("retention policy ttl state file does not exist")
91+
return
92+
}
93+
94+
file, err := os.Open(r.stateFilePath)
95+
if err != nil {
96+
r.logger.Errorf("unable to open retention policy ttl state file: %v", err)
97+
return
98+
}
99+
defer file.Close()
100+
101+
scanner := bufio.NewScanner(file)
102+
for scanner.Scan() {
103+
line := scanner.Text()
104+
if len(line) == 0 {
105+
continue
106+
}
107+
split := strings.Split(line, ":")
108+
if len(split) < 2 {
109+
r.logger.Errorf("invalid format in retention policy ttl state file: %s", line)
110+
continue
111+
}
112+
113+
group := split[0]
114+
timestamp, err := strconv.ParseInt(split[1], 10, 64)
115+
if err != nil {
116+
r.logger.Errorf("unable to parse timestamp in retention policy ttl for group %s: %v", group, err)
117+
continue
118+
}
119+
r.oldTimestamps[group] = time.UnixMilli(timestamp)
120+
}
121+
122+
if err := scanner.Err(); err != nil {
123+
r.logger.Errorf("error when parsing retention policy ttl state file: %v", err)
124+
return
125+
}
126+
}
127+
128+
func (r *retentionPolicyTTL) process() {
129+
t := time.NewTicker(time.Minute)
130+
defer t.Stop()
131+
132+
for {
133+
select {
134+
case payload := <-r.ch:
135+
r.updateTimestamp(payload.group, payload.timestamp)
136+
case <-t.C:
137+
r.saveTTLState()
138+
case <-r.done:
139+
r.saveTTLState()
140+
return
141+
}
142+
}
143+
}
144+
145+
func (r *retentionPolicyTTL) updateTimestamp(group string, timestamp time.Time) {
146+
r.mu.Lock()
147+
defer r.mu.Unlock()
148+
r.newTimestamps[escapeLogGroup(group)] = timestamp
149+
}
150+
151+
func (r *retentionPolicyTTL) saveTTLState() {
152+
r.mu.RLock()
153+
defer r.mu.RUnlock()
154+
155+
var buf bytes.Buffer
156+
for group, timestamp := range r.newTimestamps {
157+
buf.Write([]byte(group + ":" + strconv.FormatInt(timestamp.UnixMilli(), 10) + "\n"))
158+
}
159+
160+
err := os.WriteFile(r.stateFilePath, buf.Bytes(), 0644)
161+
if err != nil {
162+
r.logger.Errorf("unable to write retention policy ttl state file: %v", err)
163+
}
164+
}
165+
166+
func escapeLogGroup(group string) (escapedLogGroup string) {
167+
escapedLogGroup = filepath.ToSlash(group)
168+
escapedLogGroup = strings.Replace(escapedLogGroup, "/", "_", -1)
169+
escapedLogGroup = strings.Replace(escapedLogGroup, " ", "_", -1)
170+
escapedLogGroup = strings.Replace(escapedLogGroup, ":", "_", -1)
171+
return
172+
}

0 commit comments

Comments
 (0)