Skip to content

Commit 464860e

Browse files
committed
Update workflow for workload discovery integration tests
1 parent 95fbd16 commit 464860e

File tree

9 files changed

+809
-1
lines changed

9 files changed

+809
-1
lines changed

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ copy-version-file: create-version-file
6363
mkdir -p build/bin/
6464
cp CWAGENT_VERSION $(BUILD_SPACE)/bin/CWAGENT_VERSION
6565

66-
amazon-cloudwatch-agent-linux: copy-version-file
66+
amazon-cloudwatch-agent-linux: copy-version-file workload-discovery-linux
6767
@echo Building CloudWatchAgent for Linux,Debian with ARM64 and AMD64
6868
$(LINUX_AMD64_BUILD)/config-downloader github.com/aws/amazon-cloudwatch-agent/cmd/config-downloader
6969
$(LINUX_ARM64_BUILD)/config-downloader github.com/aws/amazon-cloudwatch-agent/cmd/config-downloader
@@ -102,6 +102,10 @@ amazon-cloudwatch-agent-windows: copy-version-file
102102
$(WIN_BUILD)/start-amazon-cloudwatch-agent.exe github.com/aws/amazon-cloudwatch-agent/cmd/start-amazon-cloudwatch-agent
103103
$(WIN_BUILD)/amazon-cloudwatch-agent-config-wizard.exe github.com/aws/amazon-cloudwatch-agent/cmd/amazon-cloudwatch-agent-config-wizard
104104

