Skip to content

Commit af6a103

Browse files
committed
feat(push): reading optimization for Linux
Optimize peeking on a newly acquired connection on *nix. Use a syscall to peek at the socket instead of blocking for a fixed amount of time. This won't work on Windows, so `MaybeHasData` always returns true on Windows and the client has to block for a given time to actually peek at the socket. *Time to complete N HSET operations (individual commands)* | Batch Size | Before (total sec) | After (total sec) | Time Saved | % Faster | |------------|-------------------|------------------|------------|----------| | 100 ops | 0.0172 | 0.0133 | 0.0038 | **22.4%** | | 1K ops | 0.178 | 0.133 | 0.045 | **25.3%** | | 10K ops | 1.72 | 1.28 | 0.44 | **25.6%** | | 100K ops | 17.1 | 13.4 | 3.7 | **22.0%** |
1 parent 72c24d5 commit af6a103

File tree

6 files changed

+274
-27
lines changed

6 files changed

+274
-27
lines changed

hset_benchmark_test.go

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
package redis_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9"
10+
)
11+
12+
// HSET Benchmark Tests
13+
//
14+
// This file contains benchmark tests for Redis HSET operations with different scales:
15+
// 1, 10, 100, 1000, 10000, 100000 operations
16+
//
17+
// Prerequisites:
18+
// - Redis server running on localhost:6379
19+
// - No authentication required
20+
//
21+
// Usage:
22+
// go test -bench=BenchmarkHSET -v ./hset_benchmark_test.go
23+
// go test -bench=BenchmarkHSETPipelined -v ./hset_benchmark_test.go
24+
// go test -bench=. -v ./hset_benchmark_test.go # Run all benchmarks
25+
//
26+
// Example output:
27+
// BenchmarkHSET/HSET_1_operations-8 5000 250000 ns/op 1000000.00 ops/sec
28+
// BenchmarkHSET/HSET_100_operations-8 100 10000000 ns/op 100000.00 ops/sec
29+
//
30+
// The benchmarks test three different approaches:
31+
// 1. Individual HSET commands (BenchmarkHSET)
32+
// 2. Pipelined HSET commands (BenchmarkHSETPipelined)
33+
34+
// BenchmarkHSET benchmarks HSET operations with different scales
35+
func BenchmarkHSET(b *testing.B) {
36+
ctx := context.Background()
37+
38+
// Setup Redis client
39+
rdb := redis.NewClient(&redis.Options{
40+
Addr: "localhost:6379",
41+
DB: 0,
42+
})
43+
defer rdb.Close()
44+
45+
// Test connection
46+
if err := rdb.Ping(ctx).Err(); err != nil {
47+
b.Skipf("Redis server not available: %v", err)
48+
}
49+
50+
// Clean up before and after tests
51+
defer func() {
52+
rdb.FlushDB(ctx)
53+
}()
54+
55+
scales := []int{1, 10, 100, 1000, 10000, 100000}
56+
57+
for _, scale := range scales {
58+
b.Run(fmt.Sprintf("HSET_%d_operations", scale), func(b *testing.B) {
59+
benchmarkHSETOperations(b, rdb, ctx, scale)
60+
})
61+
}
62+
}
63+
64+
// benchmarkHSETOperations performs the actual HSET benchmark for a given scale
65+
func benchmarkHSETOperations(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) {
66+
hashKey := fmt.Sprintf("benchmark_hash_%d", operations)
67+
68+
b.ResetTimer()
69+
b.StartTimer()
70+
totalTimes := []time.Duration{}
71+
72+
for i := 0; i < b.N; i++ {
73+
b.StopTimer()
74+
// Clean up the hash before each iteration
75+
rdb.Del(ctx, hashKey)
76+
b.StartTimer()
77+
78+
startTime := time.Now()
79+
// Perform the specified number of HSET operations
80+
for j := 0; j < operations; j++ {
81+
field := fmt.Sprintf("field_%d", j)
82+
value := fmt.Sprintf("value_%d", j)
83+
84+
err := rdb.HSet(ctx, hashKey, field, value).Err()
85+
if err != nil {
86+
b.Fatalf("HSET operation failed: %v", err)
87+
}
88+
}
89+
totalTimes = append(totalTimes, time.Now().Sub(startTime))
90+
}
91+
92+
// Stop the timer to calculate metrics
93+
b.StopTimer()
94+
95+
// Report operations per second
96+
opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds()
97+
b.ReportMetric(opsPerSec, "ops/sec")
98+
99+
// Report average time per operation
100+
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
101+
b.ReportMetric(float64(avgTimePerOp), "ns/op")
102+
// report average time in milliseconds from totalTimes
103+
avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes))
104+
b.ReportMetric(float64(avgTimePerOpMs), "ms")
105+
}
106+
107+
// BenchmarkHSETPipelined benchmarks HSET operations using pipelining for better performance
108+
func BenchmarkHSETPipelined(b *testing.B) {
109+
ctx := context.Background()
110+
111+
// Setup Redis client
112+
rdb := redis.NewClient(&redis.Options{
113+
Addr: "localhost:6379",
114+
DB: 0,
115+
})
116+
defer rdb.Close()
117+
118+
// Test connection
119+
if err := rdb.Ping(ctx).Err(); err != nil {
120+
b.Skipf("Redis server not available: %v", err)
121+
}
122+
123+
// Clean up before and after tests
124+
defer func() {
125+
rdb.FlushDB(ctx)
126+
}()
127+
128+
scales := []int{1, 10, 100, 1000, 10000, 100000}
129+
130+
for _, scale := range scales {
131+
b.Run(fmt.Sprintf("HSET_Pipelined_%d_operations", scale), func(b *testing.B) {
132+
benchmarkHSETPipelined(b, rdb, ctx, scale)
133+
})
134+
}
135+
}
136+
137+
// benchmarkHSETPipelined performs HSET benchmark using pipelining
138+
func benchmarkHSETPipelined(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) {
139+
hashKey := fmt.Sprintf("benchmark_hash_pipelined_%d", operations)
140+
141+
b.ResetTimer()
142+
b.StartTimer()
143+
totalTimes := []time.Duration{}
144+
145+
for i := 0; i < b.N; i++ {
146+
b.StopTimer()
147+
// Clean up the hash before each iteration
148+
rdb.Del(ctx, hashKey)
149+
b.StartTimer()
150+
151+
startTime := time.Now()
152+
// Use pipelining for better performance
153+
pipe := rdb.Pipeline()
154+
155+
// Add all HSET operations to the pipeline
156+
for j := 0; j < operations; j++ {
157+
field := fmt.Sprintf("field_%d", j)
158+
value := fmt.Sprintf("value_%d", j)
159+
pipe.HSet(ctx, hashKey, field, value)
160+
}
161+
162+
// Execute all operations at once
163+
_, err := pipe.Exec(ctx)
164+
if err != nil {
165+
b.Fatalf("Pipeline execution failed: %v", err)
166+
}
167+
totalTimes = append(totalTimes, time.Now().Sub(startTime))
168+
}
169+
170+
b.StopTimer()
171+
172+
// Report operations per second
173+
opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds()
174+
b.ReportMetric(opsPerSec, "ops/sec")
175+
176+
// Report average time per operation
177+
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
178+
b.ReportMetric(float64(avgTimePerOp), "ns/op")
179+
// report average time in milliseconds from totalTimes
180+
avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes))
181+
b.ReportMetric(float64(avgTimePerOpMs), "ms")
182+
}
183+
184+
// add same tests but with RESP2
185+
func BenchmarkHSET_RESP2(b *testing.B) {
186+
ctx := context.Background()
187+
188+
// Setup Redis client
189+
rdb := redis.NewClient(&redis.Options{
190+
Addr: "localhost:6379",
191+
Password: "", // no password docs
192+
DB: 0, // use default DB
193+
Protocol: 2,
194+
})
195+
defer rdb.Close()
196+
197+
// Test connection
198+
if err := rdb.Ping(ctx).Err(); err != nil {
199+
b.Skipf("Redis server not available: %v", err)
200+
}
201+
202+
// Clean up before and after tests
203+
defer func() {
204+
rdb.FlushDB(ctx)
205+
}()
206+
207+
scales := []int{1, 10, 100, 1000, 10000, 100000}
208+
209+
for _, scale := range scales {
210+
b.Run(fmt.Sprintf("HSET_RESP2_%d_operations", scale), func(b *testing.B) {
211+
benchmarkHSETOperations(b, rdb, ctx, scale)
212+
})
213+
}
214+
}
215+
216+
func BenchmarkHSETPipelined_RESP2(b *testing.B) {
217+
ctx := context.Background()
218+
219+
// Setup Redis client
220+
rdb := redis.NewClient(&redis.Options{
221+
Addr: "localhost:6379",
222+
Password: "", // no password docs
223+
DB: 0, // use default DB
224+
Protocol: 2,
225+
})
226+
defer rdb.Close()
227+
228+
// Test connection
229+
if err := rdb.Ping(ctx).Err(); err != nil {
230+
b.Skipf("Redis server not available: %v", err)
231+
}
232+
233+
// Clean up before and after tests
234+
defer func() {
235+
rdb.FlushDB(ctx)
236+
}()
237+
238+
scales := []int{1, 10, 100, 1000, 10000, 100000}
239+
240+
for _, scale := range scales {
241+
b.Run(fmt.Sprintf("HSET_Pipelined_RESP2_%d_operations", scale), func(b *testing.B) {
242+
benchmarkHSETPipelined(b, rdb, ctx, scale)
243+
})
244+
}
245+
}

