Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit fe98972

Browse files
authored
Merge pull request #28 from blendle/kb/librdkafka
Use the librdkafka implementation by Confluent.
2 parents 50b8776 + f618f24 commit fe98972

File tree

11 files changed

+104
-146
lines changed

11 files changed

+104
-146
lines changed

Dockerfile

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
1+
FROM golang:alpine as builder
2+
3+
WORKDIR /go/src/github.com/blendle/pg2kafka
4+
ADD . ./
5+
6+
RUN apk --update --no-cache add git alpine-sdk bash
7+
RUN wget -qO- https://github.com/edenhill/librdkafka/archive/v0.11.4-RC1.tar.gz | tar xz
8+
RUN cd librdkafka-* && ./configure && make && make install
9+
RUN go get github.com/golang/dep/cmd/dep && dep ensure -vendor-only
10+
RUN go build -ldflags "-X main.version=$(git rev-parse --short @) -s -extldflags -static" -a -installsuffix cgo .
11+
112
FROM scratch
2-
MAINTAINER Jurre Stender <[email protected]>
13+
LABEL maintainer="Jurre Stender <[email protected]>"
314
COPY sql ./sql
4-
COPY pg2kafka /
15+
COPY --from=builder /go/src/github.com/blendle/pg2kafka/pg2kafka /
16+
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
517
ENTRYPOINT ["/pg2kafka"]

Gopkg.lock

Lines changed: 5 additions & 65 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
[[constraint]]
2-
branch = "master"
3-
name = "github.com/blendle/go-streamprocessor"
4-
51
[[constraint]]
62
branch = "master"
73
name = "github.com/buger/jsonparser"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,12 @@ kafka-topics \
157157
Then export the Kafka host as a URL so pg2kafka can use it:
158158

159159
```bash
160-
$ export KAFKA_PRODUCER_URL="kafka://localhost:9092"
160+
$ export KAFKA_BROKER="localhost:9092"
161161
```
162162

163163
### Running the service locally
164164

165-
Make sure you export the `DATABASE_URL` and `KAFKA_PRODUCER_URL`, and also
165+
Make sure you export the `DATABASE_URL` and `KAFKA_BROKER`, and also
166166
`export PERFORM_MIGRATIONS=true`.
167167

