Skip to content

Commit 643bea6

Browse files
author
Alexander Sosna
committed
Add support for synchronous query execution
1 parent 79b6110 commit 643bea6

File tree

5 files changed

+90
-9
lines changed

5 files changed

+90
-9
lines changed

circle.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
dependencies:
2+
pre:
3+
override:
4+
post:
5+
- cp sql_exporter $CIRCLE_ARTIFACTS/

config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type File struct {
4545
type Job struct {
4646
log log.Logger
4747
conns []*connection
48+
Trigger chan bool // used to trigger execution
49+
Done chan bool // used to tell state
4850
Name string `yaml:"name"` // name of this job
4951
KeepAlive bool `yaml:"keepalive"` // keep connection between runs?
5052
Interval time.Duration `yaml:"interval"` // interval at which this job is run

handler.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"net/http"
5+
"os"
6+
7+
"github.com/go-kit/kit/log"
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/client_golang/prometheus/promhttp"
10+
)
11+
12+
// handlerFunc can be used as handler for http.HandleFunc()
13+
// all synchronus jobs will be triggered and waited for,
14+
// than the promhttp handler is executed
15+
func (ex *Exporter) handlerFunc(w http.ResponseWriter, req *http.Request) {
16+
// logger := log.NewNopLogger()
17+
logger := log.NewJSONLogger(os.Stdout)
18+
logger = log.With(logger, "caller", "handlerFunc")
19+
20+
// pull all triggers on jobs with interval 0
21+
logger.Log("level", "debug", "msg", "Start all sync jobs")
22+
for _, job := range ex.jobs {
23+
// if job is nil or is async then continue to next job
24+
if job == nil || job.Interval > 0 {
25+
logger.Log("level", "debug", "msg", "Send NO trigger to job", job.Name)
26+
continue
27+
}
28+
logger.Log("level", "debug", "msg", "Send trigger to job", job.Name)
29+
job.Trigger <- true
30+
}
31+
32+
// wait for all sync jobs to finish
33+
for _, job := range ex.jobs {
34+
if job == nil || job.Interval > 0 {
35+
continue
36+
}
37+
logger.Log("level", "debug", "msg", "Wait for job", job.Name)
38+
<-job.Done
39+
}
40+
logger.Log("level", "debug", "msg", "All waiting done")
41+
42+
// get the prometheus handler
43+
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{})
44+
45+
// execute the ServeHTTP function
46+
handler.ServeHTTP(w, req)
47+
}

job.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ func (j *Job) Run() {
7272
if j.log == nil {
7373
j.log = log.NewNopLogger()
7474
}
75+
76+
// if the interval is not set > 0, create needed channels
77+
if j.Interval <= 0 {
78+
if j.Trigger == nil {
79+
j.Trigger = make(chan bool)
80+
}
81+
82+
if j.Done == nil {
83+
j.Done = make(chan bool)
84+
}
85+
}
86+
7587
// if there are no connection URLs for this job it can't be run
7688
if j.Connections == nil {
7789
level.Error(j.log).Log("msg", "No conenctions for job", "job", j.Name)
@@ -110,13 +122,29 @@ func (j *Job) Run() {
110122
// enter the run loop
111123
// tries to run each query on each connection at approx the interval
112124
for {
113-
bo := backoff.NewExponentialBackOff()
114-
bo.MaxElapsedTime = j.Interval
115-
if err := backoff.Retry(j.runOnce, bo); err != nil {
116-
level.Error(j.log).Log("msg", "Failed to run", "err", err)
125+
// if the interval is 0 or lower, wait to be triggered
126+
if j.Interval <= 0 {
127+
j.log.Log("level", "debug", "msg", "Wait for trigger")
128+
// wait for trigger
129+
<-j.Trigger
130+
131+
if err := j.runOnce(); err != nil {
132+
j.log.Log("level", "error", "msg", "Failed to run", "err", err)
133+
}
134+
135+
// send to done chanel
136+
j.Done <- true
137+
j.log.Log("level", "debug", "msg", "Job finished")
138+
} else {
139+
// interval is grater than 0 so procide with async operation
140+
bo := backoff.NewExponentialBackOff()
141+
bo.MaxElapsedTime = j.Interval
142+
if err := backoff.Retry(j.runOnce, bo); err != nil {
143+
level.Error(j.log).Log("msg", "Failed to run", "err", err)
144+
}
145+
level.Debug(j.log).Log("msg", "Sleeping until next run", "sleep", j.Interval.String())
146+
time.Sleep(j.Interval)
117147
}
118-
level.Debug(j.log).Log("msg", "Sleeping until next run", "sleep", j.Interval.String())
119-
time.Sleep(j.Interval)
120148
}
121149
}
122150

main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/go-kit/kit/log"
1111
"github.com/go-kit/kit/log/level"
1212
"github.com/prometheus/client_golang/prometheus"
13-
"github.com/prometheus/client_golang/prometheus/promhttp"
1413
"github.com/prometheus/common/version"
1514
)
1615

@@ -62,8 +61,8 @@ func main() {
6261
}
6362
prometheus.MustRegister(exporter)
6463

65-
// setup and start webserver
66-
http.Handle(*metricsPath, promhttp.Handler())
64+
// setup and start webserver with custom function
65+
http.HandleFunc(*metricsPath, exporter.handlerFunc)
6766
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { http.Error(w, "OK", http.StatusOK) })
6867
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
6968
w.Write([]byte(`<html>

0 commit comments

Comments
 (0)