105+
workload-discovery-linux:
106+
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -buildmode=${CWAGENT_BUILD_MODE} -o $(BUILD_SPACE)/bin/linux_amd64/workload-discovery github.com/aws/amazon-cloudwatch-agent/cmd/workload-discovery
107+
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -trimpath -ldflags="-s -w" -buildmode=${CWAGENT_BUILD_MODE} -o $(BUILD_SPACE)/bin/linux_arm64/workload-discovery github.com/aws/amazon-cloudwatch-agent/cmd/workload-discovery
108+
105109
# A fast build that only builds amd64, we don't need wizard and config downloader
106110
build-for-docker: build-for-docker-amd64
107111

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"errors"
10+
"log/slog"
11+
"os"
12+
"runtime"
13+
"sync"
14+
"time"
15+
16+
"github.com/shirou/gopsutil/v4/process"
17+
18+
"github.com/aws/amazon-cloudwatch-agent/internal/detector"
19+
"github.com/aws/amazon-cloudwatch-agent/internal/detector/util"
20+
)
21+
22+
type Config struct {
23+
LogLevel slog.Level `json:"log_level"`
24+
Concurrency int `json:"concurrency"`
25+
Timeout time.Duration `json:"timeout"`
26+
}
27+
28+
type Discoverer struct {
29+
cfg Config
30+
logger *slog.Logger
31+
processDetectors []detector.ProcessDetector
32+
}
33+
34+
func NewDiscoverer(cfg Config, logger *slog.Logger) *Discoverer {
35+
return &Discoverer{
36+
cfg: cfg,
37+
logger: logger,
38+
processDetectors: []detector.ProcessDetector{},
39+
}
40+
}
41+
42+
func (d *Discoverer) Discover(ctx context.Context) error {
43+
start := time.Now()
44+
processes, err := process.Processes()
45+
if err != nil {
46+
return err
47+
}
48+
md, err := d.detectMetadataFromProcesses(ctx, processes)
49+
if err != nil {
50+
return err
51+
}
52+
d.logger.Debug("Discovered metadata", "elapsed", time.Since(start))
53+
if len(md) > 0 {
54+
encoder := json.NewEncoder(os.Stdout)
55+
encoder.SetIndent("", " ")
56+
if err = encoder.Encode(md); err != nil {
57+
return err
58+
}
59+
}
60+
return nil
61+
}
62+
63+
func (d *Discoverer) detectMetadataFromProcesses(ctx context.Context, processes []*process.Process) (detector.MetadataSlice, error) {
64+
d.logger.Debug(
65+
"Starting discovery",
66+
"num_process", len(processes),
67+
"num_worker", d.cfg.Concurrency,
68+
)
69+
70+
jobs := make(chan *process.Process, len(processes)-1)
71+
results := make(chan *detector.Metadata, d.cfg.Concurrency)
72+
73+
var workerWg sync.WaitGroup
74+
for w := 0; w < d.cfg.Concurrency; w++ {
75+
workerWg.Add(1)
76+
go d.worker(ctx, jobs, results, &workerWg)
77+
}
78+
79+
var collectorWg sync.WaitGroup
80+
var mds detector.MetadataSlice
81+
collectorWg.Add(1)
82+
go func() {
83+
defer collectorWg.Done()
84+
for r := range results {
85+
mds = append(mds, r)
86+
}
87+
}()
88+
89+
selfPID := int32(os.Getpid()) // nolint:gosec
90+
for _, p := range processes {
91+
// skip the workload discovery process
92+
if p.Pid == selfPID {
93+
continue
94+
}
95+
select {
96+
case jobs <- p:
97+
case <-ctx.Done():
98+
close(jobs)
99+
workerWg.Wait()
100+
return nil, ctx.Err()
101+
}
102+
}
103+
close(jobs)
104+
workerWg.Wait()
105+
close(results)
106+
collectorWg.Wait()
107+
108+
return mds, nil
109+
}
110+
111+
func (d *Discoverer) worker(ctx context.Context, jobs <-chan *process.Process, results chan<- *detector.Metadata, wg *sync.WaitGroup) {
112+
defer wg.Done()
113+
for {
114+
select {
115+
case <-ctx.Done():
116+
return
117+
case p, ok := <-jobs:
118+
if !ok {
119+
return
120+
}
121+
mds, err := d.detectMetadataFromProcess(ctx, util.NewCachedProcess(util.NewProcessWithPID(p)))
122+
if err == nil && mds != nil {
123+
for _, md := range mds {
124+
results <- md
125+
}
126+
}
127+
}
128+
}
129+
}
130+
131+
func (d *Discoverer) detectMetadataFromProcess(ctx context.Context, p detector.Process) (detector.MetadataSlice, error) {
132+
var mds detector.MetadataSlice
133+
for _, processDetector := range d.processDetectors {
134+
md, err := processDetector.Detect(ctx, p)
135+
if err != nil {
136+
if errors.Is(err, detector.ErrSkipProcess) {
137+
return nil, detector.ErrSkipProcess
138+
}
139+
continue
140+
}
141+
mds = append(mds, md)
142+
}
143+
return mds, nil
144+
}
145+
146+
func buildLogger(level slog.Level) *slog.Logger {
147+
opts := &slog.HandlerOptions{
148+
Level: level,
149+
AddSource: true,
150+
}
151+
handler := slog.NewTextHandler(os.Stderr, opts)
152+
return slog.New(handler)
153+
}
154+
155+
func main() {
156+
cfg := Config{
157+
Concurrency: runtime.NumCPU(),
158+
LogLevel: slog.LevelDebug,
159+
Timeout: 100 * time.Millisecond,
160+
}
161+
162+
logger := buildLogger(cfg.LogLevel)
163+
164+
d := NewDiscoverer(cfg, logger)
165+
ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
166+
defer cancel()
167+
if err := d.Discover(ctx); err != nil {
168+
logger.Error("Discovery failed", "error", err)
169+
os.Exit(1)
170+
}
171+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"log/slog"
9+
"os"
10+
"testing"
11+
"time"
12+
13+
"github.com/shirou/gopsutil/v4/process"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/mock"
16+
"github.com/stretchr/testify/require"
17+
18+
"github.com/aws/amazon-cloudwatch-agent/internal/detector"
19+
"github.com/aws/amazon-cloudwatch-agent/internal/detector/detectortest"
20+
)
21+
22+
func TestDiscoverer_detectMetadataFromProcesses(t *testing.T) {
23+
testCases := map[string]struct {
24+
setupMock func(*detectortest.MockProcessDetector)
25+
want []*detector.Metadata
26+
wantErr error
27+
}{
28+
"SingleProcess": {
29+
setupMock: func(md *detectortest.MockProcessDetector) {
30+
md.On("Detect", mock.Anything, mock.Anything).Return(&detector.Metadata{
31+
Categories: []detector.Category{detector.CategoryJVM},
32+
Name: "test-process",
33+
}, nil).Once()
34+
},
35+
want: []*detector.Metadata{
36+
{
37+
Categories: []detector.Category{detector.CategoryJVM},
38+
Name: "test-process",
39+
},
40+
},
41+
},
42+
}
43+
for name, testCase := range testCases {
44+
t.Run(name, func(t *testing.T) {
45+
md := new(detectortest.MockProcessDetector)
46+
testCase.setupMock(md)
47+
48+
cfg := Config{
49+
LogLevel: slog.LevelDebug,
50+
Concurrency: 1,
51+
Timeout: time.Second,
52+
}
53+
d := NewDiscoverer(cfg, slog.Default())
54+
d.processDetectors = []detector.ProcessDetector{md}
55+
56+
ctx := context.Background()
57+
got, err := d.detectMetadataFromProcesses(ctx, []*process.Process{
58+
{Pid: int32(1234)},
59+
{Pid: int32(os.Getpid())}, // nolint:gosec
60+
})
61+
if testCase.wantErr != nil {
62+
assert.ErrorIs(t, err, testCase.wantErr)
63+
} else {
64+
require.NoError(t, err)
65+
assert.Equal(t, detector.MetadataSlice(testCase.want), got)
66+
}
67+
md.AssertExpectations(t)
68+
})
69+
}
70+
}
71+
72+
func TestDiscoverer_detectMetadataFromProcess(t *testing.T) {
73+
testCases := map[string]struct {
74+
setupMock func(*detectortest.MockProcessDetector)
75+
want []*detector.Metadata
76+
wantErr error
77+
}{
78+
"SkipProcess": {
79+
setupMock: func(md *detectortest.MockProcessDetector) {
80+
md.On("Detect", mock.Anything, mock.Anything).
81+
Return(nil, detector.ErrSkipProcess).Once()
82+
},
83+
wantErr: detector.ErrSkipProcess,
84+
},
85+
"NoCompatibleDetectors": {
86+
setupMock: func(m *detectortest.MockProcessDetector) {
87+
m.On("Detect", mock.Anything, mock.Anything).
88+
Return(nil, detector.ErrIncompatibleDetector).Once()
89+
},
90+
},
91+
}
92+
for name, testCase := range testCases {
93+
t.Run(name, func(t *testing.T) {
94+
md := new(detectortest.MockProcessDetector)
95+
testCase.setupMock(md)
96+
97+
cfg := Config{
98+
LogLevel: slog.LevelDebug,
99+
Concurrency: 1,
100+
Timeout: time.Second,
101+
}
102+
d := NewDiscoverer(cfg, slog.Default())
103+
d.processDetectors = []detector.ProcessDetector{md}
104+
105+
mp := new(detectortest.MockProcess)
106+
107+
ctx := context.Background()
108+
got, err := d.detectMetadataFromProcess(ctx, mp)
109+
if testCase.wantErr != nil {
110+
assert.ErrorIs(t, err, testCase.wantErr)
111+
} else {
112+
require.NoError(t, err)
113+
assert.Equal(t, detector.MetadataSlice(testCase.want), got)
114+
}
115+
md.AssertExpectations(t)
116+
})
117+
}
118+
}

