Skip to content

Commit f163666

Browse files
Reconnection Improvements, Channel Metrics, and Random Seed Support (#21)
* Dynamic Reconnect and Multi-Channel Support * updating go-release-action to the latest * Reconnection improvements and extra metrics
1 parent 4bb4fd6 commit f163666

File tree

1 file changed

+49
-22
lines changed

1 file changed

+49
-22
lines changed

subscriber.go

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
)
3131

3232
var totalMessages uint64
33+
var totalSubscribedChannels int64
34+
var totalConnects uint64
3335
var clusterSlicesMu sync.Mutex
3436

3537
type testResult struct {
@@ -48,41 +50,58 @@ type testResult struct {
4850
Addresses []string `json:"Addresses"`
4951
}
5052

51-
func subscriberRoutine(mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
53+
func subscriberRoutine(clientName, mode string, channels []string, printMessages bool, connectionReconnectInterval int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client) {
5254
// Tell the caller we've stopped
5355
defer wg.Done()
5456
var reconnectTicker *time.Ticker
5557
if connectionReconnectInterval > 0 {
56-
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Second)
58+
reconnectTicker = time.NewTicker(time.Duration(connectionReconnectInterval) * time.Millisecond)
5759
defer reconnectTicker.Stop()
5860
} else {
5961
reconnectTicker = time.NewTicker(1 * time.Second)
6062
reconnectTicker.Stop()
6163
}
6264

6365
var pubsub *redis.PubSub
66+
nChannels := len(channels)
6467

6568
// Helper function to handle subscription based on mode
6669
subscribe := func() {
6770
if pubsub != nil {
68-
// Unsubscribe based on mode before re-subscribing
69-
if mode == "ssubscribe" {
70-
if err := pubsub.SUnsubscribe(ctx, channels...); err != nil {
71-
fmt.Printf("Error during SUnsubscribe: %v\n", err)
71+
if nChannels > 1 {
72+
// Unsubscribe based on mode before re-subscribing
73+
if mode == "ssubscribe" {
74+
if err := pubsub.SUnsubscribe(ctx, channels[1:]...); err != nil {
75+
fmt.Printf("Error during SUnsubscribe: %v\n", err)
76+
}
77+
pubsub.Close()
78+
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:])))
79+
pubsub = client.SSubscribe(ctx, channels[1:]...)
80+
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:])))
81+
} else {
82+
if err := pubsub.Unsubscribe(ctx, channels[1:]...); err != nil {
83+
fmt.Printf("Error during Unsubscribe: %v\n", err)
84+
pubsub.Close()
85+
atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels[1:])))
86+
pubsub = client.Subscribe(ctx, channels[1:]...)
87+
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels[1:])))
88+
}
7289
}
90+
atomic.AddUint64(&totalConnects, 1)
7391
} else {
74-
if err := pubsub.Unsubscribe(ctx, channels...); err != nil {
75-
fmt.Printf("Error during Unsubscribe: %v\n", err)
76-
}
92+
log.Println(fmt.Sprintf("Skipping (S)UNSUBSCRIBE given client %s had only one channel subscribed in this connection: %v.", clientName, channels))
7793
}
78-
pubsub.Close()
79-
}
80-
switch mode {
81-
case "ssubscribe":
82-
pubsub = client.SSubscribe(ctx, channels...)
83-
default:
84-
pubsub = client.Subscribe(ctx, channels...)
94+
} else {
95+
switch mode {
96+
case "ssubscribe":
97+
pubsub = client.SSubscribe(ctx, channels...)
98+
default:
99+
pubsub = client.Subscribe(ctx, channels...)
100+
}
101+
atomic.AddInt64(&totalSubscribedChannels, int64(len(channels)))
102+
atomic.AddUint64(&totalConnects, 1)
85103
}
104+
86105
}
87106

88107
subscribe()
@@ -142,6 +161,7 @@ func main() {
142161
json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.")
143162
client_update_tick := flag.Int("client-update-tick", 1, "client update tick.")
144163
test_time := flag.Int("test-time", 0, "Number of seconds to run the test, after receiving the first message.")
164+
randSeed := flag.Int64("rand-seed", 12345, "Random deterministic seed.")
145165
subscribe_prefix := flag.String("subscriber-prefix", "channel-", "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum.")
146166
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
147167
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
@@ -167,6 +187,8 @@ func main() {
167187
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
168188
}
169189
log.Println(fmt.Sprintf("pubsub-sub-bench (git_sha1:%s%s)", git_sha, git_dirty_str))
190+
log.Println(fmt.Sprintf("using random seed:%d", *randSeed))
191+
rand.Seed(*randSeed)
170192

171193
ctx := context.Background()
172194
nodeCount := 0
@@ -292,7 +314,7 @@ func main() {
292314
if *max_channels_per_subscriber == *min_channels_per_subscriber {
293315
n_channels_this_conn = *max_channels_per_subscriber
294316
} else {
295-
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber - *min_channels_per_subscriber)
317+
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
296318
}
297319
for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ {
298320
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
@@ -330,12 +352,13 @@ func main() {
330352
if *max_reconnect_interval == *min_reconnect_interval {
331353
connectionReconnectInterval = *max_reconnect_interval
332354
} else {
333-
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *max_reconnect_interval
355+
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
334356
}
335357
if connectionReconnectInterval > 0 {
336-
log.Println(fmt.Sprintf("Using reconnection interval of %d for subscriber: %s", connectionReconnectInterval, subscriberName))
358+
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
337359
}
338-
go subscriberRoutine(*mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
360+
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
361+
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
339362
}
340363
}
341364
}
@@ -431,10 +454,11 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
431454
start := time.Now()
432455
prevTime := time.Now()
433456
prevMessageCount := uint64(0)
457+
prevConnectCount := uint64(0)
434458
messageRateTs := []float64{}
435459

436460
w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight)
437-
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \t"))
461+
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \tConnect Rate \tActive subscriptions\t"))
438462
fmt.Fprint(w, "\n")
439463
w.Flush()
440464
for {
@@ -444,16 +468,19 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
444468
now := time.Now()
445469
took := now.Sub(prevTime)
446470
messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds())
471+
connectRate := float64(totalConnects-prevConnectCount) / float64(took.Seconds())
472+
447473
if prevMessageCount == 0 && totalMessages != 0 {
448474
start = time.Now()
449475
}
450476
if totalMessages != 0 {
451477
messageRateTs = append(messageRateTs, messageRate)
452478
}
453479
prevMessageCount = totalMessages
480+
prevConnectCount = totalConnects
454481
prevTime = now
455482

456-
fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate))
483+
fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t%.2f\t%d\t", time.Since(start).Seconds(), totalMessages, messageRate, connectRate, totalSubscribedChannels))
457484
fmt.Fprint(w, "\r\n")
458485
w.Flush()
459486
if message_limit > 0 && totalMessages >= uint64(message_limit) {

0 commit comments

Comments
 (0)