Skip to content

Commit 58c9f55

Browse files
committed
wip
1 parent 8ac1cb7 commit 58c9f55

File tree

7 files changed

+621
-16
lines changed

7 files changed

+621
-16
lines changed

example/pubsub/go.mod

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
module github.com/redis/go-redis/example/pubsub
2+
3+
go 1.18
4+
5+
replace github.com/redis/go-redis/v9 => ../..
6+
7+
require github.com/redis/go-redis/v9 v9.11.0
8+
9+
require (
10+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
11+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
12+
)

example/pubsub/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
2+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
3+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
4+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
5+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
6+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=

example/pubsub/main.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"time"
9+
10+
"github.com/redis/go-redis/v9"
11+
)
12+
13+
var ctx = context.Background()
14+
var consStopped = false
15+
16+
func main() {
17+
wg := &sync.WaitGroup{}
18+
rdb := redis.NewClient(&redis.Options{
19+
Addr: ":6379",
20+
})
21+
_ = rdb.FlushDB(ctx).Err()
22+
23+
go func() {
24+
for {
25+
time.Sleep(2 * time.Second)
26+
fmt.Printf("pool stats: %+v\n", rdb.PoolStats())
27+
}
28+
}()
29+
err := rdb.Ping(ctx).Err()
30+
if err != nil {
31+
panic(err)
32+
}
33+
if err := rdb.Set(ctx, "prods", "0", 0).Err(); err != nil {
34+
panic(err)
35+
}
36+
if err := rdb.Set(ctx, "cons", "0", 0).Err(); err != nil {
37+
panic(err)
38+
}
39+
if err := rdb.Set(ctx, "cntr", "0", 0).Err(); err != nil {
40+
panic(err)
41+
}
42+
if err := rdb.Set(ctx, "recs", "0", 0).Err(); err != nil {
43+
panic(err)
44+
}
45+
fmt.Println("cntr", rdb.Get(ctx, "cntr").Val())
46+
fmt.Println("recs", rdb.Get(ctx, "recs").Val())
47+
subCtx, cancelSubCtx := context.WithCancel(ctx)
48+
for i := 0; i < 10; i++ {
49+
wg.Add(1)
50+
go subscribe(subCtx, rdb, "test", i, wg)
51+
}
52+
time.Sleep(time.Second)
53+
cancelSubCtx()
54+
time.Sleep(time.Second)
55+
subCtx, cancelSubCtx = context.WithCancel(ctx)
56+
for i := 0; i < 10; i++ {
57+
if err := rdb.Incr(ctx, "prods").Err(); err != nil {
58+
panic(err)
59+
}
60+
wg.Add(1)
61+
go floodThePool(subCtx, rdb, wg)
62+
}
63+
64+
for i := 0; i < 100; i++ {
65+
if err := rdb.Incr(ctx, "cons").Err(); err != nil {
66+
panic(err)
67+
}
68+
wg.Add(1)
69+
go subscribe(subCtx, rdb, "test2", i, wg)
70+
}
71+
time.Sleep(10 * time.Second)
72+
fmt.Println("canceling")
73+
cancelSubCtx()
74+
wg.Wait()
75+
cntr, err := rdb.Get(ctx, "cntr").Result()
76+
recs, err := rdb.Get(ctx, "recs").Result()
77+
prods, err := rdb.Get(ctx, "prods").Result()
78+
cons, err := rdb.Get(ctx, "cons").Result()
79+
fmt.Printf("cntr: %s\n", cntr)
80+
fmt.Printf("recs: %s\n", recs)
81+
fmt.Printf("prods: %s\n", prods)
82+
fmt.Printf("cons: %s\n", cons)
83+
time.Sleep(2 * time.Second)
84+
}
85+
86+
func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
87+
defer wg.Done()
88+
for {
89+
select {
90+
case <-ctx.Done():
91+
fmt.Println("floodThePool stopping")
92+
consStopped = true
93+
return
94+
default:
95+
}
96+
err := rdb.Publish(ctx, "test2", "hello").Err()
97+
if err != nil {
98+
log.Println("publish error:", err)
99+
}
100+
101+
err = rdb.Incr(ctx, "cntr").Err()
102+
if err != nil {
103+
log.Println("incr error:", err)
104+
}
105+
time.Sleep(10 * time.Nanosecond)
106+
}
107+
}
108+
109+
func subscribe(ctx context.Context, rdb *redis.Client, topic string, subscriberId int, wg *sync.WaitGroup) {
110+
defer wg.Done()
111+
defer fmt.Printf("subscriber %d stopping\n", subscriberId)
112+
rec := rdb.Subscribe(ctx, topic)
113+
recChan := rec.Channel()
114+
for {
115+
select {
116+
case <-ctx.Done():
117+
rec.Close()
118+
if subscriberId == 199 {
119+
fmt.Printf("subscriber %d done\n", subscriberId)
120+
}
121+
return
122+
default:
123+
select {
124+
case <-ctx.Done():
125+
rec.Close()
126+
if subscriberId == 199 {
127+
fmt.Printf("subscriber %d done\n", subscriberId)
128+
}
129+
return
130+
case msg := <-recChan:
131+
err := rdb.Incr(ctx, "recs").Err()
132+
if err != nil {
133+
log.Println("incr error:", err)
134+
}
135+
if consStopped {
136+
fmt.Printf("subscriber %d received %s\n", subscriberId, msg.Payload)
137+
}
138+
if subscriberId == 199 {
139+
fmt.Printf("subscriber %d received %s\n", subscriberId, msg.Payload)
140+
}
141+
}
142+
}
143+
}
144+
}