internal/detector/detector.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package detector
5+
6+
import (
7+
"context"
8+
"errors"
9+
)
10+
11+
var (
12+
// ErrSkipProcess indicates that the current process being evaluated should be skipped.
13+
ErrSkipProcess = errors.New("skip process")
14+
// ErrIncompatibleDetector indicates that a specific detector is incompatible with a resource.
15+
ErrIncompatibleDetector = errors.New("incompatible detector")
16+
)
17+
18+
// ProcessDetector defines an interface for detecting and categorizing processes.
19+
type ProcessDetector interface {
20+
// Detect attempts to gather metadata for a given process. Returns an error if the detection fails.
21+
Detect(ctx context.Context, process Process) (*Metadata, error)
22+
}
23+
24+
// Process defines an interface for interacting with system processes.
25+
type Process interface {
26+
// PID returns the process ID.
27+
PID() int32
28+
// ExeWithContext returns the executable path of the process.
29+
ExeWithContext(ctx context.Context) (string, error)
30+
// CwdWithContext returns the current working directory of the process.
31+
CwdWithContext(ctx context.Context) (string, error)
32+
// CmdlineSliceWithContext returns the command line arguments of the process as a slice. Includes the executable
33+
// in the first entry.
34+
CmdlineSliceWithContext(ctx context.Context) ([]string, error)
35+
// EnvironWithContext returns the environment variables of the process. Each entry follows a <NAME>=<VALUE> pattern.
36+
EnvironWithContext(ctx context.Context) ([]string, error)
37+
}

0 commit comments

Comments
 (0)