Skip to content

Commit 3437e8e

Browse files
Change from radix/v4 to go-redis due to high lock overhead on EncodeDecode. Current best performance vs all previous versions (#14)
* Enabled pprof cpuprofile collection * Avoid context locking on pubsub due to PingInterval (disabled it) * Using go-redis for SUBSCRIBE flow
1 parent 3c7faf0 commit 3437e8e

File tree

2 files changed

+29
-41
lines changed

2 files changed

+29
-41
lines changed

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,8 @@ Thumbs.db
9090

9191
# Coverage Results #
9292
####################
93-
coverage.txt
93+
coverage.txt
94+
95+
# Profiler Results #
96+
####################
97+
*.pprof

subscriber.go

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"encoding/json"
6-
"errors"
76
"flag"
87
"fmt"
98
radix "github.com/mediocregopher/radix/v4"
@@ -12,6 +11,7 @@ import (
1211
"log"
1312
"os"
1413
"os/signal"
14+
"runtime/pprof"
1515
"strings"
1616
"sync"
1717
"sync/atomic"
@@ -40,14 +40,14 @@ type testResult struct {
4040
func subscriberRoutine(addr string, mode, subscriberName string, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, opts radix.Dialer, protocolVersion int) {
4141
// tell the caller we've stopped
4242
defer wg.Done()
43+
client := redis.NewClient(&redis.Options{
44+
Addr: addr,
45+
Password: opts.AuthPass,
46+
ClientName: subscriberName,
47+
ProtocolVersion: protocolVersion,
48+
})
4349
switch mode {
4450
case "ssubscribe":
45-
client := redis.NewClient(&redis.Options{
46-
Addr: addr,
47-
Password: opts.AuthPass,
48-
ClientName: subscriberName,
49-
ProtocolVersion: protocolVersion,
50-
})
5151
spubsub := client.SSubscribe(ctx, channel)
5252
defer spubsub.Close()
5353
for {
@@ -64,53 +64,26 @@ func subscriberRoutine(addr string, mode, subscriberName string, channel string,
6464
case "subscribe":
6565
fallthrough
6666
default:
67-
_, _, ps, _ := bootstrapPubSub(addr, subscriberName, channel, opts)
68-
defer ps.Close()
67+
pubsub := client.Subscribe(ctx, channel)
68+
defer pubsub.Close()
6969
for {
70-
msg, err := ps.Next(ctx)
71-
if errors.Is(err, context.Canceled) {
72-
break
73-
} else if err != nil {
70+
msg, err := pubsub.ReceiveMessage(ctx)
71+
if err != nil {
7472
panic(err)
7573
}
7674
if printMessages {
77-
fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Message))
75+
fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload))
7876
}
7977
atomic.AddUint64(&totalMessages, 1)
8078
}
81-
82-
}
83-
84-
}
85-
86-
func bootstrapPubSub(addr string, subscriberName string, channel string, opts radix.Dialer) (radix.Conn, error, radix.PubSubConn, *time.Ticker) {
87-
// Create a normal redis connection
88-
ctx := context.Background()
89-
conn, err := opts.Dial(ctx, "tcp", addr)
90-
91-
if err != nil {
92-
log.Fatal(err)
93-
}
94-
95-
err = conn.Do(ctx, radix.FlatCmd(nil, "CLIENT", "SETNAME", subscriberName))
96-
if err != nil {
97-
log.Fatal(err)
98-
}
99-
100-
// Pass that connection into PubSub, conn should never get used after this
101-
ps := radix.PubSubConfig{}.New(conn)
102-
103-
err = ps.Subscribe(ctx, channel)
104-
if err != nil {
105-
log.Fatal(err)
10679
}
10780

108-
return conn, err, ps, nil
10981
}
11082

11183
func main() {
11284
host := flag.String("host", "127.0.0.1", "redis host.")
11385
port := flag.String("port", "6379", "redis port.")
86+
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file")
11487
password := flag.String("a", "", "Password for Redis Auth.")
11588
mode := flag.String("mode", "subscribe", "Subscribe mode. Either 'subscribe' or 'ssubscribe'.")
11689
username := flag.String("user", "", "Used to send ACL style 'AUTH username pass'. Needs -a.")
@@ -186,6 +159,13 @@ func main() {
186159
total_subscriptions := total_channels * *subscribers_per_channel
187160
total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber
188161
fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages))
162+
if *cpuprofile != "" {
163+
f, err := os.Create(*cpuprofile)
164+
if err != nil {
165+
log.Fatal(err)
166+
}
167+
pprof.StartCPUProfile(f)
168+
}
189169

190170
if strings.Compare(*subscribers_placement, "dense") == 0 {
191171
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
@@ -210,6 +190,10 @@ func main() {
210190
closed, start_time, duration, totalMessages, messageRateTs := updateCLI(tick, c, total_messages, w, *test_time)
211191
messageRate := float64(totalMessages) / float64(duration.Seconds())
212192

193+
if *cpuprofile != "" {
194+
pprof.StopCPUProfile()
195+
}
196+
213197
fmt.Fprint(w, fmt.Sprintf("#################################################\nTotal Duration %f Seconds\nMessage Rate %f\n#################################################\n", duration.Seconds(), messageRate))
214198
fmt.Fprint(w, "\r\n")
215199
w.Flush()

0 commit comments

Comments
 (0)