Skip to content

Commit 2f691f2

Browse files
Added option to specify the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts. (#7)
1 parent 7e42e91 commit 2f691f2

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

subscriber.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ func main() {
9595
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
9696
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
9797
printMessages := flag.Bool("print-messages", false, "print messages.")
98+
dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.")
9899
flag.Parse()
100+
99101
totalMessages = 0
100102
var nodes []radix.ClusterNode
101103
var nodesAddresses []string
@@ -108,7 +110,8 @@ func main() {
108110
opts = append(opts, radix.DialAuthPass(*password))
109111
}
110112
}
111-
113+
opts = append(opts, radix.DialTimeout(*dialTimeout))
114+
fmt.Printf("Using a redis connection, read, and write timeout of %v\n", *dialTimeout)
112115
if *test_time != 0 && *messages_per_channel_subscriber != 0 {
113116
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
114117
}
@@ -120,7 +123,7 @@ func main() {
120123
}
121124

122125
if strings.Compare(*client_output_buffer_limit_pubsub, "") != 0 {
123-
checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub)
126+
checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub, opts)
124127
}
125128

126129
stopChan := make(chan struct{})
@@ -289,9 +292,9 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
289292
return false, start, time.Since(start), totalMessages, messageRateTs
290293
}
291294

292-
func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string) {
295+
func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts []radix.DialOpt) {
293296
for _, slot := range nodes {
294-
conn, err := radix.Dial("tcp", slot.Addr)
297+
conn, err := radix.Dial("tcp", slot.Addr, opts...)
295298
if err != nil {
296299
panic(err)
297300
}

0 commit comments

Comments
 (0)