168168
```bash

main.go

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@ import (
1010
"time"
1111

1212
logger "github.com/blendle/go-logger"
13-
"github.com/blendle/go-streamprocessor/stream"
14-
"github.com/blendle/go-streamprocessor/streamclient"
15-
"github.com/blendle/go-streamprocessor/streamclient/kafka"
16-
"github.com/blendle/go-streamprocessor/streamclient/standardstream"
1713
"github.com/blendle/pg2kafka/eventqueue"
14+
"github.com/confluentinc/confluent-kafka-go/kafka"
1815
"github.com/lib/pq"
16+
"github.com/pkg/errors"
1917
"go.uber.org/zap"
2018
)
2119

@@ -24,6 +22,15 @@ var (
2422
version string
2523
)
2624

25+
// Producer is the minimal required interface pg2kafka requires to produce
26+
// events to a kafka topic.
27+
type Producer interface {
28+
Close()
29+
Flush(int) int
30+
31+
Produce(*kafka.Message, chan kafka.Event) error
32+
}
33+
2734
func main() {
2835
conf := &logger.Config{
2936
App: "pg2kafka",
@@ -57,12 +64,8 @@ func main() {
5764
}
5865

5966
producer := setupProducer()
60-
61-
defer func() {
62-
if cerr := producer.Close(); cerr != nil {
63-
logger.L.Fatal("Error closing producer", zap.Error(err))
64-
}
65-
}()
67+
defer producer.Close()
68+
defer producer.Flush(1000)
6669

6770
reportProblem := func(ev pq.ListenerEventType, err error) {
6871
if err != nil {
@@ -91,7 +94,7 @@ func main() {
9194

9295
// ProcessEvents queries the database for unprocessed events and produces them
9396
// to kafka.
94-
func ProcessEvents(p stream.Producer, eq *eventqueue.Queue) {
97+
func ProcessEvents(p Producer, eq *eventqueue.Queue) {
9598
events, err := eq.FetchUnprocessedRecords()
9699
if err != nil {
97100
logger.L.Error("Error listening to pg", zap.Error(err))
@@ -100,7 +103,7 @@ func ProcessEvents(p stream.Producer, eq *eventqueue.Queue) {
100103
produceMessages(p, events, eq)
101104
}
102105

103-
func processQueue(p stream.Producer, eq *eventqueue.Queue) {
106+
func processQueue(p Producer, eq *eventqueue.Queue) {
104107
pageCount, err := eq.UnprocessedEventPagesCount()
105108
if err != nil {
106109
logger.L.Fatal("Error selecting count", zap.Error(err))
@@ -113,7 +116,7 @@ func processQueue(p stream.Producer, eq *eventqueue.Queue) {
113116

114117
func waitForNotification(
115118
l *pq.Listener,
116-
p stream.Producer,
119+
p Producer,
117120
eq *eventqueue.Queue,
118121
signals chan os.Signal,
119122
) {
@@ -134,39 +137,67 @@ func waitForNotification(
134137
}
135138
}
136139

137-
func produceMessages(p stream.Producer, events []*eventqueue.Event, eq *eventqueue.Queue) {
140+
func produceMessages(p Producer, events []*eventqueue.Event, eq *eventqueue.Queue) {
141+
deliveryChan := make(chan kafka.Event)
138142
for _, event := range events {
139143
msg, err := json.Marshal(event)
140144
if err != nil {
141145
logger.L.Fatal("Error parsing event", zap.Error(err))
142146
}
143147

144-
p.Messages() <- &stream.Message{
148+
topic := topicName(event.TableName)
149+
message := &kafka.Message{
150+
TopicPartition: kafka.TopicPartition{
151+
Topic: &topic,
152+
Partition: kafka.PartitionAny, // nolint: gotype
153+
},
145154
Value: msg,
146-
Topic: topicName(event.TableName),
147155
Key: event.ExternalID,
148156
Timestamp: event.CreatedAt,
149157
}
150-
158+
if os.Getenv("DRY_RUN") != "" {
159+
logger.L.Info("Would produce message", zap.Any("message", message))
160+
} else {
161+
err = p.Produce(message, deliveryChan)
162+
if err != nil {
163+
logger.L.Fatal("Failed to produce", zap.Error(err))
164+
}
165+
e := <-deliveryChan
166+
167+
result := e.(*kafka.Message)
168+
if result.TopicPartition.Error != nil {
169+
logger.L.Fatal("Delivery failed", zap.Error(result.TopicPartition.Error))
170+
}
171+
}
151172
err = eq.MarkEventAsProcessed(event.ID)
152173
if err != nil {
153174
logger.L.Fatal("Error marking record as processed", zap.Error(err))
154175
}
155176
}
156177
}
157178

158-
func setupProducer() stream.Producer {
159-
options := func(sc *standardstream.Client, kc *kafka.Client) {
160-
sc.Logger = logger.L
161-
kc.Logger = logger.L
179+
func setupProducer() Producer {
180+
broker := os.Getenv("KAFKA_BROKER")
181+
if broker == "" {
182+
panic("missing KAFKA_BROKER environment")
183+
}
184+
185+
hostname, err := os.Hostname()
186+
if err != nil {
187+
hostname = os.Getenv("HOSTNAME")
162188
}
163189

164-
producer, err := streamclient.NewProducer(options)
190+
p, err := kafka.NewProducer(&kafka.ConfigMap{
191+
"client.id": hostname,
192+
"bootstrap.servers": broker,
193+
"partitioner": "murmur2",
194+
"compression.codec": "snappy",
195+
})
165196
if err != nil {
166-
logger.L.Fatal("Unable to initialize producer", zap.Error(err))
197+
panic(errors.Wrap(err, "failed to setup producer"))
167198
}
168199

169-
return producer
200+
return p
170201
}
171202

172203
func topicName(tableName string) string {

main_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55
"os"
66
"testing"
77

8-
"github.com/blendle/go-streamprocessor/streamclient/inmem"
98
"github.com/buger/jsonparser"
9+
"github.com/confluentinc/confluent-kafka-go/kafka"
1010

1111
"github.com/blendle/pg2kafka/eventqueue"
1212
_ "github.com/lib/pq"
@@ -54,23 +54,19 @@ func TestFetchUnprocessedRecords(t *testing.T) {
5454
t.Fatalf("Error inserting events: %v", err)
5555
}
5656

57-
opts := func(c *inmem.Client) {
58-
c.ProducerTopic = "users"
57+
p := &mockProducer{
58+
messages: make([]*kafka.Message, 0),
5959
}
60-
s := inmem.NewStore()
61-
pt := s.NewTopic("users")
62-
c := inmem.NewClientWithStore(s, opts)
63-
p := c.NewProducer()
6460

6561
ProcessEvents(p, eq)
6662

6763
expected := 4
68-
actual := len(pt.Messages())
64+
actual := len(p.messages)
6965
if actual != expected {
7066
t.Fatalf("Unexpected number of messages produced. Expected %d, got %d", expected, actual)
7167
}
7268

73-
msg := pt.Messages()[0]
69+
msg := p.messages[0]
7470
email, err := jsonparser.GetString(msg.Value, "data", "email")
7571
if err != nil {
7672
t.Fatal(err)
@@ -89,7 +85,7 @@ func TestFetchUnprocessedRecords(t *testing.T) {
8985
t.Errorf("Expected %v, got %v", "fefc72b4-d8df-4039-9fb9-bfcb18066a2b", externalID)
9086
}
9187

92-
msg = pt.Messages()[3]
88+
msg = p.messages[3]
9389
email, err = jsonparser.GetString(msg.Value, "data", "email")
9490
if err != nil {
9591
t.Fatal(err)
@@ -99,8 +95,8 @@ func TestFetchUnprocessedRecords(t *testing.T) {
9995
t.Errorf("Data did not match. Expected %v, got %v", "[email protected]", email)
10096
}
10197

102-
if msg.Key != nil {
103-
t.Errorf("Expected key to be nil, got %v", msg.Key)
98+
if len(msg.Key) != 0 {
99+
t.Errorf("Expected empty key, got %v", msg.Key)
104100
}
105101
}
106102

@@ -178,3 +174,20 @@ func TestParseTopicNamespace(t *testing.T) {
178174
})
179175
}
180176
}
177+
178+
type mockProducer struct {
179+
messages []*kafka.Message
180+
}
181+
182+
func (p *mockProducer) Close() {
183+
}
184+
func (p *mockProducer) Flush(timeout int) int {
185+
return 0
186+
}
187+
func (p *mockProducer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
188+
p.messages = append(p.messages, msg)
189+
go func() {
190+
deliveryChan <- msg
191+
}()
192+
return nil
193+
}

script/build

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,4 @@
22
set -eu
33

44
GIT_COMMIT=$(git rev-parse --short @)
5-
export CGO_ENABLED=0
6-
7-
go build -ldflags "-X main.version=$GIT_COMMIT -s -extldflags -static" -a -installsuffix cgo .
5+
docker build -t "eu.gcr.io/bnl-blendle/pg2kafka:$GIT_COMMIT" .

0 commit comments

Comments
 (0)