57 changes: 29 additions & 28 deletions pkg/acquisition/modules/docker/docker.go
@@ -28,6 +28,7 @@ import (
 "github.com/crowdsecurity/dlog"
 
 "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
+"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker/tracker"
 "github.com/crowdsecurity/crowdsec/pkg/metrics"
 "github.com/crowdsecurity/crowdsec/pkg/pipeline"
 )
@@ -56,8 +57,8 @@ type DockerConfiguration struct {
 type DockerSource struct {
 metricsLevel metrics.AcquisitionMetricsLevel
 Config DockerConfiguration
-runningContainerState map[string]*ContainerConfig
-runningServiceState map[string]*ContainerConfig
+runningContainerState *tracker.Tracker[*ContainerConfig]
+runningServiceState *tracker.Tracker[*ContainerConfig]
 compiledContainerName []*regexp.Regexp
 compiledContainerID []*regexp.Regexp
 compiledServiceName []*regexp.Regexp
@@ -210,8 +211,8 @@ func (d *DockerSource) Configure(ctx context.Context, yamlConfig []byte, logger
 return err
 }
 
-d.runningContainerState = make(map[string]*ContainerConfig)
-d.runningServiceState = make(map[string]*ContainerConfig)
+d.runningContainerState = tracker.NewTracker[*ContainerConfig]()
+d.runningServiceState = tracker.NewTracker[*ContainerConfig]()
 
 d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
 
@@ -273,8 +274,8 @@ func (d *DockerSource) ConfigureByDSN(_ context.Context, dsn string, labels map[
 d.Config.UniqueId = uuid
 d.Config.ContainerName = make([]string, 0)
 d.Config.ContainerID = make([]string, 0)
-d.runningContainerState = make(map[string]*ContainerConfig)
-d.runningServiceState = make(map[string]*ContainerConfig)
+d.runningContainerState = tracker.NewTracker[*ContainerConfig]()
+d.runningServiceState = tracker.NewTracker[*ContainerConfig]()
 d.Config.Mode = configuration.CAT_MODE
 d.logger = logger
 d.Config.Labels = labels
@@ -382,7 +383,7 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan pipeline
 foundOne := false
 
 for _, container := range runningContainers {
-if _, ok := d.runningContainerState[container.ID]; ok {
+if _, ok := d.runningContainerState.Get(container.ID); ok {
 d.logger.Debugf("container with id %s is already being read from", container.ID)
 continue
 }
@@ -447,7 +448,7 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan pipeline
 d.logger.Errorf("Got error from docker read: %s", err)
 }
 
-d.runningContainerState[container.ID] = containerConfig
+d.runningContainerState.Set(container.ID, containerConfig)
 }
 }
 
@@ -684,7 +685,7 @@ func (d *DockerSource) checkServices(ctx context.Context, monitChan chan *Contai
 d.logger.Errorf("cannot connect to docker daemon for service monitoring: %v", err)
 
 // Kill all running service monitoring if we can't connect
-for id, service := range d.runningServiceState {
+for id, service := range d.runningServiceState.GetAll() {
 if service.t.Alive() {
 d.logger.Infof("killing tail for service %s", service.Name)
 service.t.Kill(nil)
@@ -694,7 +695,7 @@
 }
 }
 
-delete(d.runningServiceState, id)
+d.runningServiceState.Delete(id)
 }
 } else {
 d.logger.Errorf("service list err: %s", err)
@@ -707,7 +708,7 @@ func (d *DockerSource) checkServices(ctx context.Context, monitChan chan *Contai
 runningServicesID[service.ID] = true
 
 // Don't need to re-eval an already monitored service
-if _, ok := d.runningServiceState[service.ID]; ok {
+if _, ok := d.runningServiceState.Get(service.ID); ok {
 continue
 }
 
@@ -717,13 +718,13 @@ func (d *DockerSource) checkServices(ctx context.Context, monitChan chan *Contai
 }
 
 // Send deletion notifications for services that are no longer running
-for serviceStateID, serviceConfig := range d.runningServiceState {
+for serviceStateID, serviceConfig := range d.runningServiceState.GetAll() {
 if _, ok := runningServicesID[serviceStateID]; !ok {
 deleteChan <- serviceConfig
 }
 }
 
-d.logger.Tracef("Reading logs from %d services", len(d.runningServiceState))
+d.logger.Tracef("Reading logs from %d services", d.runningServiceState.Len())
 
 return nil
 }
@@ -735,7 +736,7 @@ func (d *DockerSource) checkContainers(ctx context.Context, monitChan chan *Cont
 runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
 if err != nil {
 if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
-for id, container := range d.runningContainerState {
+for id, container := range d.runningContainerState.GetAll() {
 if container.t.Alive() {
 d.logger.Infof("killing tail for container %s", container.Name)
 container.t.Kill(nil)
@@ -745,7 +746,7 @@
 }
 }
 
-delete(d.runningContainerState, id)
+d.runningContainerState.Delete(id)
 }
 } else {
 log.Errorf("container list err: %s", err)
@@ -758,7 +759,7 @@ func (d *DockerSource) checkContainers(ctx context.Context, monitChan chan *Cont
 runningContainersID[container.ID] = true
 
 // don't need to re eval an already monitored container
-if _, ok := d.runningContainerState[container.ID]; ok {
+if _, ok := d.runningContainerState.Get(container.ID); ok {
 continue
 }
 
@@ -767,13 +768,13 @@ func (d *DockerSource) checkContainers(ctx context.Context, monitChan chan *Cont
 }
 }
 
-for containerStateID, containerConfig := range d.runningContainerState {
+for containerStateID, containerConfig := range d.runningContainerState.GetAll() {
 if _, ok := runningContainersID[containerStateID]; !ok {
 deleteChan <- containerConfig
 }
 }
 
-d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
+d.logger.Tracef("Reading logs from %d containers", d.runningContainerState.Len())
 
 return nil
 }
@@ -1234,22 +1235,22 @@ func (d *DockerSource) ContainerManager(ctx context.Context, in chan *ContainerC
 for {
 select {
 case newContainer := <-in:
-if _, ok := d.runningContainerState[newContainer.ID]; !ok {
+if _, ok := d.runningContainerState.Get(newContainer.ID); !ok {
 newContainer.logger = d.logger.WithField("container_name", newContainer.Name)
 newContainer.t.Go(func() error {
 return d.TailContainer(ctx, newContainer, outChan, deleteChan)
 })
 
-d.runningContainerState[newContainer.ID] = newContainer
+d.runningContainerState.Set(newContainer.ID, newContainer)
 }
 case containerToDelete := <-deleteChan:
-if containerConfig, ok := d.runningContainerState[containerToDelete.ID]; ok {
+if containerConfig, ok := d.runningContainerState.Get(containerToDelete.ID); ok {
 log.Infof("container acquisition stopped for container '%s'", containerConfig.Name)
 containerConfig.t.Kill(nil)
-delete(d.runningContainerState, containerToDelete.ID)
+d.runningContainerState.Delete(containerToDelete.ID)
 }
 case <-d.t.Dying():
-for _, container := range d.runningContainerState {
+for _, container := range d.runningContainerState.GetAll() {
 if container.t.Alive() {
 d.logger.Infof("killing tail for container %s", container.Name)
 container.t.Kill(nil)
@@ -1274,22 +1275,22 @@ func (d *DockerSource) ServiceManager(ctx context.Context, in chan *ContainerCon
 for {
 select {
 case newService := <-in:
-if _, ok := d.runningServiceState[newService.ID]; !ok {
+if _, ok := d.runningServiceState.Get(newService.ID); !ok {
 newService.logger = d.logger.WithField("service_name", newService.Name)
 newService.t.Go(func() error {
 return d.TailService(ctx, newService, outChan, deleteChan)
 })
 
-d.runningServiceState[newService.ID] = newService
+d.runningServiceState.Set(newService.ID, newService)
 }
 case serviceToDelete := <-deleteChan:
-if serviceConfig, ok := d.runningServiceState[serviceToDelete.ID]; ok {
+if serviceConfig, ok := d.runningServiceState.Get(serviceToDelete.ID); ok {
 d.logger.Infof("service acquisition stopped for service '%s'", serviceConfig.Name)
 serviceConfig.t.Kill(nil)
-delete(d.runningServiceState, serviceToDelete.ID)
+d.runningServiceState.Delete(serviceToDelete.ID)
 }
 case <-d.t.Dying():
-for _, service := range d.runningServiceState {
+for _, service := range d.runningServiceState.GetAll() {
 if service.t.Alive() {
 d.logger.Infof("killing tail for service %s", service.Name)
 service.t.Kill(nil)
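For context on the swap above: runningContainerState and runningServiceState are touched from several places (the check loops and the manager loops), and a bare Go map is not safe for concurrent reads and writes; the new Tracker guards it with a sync.RWMutex. A minimal sketch of the difference, illustrative only and not part of the PR; run it with `go run -race .` once the tracker package is available:

```go
package main

import (
	"sync"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker/tracker"
)

func main() {
	// With a bare map[string]int here instead, concurrent Set/Get from
	// multiple goroutines would trip the race detector (or fatal with
	// "concurrent map writes"). Tracker serializes access internally.
	t := tracker.NewTracker[int]()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			t.Set("id", n)     // concurrent writer: guarded by the mutex
			_, _ = t.Get("id") // concurrent reader: also guarded
		}(i)
	}
	wg.Wait()
}
```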
89 changes: 89 additions & 0 deletions pkg/acquisition/modules/docker/tracker/tracker.go
@@ -0,0 +1,89 @@
package tracker

import (
"maps"
"sync"
)

// Tracker is a concurrency-safe map wrapper for tracking live objects like
// containers or services.
//
// It uses a sync.RWMutex to guard access to its internal map.
// The zero value of Tracker is ready to use; calling Set() will lazily initialize
// the internal map if needed.
//
// The type parameter T can be either a value type (e.g. int, struct) or a pointer
// type (e.g. *ContainerConfig). Use pointers if T contains fields that should not
// be copied (for example tombs, mutexes, or loggers).
type Tracker[T any] struct {
mu sync.RWMutex
items map[string]T
}

// NewTracker creates and initializes a new tracker for type T.
//
// While the zero value of Tracker is ready to use, using the constructor is
// recommended in case future versions require explicit initialization.
func NewTracker[T any]() *Tracker[T] {
return &Tracker[T]{items: make(map[string]T)}
}

// GetAll returns a snapshot copy of all items currently in the tracker.
//
// The returned map is a shallow copy: modifying it will not affect
// the underlying tracker contents. Safe for concurrent use.
func (t *Tracker[T]) GetAll() map[string]T {
t.mu.RLock()
snapshot := make(map[string]T, len(t.items))
maps.Copy(snapshot, t.items)
t.mu.RUnlock()
return snapshot
}

// Get returns the item stored under the given id, along with a boolean
// indicating whether it was found.
func (t *Tracker[T]) Get(id string) (T, bool) {
t.mu.RLock()
v, ok := t.items[id]
t.mu.RUnlock()
return v, ok
}

// Set stores an item under the given id. If the internal map is nil,
// Set lazily initializes it.
func (t *Tracker[T]) Set(id string, item T) {
t.mu.Lock()
if t.items == nil {
t.items = make(map[string]T)
}
t.items[id] = item
t.mu.Unlock()
}

// Delete removes the item with the given id, if present.
func (t *Tracker[T]) Delete(id string) {
t.mu.Lock()
delete(t.items, id)
t.mu.Unlock()
}

// Clear removes all tracked items and returns a snapshot of the previous contents.
//
// The returned map is a shallow copy of the internal state at the time of the call,
// so further modifications to it do not affect the tracker.
func (t *Tracker[T]) Clear() map[string]T {
t.mu.Lock()
old := make(map[string]T, len(t.items))
maps.Copy(old, t.items)
t.items = make(map[string]T)
t.mu.Unlock()
return old
}

// Len returns the number of items currently stored in the tracker.
func (t *Tracker[T]) Len() int {
t.mu.RLock()
n := len(t.items)
t.mu.RUnlock()
return n
}
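
A short usage sketch of the API above (illustrative only; ContainerConfig here is a local stand-in for the type in the docker package):

```go
package main

import (
	"fmt"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker/tracker"
)

type ContainerConfig struct{ Name string }

func main() {
	t := tracker.NewTracker[*ContainerConfig]()
	t.Set("abc123", &ContainerConfig{Name: "nginx"})

	if cfg, ok := t.Get("abc123"); ok {
		fmt.Println(cfg.Name) // nginx
	}

	// GetAll returns a shallow snapshot: mutating it leaves the
	// tracker itself untouched.
	snap := t.GetAll()
	delete(snap, "abc123")
	fmt.Println(t.Len()) // 1

	// Clear empties the tracker and returns the previous contents,
	// e.g. for stopping every tracked tail on shutdown.
	for id, cfg := range t.Clear() {
		fmt.Println("stopping", id, cfg.Name)
	}
	fmt.Println(t.Len()) // 0
}
```

The snapshot semantics of GetAll are what let checkContainers and checkServices range over the state and send deletions without holding the lock during channel sends.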