Skip to content

Commit b24be79

Browse files
authored
Merge pull request #87 from eranra/add_health_check
add health server (http)
2 parents 8b1de71 + 1a27edc commit b24be79

File tree

8 files changed

+164
-4
lines changed

8 files changed

+164
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ Usage:
3333

3434
Flags:
3535
--config string config file (default is $HOME/.flowlogs2metrics)
36+
--health.port string Health server port (default "8080")
3637
-h, --help help for flowlogs2metrics
3738
--log-level string Log level: debug, info, warning, error (default "error")
3839
--pipeline.decode.aws string aws fields

cmd/flowlogs2metrics/main.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
jsoniter "github.com/json-iterator/go"
2424
"github.com/netobserv/flowlogs2metrics/pkg/config"
25+
"github.com/netobserv/flowlogs2metrics/pkg/health"
2526
"github.com/netobserv/flowlogs2metrics/pkg/pipeline"
2627
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
2728
log "github.com/sirupsen/logrus"
@@ -128,6 +129,7 @@ func initFlags() {
128129
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
129130
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
130131
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)")
132+
rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port")
131133
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)")
132134
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API")
133135
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API")
@@ -160,23 +162,29 @@ func run() {
160162
mainPipeline *pipeline.Pipeline
161163
)
162164

163-
// Starting log message
165+
// Initial log message
164166
fmt.Printf("%s starting - version [%s]\n\n", filepath.Base(os.Args[0]), Version)
165-
// Dump the configuration
167+
168+
// Dump configuration
166169
dumpConfig()
167170

171+
// Setup (threads) exit manager
168172
utils.SetupElegantExit()
169173

170-
// creating a new pipeline
174+
// Create new flows pipeline
171175
mainPipeline, err = pipeline.NewPipeline()
172176
if err != nil {
173177
log.Fatalf("failed to initialize pipeline %s", err)
174178
os.Exit(1)
175179
}
176180

181+
// Starts the flows pipeline
177182
mainPipeline.Run()
178183

