Skip to content

Commit fe64d14

Browse files
authored
Merge branch 'main' into musa-asad/feature/workload-discovery
2 parents fa76598 + 06ff831 commit fe64d14

File tree

4 files changed

+166
-7
lines changed

4 files changed

+166
-7
lines changed

.github/workflows/build-test-artifacts.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ jobs:
8282

8383
BuildDistributor:
8484
needs: [BuildAndUpload, BuildAndUploadPackages, BuildDocker]
85+
if: ${{ github.event_name == 'push' || inputs.build-distributor }}
8586
uses: ./.github/workflows/test-build-distributor.yml
8687
secrets: inherit
8788
permissions:

.github/workflows/test-build-distributor.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ env:
33
CWA_GITHUB_TEST_REPO_NAME: "aws/amazon-cloudwatch-agent-test"
44
CWA_GITHUB_TEST_REPO_BRANCH: "main"
55
CHECKOUT_ROOT_DIR: "test"
6+
SSM_DISTRIBUTOR_VERSION_LIMIT: 25
67
on:
78
workflow_dispatch:
89
inputs:
@@ -74,22 +75,39 @@ jobs:
7475
contents: read
7576
outputs:
7677
agent-version: ${{ steps.version.outputs.version }}
78+
should-build: ${{ steps.check.outputs.should-build }}
7779
steps:
7880
- name: Configure AWS Credentials
7981
uses: aws-actions/configure-aws-credentials@v4
8082
with:
8183
role-to-assume: ${{ inputs.TerraformAWSAssumeRole }}
8284
aws-region: ${{ inputs.Region }}
85+
8386
- name: Agent Version
8487
id: version
8588
run: |
8689
aws s3 cp s3://${{ inputs.Bucket }}/${{ inputs.BucketKey }}/CWAGENT_VERSION .
8790
echo "version=$(cat CWAGENT_VERSION)" >> "$GITHUB_OUTPUT"
8891
92+
- name: Check if SSM distributor version already exists
93+
id: check
94+
run: |
95+
if aws ssm describe-document \
96+
--name "${{ inputs.DistributorName }}" \
97+
--region ${{ inputs.Region }} \
98+
--query 'Document.VersionName' \
99+
--output text 2>/dev/null | grep -q "^${{ steps.version.outputs.version }}$"; then
100+
echo "should-build=false" >> "$GITHUB_OUTPUT"
101+
echo "Version ${{ steps.version.outputs.version }} already exists, skipping build"
102+
else
103+
echo "should-build=true" >> "$GITHUB_OUTPUT"
104+
fi
105+
89106
PackageLinux:
90107
name: 'PackageLinux'
91108
runs-on: ubuntu-latest
92109
needs: [AgentVersion]
110+
if: needs.AgentVersion.outputs.should-build == 'true'
93111
permissions:
94112
id-token: write
95113
contents: read
@@ -149,6 +167,7 @@ jobs:
149167
name: 'PackageDarwin'
150168
runs-on: ubuntu-latest
151169
needs: [AgentVersion]
170+
if: needs.AgentVersion.outputs.should-build == 'true'
152171
permissions:
153172
id-token: write
154173
contents: read
@@ -206,6 +225,7 @@ jobs:
206225
name: 'PackageWindows'
207226
runs-on: ubuntu-latest
208227
needs: [AgentVersion]
228+
if: needs.AgentVersion.outputs.should-build == 'true'
209229
permissions:
210230
id-token: write
211231
contents: read
@@ -260,6 +280,7 @@ jobs:
260280
name: 'UploadDistributor'
261281
runs-on: ubuntu-latest
262282
needs: [AgentVersion, PackageLinux, PackageDarwin, PackageWindows]
283+
if: needs.AgentVersion.outputs.should-build == 'true'
263284
permissions:
264285
id-token: write
265286
contents: read
@@ -321,6 +342,16 @@ jobs:
321342
--document-type Package \
322343
--region ${{ inputs.Region }}
323344
345+
- name: Remove oldest if limit hit
346+
if: steps.check-distributor.outputs.exists == 'true'
347+
run: |
348+
VERSION_COUNT=$(aws ssm list-document-versions --name "${{ inputs.DistributorName }}" --region ${{ inputs.Region }} --no-paginate --query 'length(DocumentVersions)' --output text)
349+
if [ "$VERSION_COUNT" -ge ${{ env.SSM_DISTRIBUTOR_VERSION_LIMIT }} ]; then
350+
OLDEST_VERSION=$(aws ssm list-document-versions --name "${{ inputs.DistributorName }}" --region ${{ inputs.Region }} --no-paginate --query 'DocumentVersions[-1].DocumentVersion' --output text)
351+
echo "Deleting oldest version: $OLDEST_VERSION"
352+
aws ssm delete-document --name "${{ inputs.DistributorName }}" --document-version "$OLDEST_VERSION" --region ${{ inputs.Region }}
353+
fi
354+
324355
- name: Update distributor
325356
if: steps.check-distributor.outputs.exists == 'true'
326357
run: |

