Skip to content

Commit 04d0b6f

Browse files
Alexander Sosnambanck-cd
authored andcommitted
Add synchronous jobs
We needed a possibility to execute some jobs / queries synchronously every time /metrics is requested. So I added the infrastructure to execute jobs with an interval <= 0 synchronously when the http-handler is called. interval: '0s' # an interval <= 0 will make the queries synchronous Some typos fixed by Felix Hillingshaeuser, thanks!
1 parent 0001728 commit 04d0b6f

File tree

5 files changed

+69
-10
lines changed

5 files changed

+69
-10
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ jobs:
119119
# each job needs a unique name, it's used for logging and as a default label
120120
- name: "example"
121121
# interval defined the pause between the runs of this job
122+
# set to 0 to make the queries synchronous
122123
interval: '5m'
123124
# cron_schedule when to execute the job in the standard CRON syntax
124125
# if specified, the interval is ignored

config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ func (c *cronConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
145145
type Job struct {
146146
log log.Logger
147147
conns []*connection
148+
Trigger chan bool // used to trigger execution
149+
Done chan bool // used to tell state
148150
Name string `yaml:"name"` // name of this job
149151
KeepAlive bool `yaml:"keepalive"` // keep connection between runs?
150152
Interval time.Duration `yaml:"interval"` // interval at which this job is run

handler.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package main
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/prometheus/client_golang/prometheus/promhttp"
6+
"net/http"
7+
)
8+
9+
// handlerFunc can be used as handler for http.HandleFunc()
10+
// all synchronous jobs will be triggered and waited for,
11+
// then the promhttp handler is executed
12+
func (ex *Exporter) handlerFunc(w http.ResponseWriter, req *http.Request) {
13+
// pull all triggers on jobs with interval 0
14+
for _, job := range ex.jobs {
15+
// if job is nil or is async then continue to next job
16+
if job == nil || job.Interval > 0 {
17+
continue
18+
}
19+
job.Trigger <- true
20+
}
21+
22+
// wait for all sync jobs to finish
23+
for _, job := range ex.jobs {
24+
if job == nil || job.Interval > 0 {
25+
continue
26+
}
27+
<-job.Done
28+
}
29+
30+
// get the prometheus handler
31+
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{})
32+
33+
// execute the ServeHTTP function
34+
handler.ServeHTTP(w, req)
35+
}

job.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
8484
}
8585

8686
func (j *Job) updateConnections() {
87+
// if the interval is not set > 0, create needed channels
88+
if j.Interval <= 0 {
89+
if j.Trigger == nil {
90+
j.Trigger = make(chan bool)
91+
}
92+
93+
if j.Done == nil {
94+
j.Done = make(chan bool)
95+
}
96+
}
8797
// if there are no connection URLs for this job it can't be run
8898
if j.Connections == nil {
8999
level.Error(j.log).Log("msg", "no connections for job", "job_name", j.Name)
@@ -405,13 +415,25 @@ func (j *Job) markFailed(conn *connection) {
405415

406416
// Run the job queries with exponential backoff, implements the cron.Job interface
407417
func (j *Job) Run() {
408-
bo := backoff.NewExponentialBackOff()
409-
bo.MaxElapsedTime = j.Interval
410-
if bo.MaxElapsedTime == 0 {
411-
bo.MaxElapsedTime = time.Minute
412-
}
413-
if err := backoff.Retry(j.runOnce, bo); err != nil {
414-
level.Error(j.log).Log("msg", "Failed to run", "err", err)
418+
// if the interval is 0 or lower, wait to be triggered
419+
if j.Interval <= 0 {
420+
// wait for trigger
421+
<-j.Trigger
422+
if err := j.runOnce(); err != nil {
423+
level.Error(j.log).Log("msg", "Failed to run", "err", err)
424+
}
425+
426+
// send true into done channel
427+
j.Done <- true
428+
} else {
429+
bo := backoff.NewExponentialBackOff()
430+
bo.MaxElapsedTime = j.Interval
431+
if bo.MaxElapsedTime == 0 {
432+
bo.MaxElapsedTime = time.Minute
433+
}
434+
if err := backoff.Retry(j.runOnce, bo); err != nil {
435+
level.Error(j.log).Log("msg", "Failed to run", "err", err)
436+
}
415437
}
416438
}
417439

main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/go-kit/log"
1111
"github.com/go-kit/log/level"
1212
"github.com/prometheus/client_golang/prometheus"
13-
"github.com/prometheus/client_golang/prometheus/promhttp"
1413
"github.com/prometheus/common/version"
1514
)
1615

@@ -62,8 +61,8 @@ func main() {
6261
}
6362
prometheus.MustRegister(exporter)
6463

65-
// setup and start webserver
66-
http.Handle(*metricsPath, promhttp.Handler())
64+
// setup and start webserver with custom function
65+
http.HandleFunc(*metricsPath, exporter.handlerFunc)
6766
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { http.Error(w, "OK", http.StatusOK) })
6867
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
6968
w.Write([]byte(`<html>

0 commit comments

Comments
 (0)