internal/pool/conn.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ func (cn *Conn) Close() error {
113113
return cn.netConn.Close()
114114
}
115115

116+
// MaybeHasData tries to peek at the next byte in the socket without consuming it
117+
// This is used to check if there are push notifications available
118+
// Important: This will work on Linux, but not on Windows
119+
func (cn *Conn) MaybeHasData() bool {
120+
return maybeHasData(cn.netConn)
121+
}
122+
116123
func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
117124
tm := time.Now()
118125
cn.SetUsedAt(tm)

internal/pool/conn_check.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,8 @@ func connCheck(conn net.Conn) error {
5252

5353
return sysErr
5454
}
55+
56+
// maybeHasData checks if there is data in the socket without consuming it
57+
func maybeHasData(conn net.Conn) bool {
58+
return connCheck(conn) == errUnexpectedRead
59+
}

internal/pool/conn_check_dummy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,8 @@ import "net"
77
func connCheck(conn net.Conn) error {
88
return nil
99
}
10+
11+
// since we can't check for data on the socket, we just assume there is some
12+
func maybeHasData(conn net.Conn) bool {
13+
return true
14+
}

push/processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
5757
replyType, err := rd.PeekReplyType()
5858
if err != nil {
5959
// No more data available or error reading
60+
// if timeout, it will be handled by the caller
6061
break
6162
}
6263

@@ -144,6 +145,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt
144145
replyType, err := rd.PeekReplyType()
145146
if err != nil {
146147
// No more data available or error reading
148+
// if timeout, it will be handled by the caller
147149
break
148150
}
149151

@@ -176,7 +178,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCt
176178
func willHandleNotificationInClient(notificationType string) bool {
177179
switch notificationType {
178180
// Pub/Sub notifications - handled by pub/sub system
179-
case "message", // Regular pub/sub message
181+
case "message", // Regular pub/sub message
180182
"pmessage", // Pattern pub/sub message
181183
"subscribe", // Subscription confirmation
182184
"unsubscribe", // Unsubscription confirmation

0 commit comments

Comments
 (0)