plugins/outputs/cloudwatchlogs/internal/pusher/target.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
baseRetryDelay = 1 * time.Second
2525
maxRetryDelayTarget = 10 * time.Second
2626
numBackoffRetries = 5
27+
28+
errMessageLogGroupIdentifierNotSupported = "Input filter on Log group identifiers is not supported."
2729
)
2830

2931
type Target struct {
@@ -185,21 +187,31 @@ func (m *targetManager) processDescribeLogGroup() {
185187
case target := <-m.dlg:
186188
batch[target.Group] = target
187189
if len(batch) == logGroupIdentifierLimit {
188-
m.updateTargetBatch(batch)
190+
m.updateTargets(batch)
189191
// Reset batch
190192
batch = make(map[string]Target, logGroupIdentifierLimit)
191193
}
192194
case <-t.C:
193195
if len(batch) > 0 {
194-
m.updateTargetBatch(batch)
196+
m.updateTargets(batch)
195197
// Reset batch
196198
batch = make(map[string]Target, logGroupIdentifierLimit)
197199
}
198200
}
199201
}
200202
}
201203

202-
func (m *targetManager) updateTargetBatch(targets map[string]Target) {
204+
func (m *targetManager) updateTargets(targets map[string]Target) {
205+
err := m.updateTargetsBatch(targets)
206+
if err != nil {
207+
m.logger.Debug("falling back to describing log groups by prefix")
208+
m.updateTargetsIteratively(targets)
209+
}
210+
}
211+
212+
// updateTargetsBatch will call DLG for the entire batch (single call). Will return an error if
213+
// DLG by identifiers is not supported.
214+
func (m *targetManager) updateTargetsBatch(targets map[string]Target) error {
203215
identifiers := make([]*string, 0, len(targets))
204216
for logGroup := range targets {
205217
identifiers = append(identifiers, aws.String(logGroup))
@@ -211,7 +223,11 @@ func (m *targetManager) updateTargetBatch(targets map[string]Target) {
211223
for attempt := 0; attempt < numBackoffRetries; attempt++ {
212224
output, err := m.service.DescribeLogGroups(describeLogGroupsInput)
213225
if err != nil {
214-
m.logger.Errorf("failed to describe log group retention for targets %v: %v", targets, err)
226+
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == cloudwatchlogs.ErrCodeInvalidParameterException && aerr.Message() == errMessageLogGroupIdentifierNotSupported {
227+
return err
228+
}
229+
230+
m.logger.Errorf("failed to batch describe log group retention for targets %v: %v", targets, err)
215231
time.Sleep(m.calculateBackoff(attempt))
216232
continue
217233
}
@@ -225,6 +241,49 @@ func (m *targetManager) updateTargetBatch(targets map[string]Target) {
225241
}
226242
break
227243
}
244+
return nil
245+
}
246+
247+
// updateTargetsIteratively will iterate through the targets and call DLG for each target.
248+
func (m *targetManager) updateTargetsIteratively(targets map[string]Target) {
249+
for _, target := range targets {
250+
for attempt := 0; attempt < numBackoffRetries; attempt++ {
251+
currentRetention, err := m.getRetention(target)
252+
if err != nil {
253+
m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err)
254+
time.Sleep(m.calculateBackoff(attempt))
255+
continue
256+
}
257+
258+
if currentRetention != target.Retention && target.Retention > 0 {
259+
m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
260+
m.prp <- target
261+
}
262+
break // no change in retention
263+
}
264+
}
265+
}
266+
267+
func (m *targetManager) getRetention(target Target) (int, error) {
268+
input := &cloudwatchlogs.DescribeLogGroupsInput{
269+
LogGroupNamePrefix: aws.String(target.Group),
270+
}
271+
272+
output, err := m.service.DescribeLogGroups(input)
273+
if err != nil {
274+
return 0, fmt.Errorf("describe log groups failed: %w", err)
275+
}
276+
277+
for _, group := range output.LogGroups {
278+
if *group.LogGroupName == target.Group {
279+
if group.RetentionInDays == nil {
280+
return 0, nil
281+
}
282+
return int(*group.RetentionInDays), nil
283+
}
284+
}
285+
286+
return 0, fmt.Errorf("log group %v not found", target.Group)
228287
}
229288

230289
func (m *targetManager) processPutRetentionPolicy() {

plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,14 +460,57 @@ func TestDescribeLogGroupsBatching(t *testing.T) {
460460
batch := make(map[string]Target)
461461
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
462462
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
463-
tm.updateTargetBatch(batch)
463+
tm.updateTargets(batch)
464464

465465
// Wait for ticker to fire (slightly longer than 5 seconds)
466466
time.Sleep(5100 * time.Millisecond)
467467

468468
mockService.AssertNotCalled(t, "PutRetentionPolicy")
469469
})
470470

471+
t.Run("FallbackToPrefix", func(t *testing.T) {
472+
mockService := new(mockLogsService)
473+
474+
// First call with identifiers fails with unsupported error
475+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
476+
return len(input.LogGroupIdentifiers) > 0
477+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{},
478+
awserr.New(cloudwatchlogs.ErrCodeInvalidParameterException, errMessageLogGroupIdentifierNotSupported, nil)).Once()
479+
480+
// Fallback calls with prefix for each group
481+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
482+
return input.LogGroupNamePrefix != nil && *input.LogGroupNamePrefix == "group-1"
483+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
484+
LogGroups: []*cloudwatchlogs.LogGroup{
485+
{LogGroupName: aws.String("group-1"), RetentionInDays: aws.Int64(1)},
486+
},
487+
}, nil).Once()
488+
489+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
490+
return input.LogGroupNamePrefix != nil && *input.LogGroupNamePrefix == "group-2"
491+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
492+
LogGroups: []*cloudwatchlogs.LogGroup{
493+
{LogGroupName: aws.String("group-2"), RetentionInDays: aws.Int64(7)},
494+
},
495+
}, nil).Once()
496+
497+
mockService.On("PutRetentionPolicy", mock.MatchedBy(func(input *cloudwatchlogs.PutRetentionPolicyInput) bool {
498+
return *input.LogGroupName == "group-1" && *input.RetentionInDays == 7
499+
})).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
500+
501+
manager := NewTargetManager(logger, mockService)
502+
tm := manager.(*targetManager)
503+
504+
batch := make(map[string]Target)
505+
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
506+
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
507+
508+
tm.updateTargets(batch)
509+
time.Sleep(100 * time.Millisecond)
510+
511+
mockService.AssertExpectations(t)
512+
})
513+
471514
t.Run("RetentionPolicyUpdate", func(t *testing.T) {
472515
mockService := new(mockLogsService)
473516

@@ -497,7 +540,7 @@ func TestDescribeLogGroupsBatching(t *testing.T) {
497540
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
498541
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
499542

500-
tm.updateTargetBatch(batch)
543+
tm.updateTargets(batch)
501544
time.Sleep(100 * time.Millisecond)
502545

503546
mockService.AssertExpectations(t)
@@ -523,13 +566,38 @@ func TestDescribeLogGroupsBatching(t *testing.T) {
523566
batch := make(map[string]Target)
524567
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
525568

526-
tm.updateTargetBatch(batch)
569+
tm.updateTargets(batch)
527570
// Sleep enough for retry
528571
time.Sleep(2 * time.Second)
529572

530573
mockService.AssertExpectations(t)
531574
})
532575

576+
t.Run("FallbackToPrefix/OtherError", func(t *testing.T) {
577+
mockService := new(mockLogsService)
578+
579+
// First call with identifiers fails with different error (should not fallback)
580+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
581+
return len(input.LogGroupIdentifiers) > 0
582+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{},
583+
awserr.New(cloudwatchlogs.ErrCodeInvalidParameterException, "Different error message", nil)).Times(numBackoffRetries)
584+
585+
manager := NewTargetManager(logger, mockService)
586+
tm := manager.(*targetManager)
587+
588+
batch := make(map[string]Target)
589+
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
590+
591+
tm.updateTargets(batch)
592+
time.Sleep(2 * time.Second)
593+
594+
mockService.AssertExpectations(t)
595+
// Should not call prefix-based DescribeLogGroups
596+
mockService.AssertNotCalled(t, "DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
597+
return input.LogGroupNamePrefix != nil
598+
}))
599+
})
600+
533601
}
534602

535603
func TestCalculateBackoff(t *testing.T) {

0 commit comments

Comments
 (0)