179-
// give all threads a chance to exit and then exit the process
184+
// Start health report server
185+
health.NewHealthServer(mainPipeline)
186+
187+
// Give all threads a chance to exit and then exit the process
180188
time.Sleep(time.Second)
181189
log.Debugf("exiting main run")
182190
os.Exit(0)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.17
55
require (
66
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
77
github.com/go-kit/kit v0.12.0
8+
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb
89
github.com/ip2location/ip2location-go/v9 v9.2.0
910
github.com/json-iterator/go v1.1.12
1011
github.com/mitchellh/mapstructure v1.4.3

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
495495
github.com/hashicorp/serf v0.9.3/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
496496
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
497497
github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
498+
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb h1:tsEKRC3PU9rMw18w/uAptoijhgG4EvlA5kfJPtwrMDk=
499+
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb/go.mod h1:NtmN9h8vrTveVQRLHcX2HQ5wIPBDCsZ351TGbZWgg38=
498500
github.com/hetznercloud/hcloud-go v1.22.0/go.mod h1:xng8lbDUg+xM1dgc0yGHX5EeqbwIq7UYlMWMTx3SQVg=
499501
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
500502
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=

pkg/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ var (
2525

2626
type Options struct {
2727
PipeLine Pipeline
28+
Health Health
29+
}
30+
31+
type Health struct {
32+
Port string
2833
}
2934

3035
type Pipeline struct {

pkg/health/health.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package health
19+
20+
import (
21+
"github.com/heptiolabs/healthcheck"
22+
"github.com/netobserv/flowlogs2metrics/pkg/config"
23+
"github.com/netobserv/flowlogs2metrics/pkg/pipeline"
24+
log "github.com/sirupsen/logrus"
25+
"net"
26+
"net/http"
27+
"time"
28+
)
29+
30+
const defaultServerHost = "0.0.0.0"
31+
32+
type Server struct {
33+
handler healthcheck.Handler
34+
address string
35+
}
36+
37+
func (hs *Server) Serve() {
38+
for {
39+
err := http.ListenAndServe(hs.address, hs.handler)
40+
log.Errorf("http.ListenAndServe error %v", err)
41+
time.Sleep(60 * time.Second)
42+
}
43+
}
44+
45+
func NewHealthServer(pipeline *pipeline.Pipeline) *Server {
46+
47+
handler := healthcheck.NewHandler()
48+
address := net.JoinHostPort(defaultServerHost, config.Opt.Health.Port)
49+
50+
handler.AddLivenessCheck("PipelineCheck", pipeline.IsAlive())
51+
handler.AddReadinessCheck("PipelineCheck", pipeline.IsReady())
52+
53+
server := &Server{
54+
handler: handler,
55+
address: address,
56+
}
57+
58+
go server.Serve()
59+
60+
return server
61+
}

pkg/health/health_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package health
2+
3+
import (
4+
"fmt"
5+
"github.com/netobserv/flowlogs2metrics/pkg/config"
6+
"github.com/netobserv/flowlogs2metrics/pkg/pipeline"
7+
"github.com/stretchr/testify/require"
8+
"net/http"
9+
"net/url"
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestNewHealthServer(t *testing.T) {
15+
readyPath := "/ready"
16+
livePath := "/live"
17+
18+
type args struct {
19+
pipeline pipeline.Pipeline
20+
port string
21+
}
22+
type want struct {
23+
statusCode int
24+
}
25+
26+
tests := []struct {
27+
name string
28+
args args
29+
want want
30+
}{
31+
{name: "pipeline running", args: args{pipeline: pipeline.Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}},
32+
{name: "pipeline not running", args: args{pipeline: pipeline.Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}},
33+
}
34+
35+
for _, tt := range tests {
36+
t.Run(tt.name, func(t *testing.T) {
37+
38+
config.Opt.Health.Port = tt.args.port
39+
expectedAddr := fmt.Sprintf("0.0.0.0:%s", config.Opt.Health.Port)
40+
server := NewHealthServer(&tt.args.pipeline)
41+
require.NotNil(t, server)
42+
require.Equal(t, expectedAddr, server.address)
43+
44+
client := &http.Client{}
45+
46+
time.Sleep(time.Second)
47+
readyURL := url.URL{Scheme: "http", Host: expectedAddr, Path: readyPath}
48+
var resp, err = client.Get(readyURL.String())
49+
require.NoError(t, err)
50+
require.Equal(t, tt.want.statusCode, resp.StatusCode)
51+
52+
liveURL := url.URL{Scheme: "http", Host: expectedAddr, Path: livePath}
53+
resp, err = client.Get(liveURL.String())
54+
require.NoError(t, err)
55+
require.Equal(t, tt.want.statusCode, resp.StatusCode)
56+
57+
})
58+
}
59+
}

pkg/pipeline/pipeline.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package pipeline
1919

2020
import (
21+
"fmt"
22+
"github.com/heptiolabs/healthcheck"
2123
"github.com/netobserv/flowlogs2metrics/pkg/config"
2224
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
2325
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/encode"
@@ -40,6 +42,7 @@ type Pipeline struct {
4042
Writer write.Writer
4143
Extractor extract.Extractor
4244
Encoder encode.Encoder
45+
IsRunning bool
4346
}
4447

4548
func getIngester() (ingest.Ingester, error) {
@@ -150,7 +153,9 @@ func NewPipeline() (*Pipeline, error) {
150153
}
151154

152155
func (p *Pipeline) Run() {
156+
p.IsRunning = true
153157
p.Ingester.Ingest(p.Process)
158+
p.IsRunning = false
154159
}
155160

156161
// Process is called by the Ingester function
@@ -170,3 +175,21 @@ func (p Pipeline) Process(entries []interface{}) {
170175
extracted := p.Extractor.Extract(transformed)
171176
_ = p.Encoder.Encode(extracted)
172177
}
178+
179+
func (p *Pipeline) IsReady() healthcheck.Check {
180+
return func() error {
181+
if !p.IsRunning {
182+
return fmt.Errorf("pipeline is not running")
183+
}
184+
return nil
185+
}
186+
}
187+
188+
func (p *Pipeline) IsAlive() healthcheck.Check {
189+
return func() error {
190+
if !p.IsRunning {
191+
return fmt.Errorf("pipeline is not running")
192+
}
193+
return nil
194+
}
195+
}

0 commit comments

Comments
 (0)