Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cli/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type DaemonCommand struct {
signals chan os.Signal
done chan bool
Logger core.Logger

dockerHandlerStop func()
}

// Execute runs the daemon
Expand Down Expand Up @@ -64,6 +66,8 @@ func (c *DaemonCommand) boot() (err error) {
return fmt.Errorf("failed to create docker handler: %w", err)
}

c.dockerHandlerStop = config.dockerHandler.Stop

err = config.InitializeApp()
if err != nil {
return fmt.Errorf("can't start the app: %w", err)
Expand Down Expand Up @@ -105,6 +109,10 @@ func (c *DaemonCommand) shutdown() error {
return nil
}

if c.dockerHandlerStop != nil {
c.dockerHandlerStop()
}

c.Logger.Warningf("Waiting running jobs.")
return c.scheduler.Stop()
}
46 changes: 34 additions & 12 deletions cli/docker_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type DockerHandler struct {
configsFromLabels bool
logger core.Logger
filters []string

stopCh chan struct{}
}

type labelConfigUpdater interface {
Expand Down Expand Up @@ -64,6 +66,7 @@ func NewDockerHandler(config *Config, dockerFilters []string, configsFromLabels
configsFromLabels: configsFromLabels,
notifier: config,
logger: logger,
stopCh: make(chan struct{}),
}
var err error
c.dockerClient, err = c.buildDockerClient()
Expand Down Expand Up @@ -91,13 +94,29 @@ func (c *DockerHandler) watch() {
c.logger.Debugf("Watching for Docker labels changes every %s...", pollInterval)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for range ticker.C {
labels, err := c.GetDockerLabels()
// Do not print or care if there is no container up right now
if err != nil && !errors.Is(err, errNoContainersMatchingFilters) {
c.logger.Debugf("%v", err)
for {
select {
case <-ticker.C:
labels, err := c.GetDockerLabels()
// Do not print or care if there is no container up right now
if err != nil && !errors.Is(err, errNoContainersMatchingFilters) {
c.logger.Debugf("%v", err)
}
c.notifier.dockerLabelsUpdate(labels)
case <-c.stopCh:
return
}
c.notifier.dockerLabelsUpdate(labels)
}
}

// Stop terminates the background watch goroutine if running.
func (c *DockerHandler) Stop() {
select {
case <-c.stopCh:
// already closed
return
default:
close(c.stopCh)
}
}

Expand Down Expand Up @@ -154,15 +173,18 @@ func (c *DockerHandler) GetDockerLabels() (map[string]map[string]string, error)
for _, cont := range conts {
if len(cont.Names) > 0 && len(cont.Labels) > 0 {
name := strings.TrimPrefix(cont.Names[0], "/")
for k := range cont.Labels {
// remove all not relevant labels
if !strings.HasPrefix(k, labelPrefix) {
delete(cont.Labels, k)
continue

// copy only relevant labels instead of mutating cont.Labels
filtered := make(map[string]string, len(cont.Labels))
for k, v := range cont.Labels {
if strings.HasPrefix(k, labelPrefix) {
filtered[k] = v
}
}

labels[name] = cont.Labels
if len(filtered) > 0 {
labels[name] = filtered
}
}
}

Expand Down
10 changes: 4 additions & 6 deletions core/common.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package core

import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"reflect"
"strings"
"time"

"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
)

Expand Down Expand Up @@ -143,17 +143,15 @@ type Execution struct {
Skipped bool
Error error

OutputStream, ErrorStream *circbuf.Buffer `json:"-"`
OutputStream, ErrorStream *bytes.Buffer `json:"-"`
}

// NewExecution returns a new Execution, with a random ID
func NewExecution() *Execution {
bufOut, _ := circbuf.NewBuffer(maxStreamSize)
bufErr, _ := circbuf.NewBuffer(maxStreamSize)
return &Execution{
ID: randomID(),
OutputStream: bufOut,
ErrorStream: bufErr,
OutputStream: &bytes.Buffer{},
ErrorStream: &bytes.Buffer{},
}
}

Expand Down
7 changes: 3 additions & 4 deletions core/localjob_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package core

import (
"bytes"
"strings"

"github.com/armon/circbuf"

. "gopkg.in/check.v1"
)

Expand All @@ -16,7 +15,7 @@ func (s *SuiteLocalJob) TestRun(c *C) {
job := &LocalJob{}
job.Command = `echo "foo bar"`

b, _ := circbuf.NewBuffer(1000)
b := &bytes.Buffer{}
e := NewExecution()
e.OutputStream = b

Expand All @@ -31,7 +30,7 @@ func (s *SuiteLocalJob) TestEnvironment(c *C) {
env := []string{"test_Key1=value1", "test_Key2=value2"}
job.Environment = env

b, _ := circbuf.NewBuffer(1000)
b := &bytes.Buffer{}
e := NewExecution()
e.OutputStream = b

Expand Down
4 changes: 2 additions & 2 deletions core/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ func (w *jobWrapper) stop(ctx *Context, err error) {
errText = ctx.Execution.Error.Error()
}

if ctx.Execution.OutputStream.TotalWritten() > 0 {
if ctx.Execution.OutputStream.Len() > 0 {
ctx.Log("StdOut: " + ctx.Execution.OutputStream.String())
}

if ctx.Execution.ErrorStream.TotalWritten() > 0 {
if ctx.Execution.ErrorStream.Len() > 0 {
ctx.Log("StdErr: " + ctx.Execution.ErrorStream.String())
}

Expand Down
Loading