internal/pool/conn.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Conn struct {
4545

4646
Inited bool
4747
pooled bool
48+
isPubSub bool
4849
createdAt time.Time
4950
expiresAt time.Time
5051

@@ -90,7 +91,7 @@ func NewConn(netConn net.Conn) *Conn {
9091
cn.newEndpointAtomic.Store("") // empty string initially
9192

9293
cn.rd = proto.NewReader(netConn)
93-
cn.bw = bufio.NewWriter(netConn)
94+
cn.bw = bufio.NewWriterSize(netConn, 1<<19)
9495
cn.wr = proto.NewWriter(cn.bw)
9596
cn.SetUsedAt(time.Now())
9697
return cn

internal/pool/pool.go

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ func (p *ConnPool) checkMinIdleConns() {
153153
p.idleConnsLen--
154154
p.connsMu.Unlock()
155155
}
156-
157156
p.freeTurn()
158157
}()
159158
default:
@@ -170,6 +169,9 @@ func (p *ConnPool) addIdleConn() error {
170169
if err != nil {
171170
return err
172171
}
172+
// Mark connection as usable after successful creation
173+
// This is essential for normal pool operations
174+
cn.SetUsable(true)
173175

174176
p.connsMu.Lock()
175177
defer p.connsMu.Unlock()
@@ -205,6 +207,9 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
205207
if err != nil {
206208
return nil, err
207209
}
210+
// Mark connection as usable after successful creation
211+
// This is essential for normal pool operations
212+
cn.SetUsable(true)
208213

209214
p.connsMu.Lock()
210215
defer p.connsMu.Unlock()
@@ -253,10 +258,6 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
253258
cn.expiresAt = noExpiration
254259
}
255260

256-
// Mark connection as usable after successful creation
257-
// This is essential for normal pool operations
258-
cn.SetUsable(true)
259-
260261
return cn, nil
261262
}
262263

@@ -299,19 +300,23 @@ func (p *ConnPool) getLastDialError() error {
299300
// The purpose of GetPubSub is just to increment the stats.PubSubCount counter.
300301
// The connection is still returned from the pool with Get().
301302
func (p *ConnPool) GetPubSub(ctx context.Context) (*Conn, error) {
302-
p.stats.PubSubCount++
303-
return p.Get(ctx)
303+
return p.getConn(ctx, true)
304304
}
305305

306306
// Get returns existed connection from the pool or creates a new one.
307307
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
308+
return p.getConn(ctx, false)
309+
}
310+
311+
// getConn returns a connection from the pool.
312+
func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
308313
if p.closed() {
309314
return nil, ErrClosed
310315
}
311316

312-
if err := p.waitTurn(ctx); err != nil {
313-
return nil, err
314-
}
317+
//if err := p.waitTurn(ctx); err != nil {
318+
//return nil, err
319+
// }
315320

316321
tries := 0
317322
now := time.Now()
@@ -326,7 +331,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
326331
p.connsMu.Unlock()
327332

328333
if err != nil {
329-
p.freeTurn()
334+
//p.freeTurn()
330335
return nil, err
331336
}
332337

@@ -350,17 +355,24 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
350355
}
351356

352357
atomic.AddUint32(&p.stats.Hits, 1)
358+
cn.isPubSub = isPubSub
359+
if isPubSub {
360+
atomic.AddUint32(&p.stats.PubSubCount, 1)
361+
}
353362
return cn, nil
354363
}
355364

356365
atomic.AddUint32(&p.stats.Misses, 1)
357366

358367
newcn, err := p.newConn(ctx, true)
359368
if err != nil {
360-
p.freeTurn()
369+
//p.freeTurn()
361370
return nil, err
362371
}
363-
372+
newcn.isPubSub = isPubSub
373+
if isPubSub {
374+
atomic.AddUint32(&p.stats.PubSubCount, 1)
375+
}
364376
return newcn, nil
365377
}
366378

@@ -465,6 +477,10 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
465477
shouldPool := true
466478
shouldRemove := false
467479
var err error
480+
if cn.isPubSub {
481+
p.Remove(ctx, cn, err)
482+
return
483+
}
468484

469485
// Fast path: cache processor reference to avoid repeated field access
470486
if processor := p.cfg.ConnectionProcessor; processor != nil {
@@ -507,7 +523,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
507523

508524
p.connsMu.Unlock()
509525

510-
p.freeTurn()
526+
//p.freeTurn()
511527

512528
if shouldCloseConn {
513529
_ = p.closeConn(cn)
@@ -543,6 +559,10 @@ func (p *ConnPool) removeConn(cn *Conn) {
543559
break
544560
}
545561
}
562+
if cn.isPubSub {
563+
// Decrement pubsub count if this is a pubsub connection
564+
atomic.AddUint32(&p.stats.PubSubCount, ^uint32(0))
565+
}
546566
atomic.AddUint32(&p.stats.StaleConns, 1)
547567
}
548568

0 commit comments

Comments
 (0)