Skip to content

Commit b3d632f

Browse files
Merge pull request #22 from redis-performance/readme.update
Added --clients flag. included example in readme.
2 parents f163666 + cf7de3e commit b3d632f

File tree

2 files changed

+89
-50
lines changed

2 files changed

+89
-50
lines changed

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,40 @@ Usage of ./pubsub-sub-bench:
6666
Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.
6767
-client-update-tick int
6868
client update tick. (default 1)
69+
-clients int
70+
Number of parallel connections. (default 50)
71+
-cpuprofile string
72+
write cpu profile to file
6973
-host string
7074
redis host. (default "127.0.0.1")
7175
-json-out-file string
7276
Name of json output file, if not set, will not print to json.
77+
-max-number-channels-per-subscriber int
78+
max number of channels to subscribe to, per connection. (default 1)
79+
-max-reconnect-interval int
80+
max reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.
7381
-messages int
7482
Number of total messages per subscriber per channel.
83+
-min-number-channels-per-subscriber int
84+
min number of channels to subscribe to, per connection. (default 1)
85+
-min-reconnect-interval int
86+
min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.
87+
-mode string
88+
Subscribe mode. Either 'subscribe' or 'ssubscribe'. (default "subscribe")
7589
-oss-cluster-api-distribute-subscribers
7690
read cluster slots and distribute subscribers among them.
91+
-pool_size int
92+
Maximum number of socket connections per node.
7793
-port string
7894
redis port. (default "6379")
7995
-print-messages
8096
print messages.
97+
-rand-seed int
98+
Random deterministic seed. (default 12345)
99+
-redis-timeout duration
100+
determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts. (default 30s)
101+
-resp int
102+
redis command response protocol (2 - RESP 2, 3 - RESP 3) (default 2)
81103
-subscriber-prefix string
82104
prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum. (default "channel-")
83105
-subscribers-per-channel int
@@ -88,5 +110,23 @@ Usage of ./pubsub-sub-bench:
88110
Number of seconds to run the test, after receiving the first message.
89111
-user string
90112
Used to send ACL style 'AUTH username pass'. Needs -a.
113+
-verbose
114+
verbose print.
115+
-version
116+
print version and exit.
117+
```
118+
119+
### Example usage: create 10 subscribers that will subscribe to 2000 channels
120+
121+
Subscriber
91122

92123
```
124+
./pubsub-sub-bench --clients 10 --channel-maximum 2000 --channel-minimum 1 -min-number-channels-per-subscriber 2000 -max-number-channels-per-subscriber 2000
125+
```
126+
127+
Publisher
128+
129+
```
130+
memtier_benchmark --key-prefix "channel-" --key-maximum 2000 --key-minimum 1 --command "PUBLISH __key__ __data__" --test-time 60 --pipeline 10
131+
```
132+

subscriber.go

Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func main() {
153153
channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).")
154154
channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).")
155155
subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.")
156+
clients := flag.Int("clients", 50, "Number of parallel connections.")
156157
min_channels_per_subscriber := flag.Int("min-number-channels-per-subscriber", 1, "min number of channels to subscribe to, per connection.")
157158
max_channels_per_subscriber := flag.Int("max-number-channels-per-subscriber", 1, "max number of channels to subscribe to, per connection.")
158159
min_reconnect_interval := flag.Int("min-reconnect-interval", 0, "min reconnect interval. if 0 disable (s)unsubscribe/(s)ubscribe.")
@@ -249,7 +250,6 @@ func main() {
249250
total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber
250251
subscriptions_per_node := total_subscriptions / nodeCount
251252

252-
log.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages))
253253
log.Println(fmt.Sprintf("Will use a subscriber prefix of: %s<channel id>", *subscribe_prefix))
254254

255255
if *poolSizePtr == 0 {
@@ -306,60 +306,59 @@ func main() {
306306
}
307307
totalCreatedClients := 0
308308
if strings.Compare(*subscribers_placement, "dense") == 0 {
309-
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
310-
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
311-
for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ {
312-
channels := []string{channel}
313-
n_channels_this_conn := 1
314-
if *max_channels_per_subscriber == *min_channels_per_subscriber {
315-
n_channels_this_conn = *max_channels_per_subscriber
316-
} else {
317-
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
309+
for client_id := 1; client_id <= *clients; client_id++ {
310+
channels := []string{}
311+
n_channels_this_conn := 0
312+
if *max_channels_per_subscriber == *min_channels_per_subscriber {
313+
n_channels_this_conn = *max_channels_per_subscriber
314+
} else {
315+
n_channels_this_conn = rand.Intn(*max_channels_per_subscriber-*min_channels_per_subscriber) + *min_channels_per_subscriber
316+
}
317+
for channel_this_conn := 1; channel_this_conn <= n_channels_this_conn; channel_this_conn++ {
318+
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
319+
new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
320+
channels = append(channels, new_channel)
321+
}
322+
totalCreatedClients++
323+
subscriberName := fmt.Sprintf("subscriber#%d", client_id)
324+
var client *redis.Client
325+
var err error = nil
326+
ctx = context.Background()
327+
// In case of SSUBSCRIBE the node is associated the to the channel name
328+
if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true {
329+
firstChannel := channels[0]
330+
client, err = clusterClient.MasterForKey(ctx, firstChannel)
331+
if err != nil {
332+
log.Fatal(err)
318333
}
319-
for channel_this_conn := 1; channel_this_conn < n_channels_this_conn; channel_this_conn++ {
320-
new_channel_id := rand.Intn(*channel_maximum) + *channel_minimum
321-
new_channel := fmt.Sprintf("%s%d", *subscribe_prefix, new_channel_id)
322-
channels = append(channels, new_channel)
334+
if *verbose {
335+
log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName))
323336
}
324-
totalCreatedClients++
325-
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
326-
var client *redis.Client
327-
var err error = nil
328-
ctx = context.Background()
329-
// In case of SSUBSCRIBE the node is associated the to the channel name
330-
if strings.Compare(*mode, "ssubscribe") == 0 && *distributeSubscribers == true {
331-
client, err = clusterClient.MasterForKey(ctx, channel)
332-
if err != nil {
333-
log.Fatal(err)
334-
}
335-
if *verbose {
336-
log.Println(fmt.Sprintf("client %d is a CLUSTER client connected to %v. Subscriber name %s", totalCreatedClients, client.String(), subscriberName))
337-
}
338-
} else {
339-
nodes_pos := channel_id % nodeCount
340-
addr := nodesAddresses[nodes_pos]
341-
client = nodeClients[nodes_pos]
342-
if *verbose {
343-
log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName))
344-
}
345-
err = client.Ping(ctx).Err()
346-
if err != nil {
347-
log.Fatal(err)
348-
}
349-
}
350-
wg.Add(1)
351-
connectionReconnectInterval := 0
352-
if *max_reconnect_interval == *min_reconnect_interval {
353-
connectionReconnectInterval = *max_reconnect_interval
354-
} else {
355-
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
337+
} else {
338+
nodes_pos := client_id % nodeCount
339+
addr := nodesAddresses[nodes_pos]
340+
client = nodeClients[nodes_pos]
341+
if *verbose {
342+
log.Println(fmt.Sprintf("client %d is a STANDALONE client connected to node %d (address %s). Subscriber name %s", totalCreatedClients, nodes_pos, addr, subscriberName))
356343
}
357-
if connectionReconnectInterval > 0 {
358-
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
344+
err = client.Ping(ctx).Err()
345+
if err != nil {
346+
log.Fatal(err)
359347
}
360-
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
361-
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
362348
}
349+
wg.Add(1)
350+
connectionReconnectInterval := 0
351+
if *max_reconnect_interval == *min_reconnect_interval {
352+
connectionReconnectInterval = *max_reconnect_interval
353+
} else {
354+
connectionReconnectInterval = rand.Intn(*max_reconnect_interval-*min_reconnect_interval) + *min_reconnect_interval
355+
}
356+
if connectionReconnectInterval > 0 {
357+
log.Println(fmt.Sprintf("Using reconnection interval of %d milliseconds for subscriber: %s", connectionReconnectInterval, subscriberName))
358+
}
359+
log.Println(fmt.Sprintf("subscriber: %s. Total channels %d: %v", subscriberName, len(channels), channels))
360+
go subscriberRoutine(subscriberName, *mode, channels, *printMessages, connectionReconnectInterval, ctx, &wg, client)
361+
// }
363362
}
364363
}
365364

0 commit comments

Comments
 (0)