Skip to content

Commit e9f32f0

Browse files
committed
wip
1 parent 58c9f55 commit e9f32f0

File tree

5 files changed

+68
-35
lines changed

5 files changed

+68
-35
lines changed

example/pubsub/main.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func main() {
6161
go floodThePool(subCtx, rdb, wg)
6262
}
6363

64-
for i := 0; i < 100; i++ {
64+
for i := 0; i < 500; i++ {
6565
if err := rdb.Incr(ctx, "cons").Err(); err != nil {
6666
panic(err)
6767
}
@@ -95,12 +95,14 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
9595
}
9696
err := rdb.Publish(ctx, "test2", "hello").Err()
9797
if err != nil {
98-
log.Println("publish error:", err)
98+
// noop
99+
//log.Println("publish error:", err)
99100
}
100101

101102
err = rdb.Incr(ctx, "cntr").Err()
102103
if err != nil {
103-
log.Println("incr error:", err)
104+
// noop
105+
//log.Println("incr error:", err)
104106
}
105107
time.Sleep(10 * time.Nanosecond)
106108
}

internal/pool/conn.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -460,14 +460,20 @@ func (cn *Conn) WithReader(
460460
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
461461
) error {
462462
if timeout >= 0 {
463+
// CRITICAL FIX: Revert to original working approach
464+
// Use direct netConn access instead of atomic getNetConn() which can return nil
465+
463466
// Use relaxed timeout if set, otherwise use provided timeout
464467
effectiveTimeout := cn.getEffectiveReadTimeout(timeout)
465468

466-
// Lock-free netConn access for better performance
467-
if netConn := cn.getNetConn(); netConn != nil {
468-
if err := netConn.SetReadDeadline(cn.deadline(ctx, effectiveTimeout)); err != nil {
469-
return err
470-
}
469+
// Get the connection directly from atomic storage
470+
netConn := cn.getNetConn()
471+
if netConn == nil {
472+
return fmt.Errorf("redis: connection not available")
473+
}
474+
475+
if err := netConn.SetReadDeadline(cn.deadline(ctx, effectiveTimeout)); err != nil {
476+
return err
471477
}
472478
}
473479

@@ -481,11 +487,16 @@ func (cn *Conn) WithWriter(
481487
// Use relaxed timeout if set, otherwise use provided timeout
482488
effectiveTimeout := cn.getEffectiveWriteTimeout(timeout)
483489

484-
// Lock-free netConn access for better performance
490+
// CRITICAL FIX: Always set write deadline, even if getNetConn() returns nil
491+
// This prevents write operations from hanging indefinitely
485492
if netConn := cn.getNetConn(); netConn != nil {
486493
if err := netConn.SetWriteDeadline(cn.deadline(ctx, effectiveTimeout)); err != nil {
487494
return err
488495
}
496+
} else {
497+
// If getNetConn() returns nil, we still need to respect the timeout
498+
// Return an error to prevent indefinite blocking
499+
return fmt.Errorf("redis: connection not available for write operation")
489500
}
490501
}
491502

internal/pool/pool.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pool
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"log"
78
"net"
89
"sync"
@@ -102,7 +103,7 @@ type ConnPool struct {
102103
conns []*Conn
103104
idleConns []*Conn
104105

105-
poolSize int
106+
poolSize atomic.Int32
106107
idleConnsLen int
107108

108109
stats Stats
@@ -140,16 +141,16 @@ func (p *ConnPool) checkMinIdleConns() {
140141

141142
// Only create idle connections if we haven't reached the total pool size limit
142143
// MinIdleConns should be a subset of PoolSize, not additional connections
143-
for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns {
144+
for p.poolSize.Load() < int32(p.cfg.PoolSize) && p.idleConnsLen < p.cfg.MinIdleConns {
144145
select {
145146
case p.queue <- struct{}{}:
146-
p.poolSize++
147+
p.poolSize.Add(1)
147148
p.idleConnsLen++
148149
go func() {
149150
err := p.addIdleConn()
150151
if err != nil && err != ErrClosed {
151152
p.connsMu.Lock()
152-
p.poolSize--
153+
p.poolSize.Add(-1)
153154
p.idleConnsLen--
154155
p.connsMu.Unlock()
155156
}
@@ -197,7 +198,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
197198
}
198199

199200
p.connsMu.Lock()
200-
if p.cfg.MaxActiveConns > 0 && p.poolSize >= p.cfg.MaxActiveConns {
201+
if p.cfg.MaxActiveConns > 0 && p.poolSize.Load() >= int32(p.cfg.MaxActiveConns) {
201202
p.connsMu.Unlock()
202203
return nil, ErrPoolExhausted
203204
}
@@ -214,18 +215,20 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
214215
p.connsMu.Lock()
215216
defer p.connsMu.Unlock()
216217

217-
if p.cfg.MaxActiveConns > 0 && p.poolSize >= p.cfg.MaxActiveConns {
218+
if p.cfg.MaxActiveConns > 0 && p.poolSize.Load() >= int32(p.cfg.MaxActiveConns) {
218219
_ = cn.Close()
219220
return nil, ErrPoolExhausted
220221
}
221222

222223
p.conns = append(p.conns, cn)
223224
if pooled {
224225
// If pool is full remove the cn on next Put.
225-
if p.poolSize >= p.cfg.PoolSize {
226+
currentPoolSize := p.poolSize.Load()
227+
if currentPoolSize >= int32(p.cfg.PoolSize) {
228+
fmt.Printf("Conn %d poolSize (%d) >= cfg.PoolSize (%d), setting pooled to false\n", cn.GetID(), currentPoolSize, p.cfg.PoolSize)
226229
cn.pooled = false
227230
} else {
228-
p.poolSize++
231+
p.poolSize.Add(1)
229232
}
230233
}
231234

@@ -314,9 +317,12 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
314317
return nil, ErrClosed
315318
}
316319

317-
//if err := p.waitTurn(ctx); err != nil {
318-
//return nil, err
319-
// }
320+
// if it is not a pubsub connection, we need to wait for a turn
321+
if !isPubSub {
322+
if err := p.waitTurn(ctx); err != nil {
323+
return nil, err
324+
}
325+
}
320326

321327
tries := 0
322328
now := time.Now()
@@ -331,7 +337,10 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
331337
p.connsMu.Unlock()
332338

333339
if err != nil {
334-
//p.freeTurn()
340+
if !isPubSub {
341+
p.freeTurn()
342+
}
343+
335344
return nil, err
336345
}
337346

@@ -340,6 +349,7 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
340349
}
341350

342351
if !p.isHealthyConn(cn, now) {
352+
fmt.Printf("Conn %d is not healthy, closing...\n", cn.GetID())
343353
_ = p.CloseConn(cn)
344354
continue
345355
}
@@ -348,6 +358,7 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
348358
// Fast path: check processor existence once and cache the result
349359
if processor := p.cfg.ConnectionProcessor; processor != nil {
350360
if err := processor.ProcessConnectionOnGet(ctx, cn); err != nil {
361+
fmt.Printf("Conn %d failed processor on get, closing...\n", cn.GetID())
351362
// Failed to process connection, discard it
352363
_ = p.CloseConn(cn)
353364
continue
@@ -357,20 +368,25 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
357368
atomic.AddUint32(&p.stats.Hits, 1)
358369
cn.isPubSub = isPubSub
359370
if isPubSub {
371+
cn.pooled = false
372+
p.poolSize.Add(-1)
360373
atomic.AddUint32(&p.stats.PubSubCount, 1)
361374
}
362375
return cn, nil
363376
}
364377

365378
atomic.AddUint32(&p.stats.Misses, 1)
366379

367-
newcn, err := p.newConn(ctx, true)
380+
newcn, err := p.newConn(ctx, !isPubSub)
368381
if err != nil {
369-
//p.freeTurn()
382+
if !isPubSub {
383+
p.freeTurn()
384+
}
370385
return nil, err
371386
}
372-
newcn.isPubSub = isPubSub
387+
373388
if isPubSub {
389+
newcn.isPubSub = true
374390
atomic.AddUint32(&p.stats.PubSubCount, 1)
375391
}
376392
return newcn, nil
@@ -449,6 +465,8 @@ func (p *ConnPool) popIdle() (*Conn, error) {
449465
if cn.IsUsable() {
450466
p.idleConnsLen--
451467
break
468+
} else {
469+
fmt.Printf("Connection %d is not usable, retrying...\n", cn.GetID())
452470
}
453471

454472
// Connection is not usable, put it back in the pool
@@ -478,6 +496,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
478496
shouldRemove := false
479497
var err error
480498
if cn.isPubSub {
499+
fmt.Printf("Conn %d is pubsub, removing...\n", cn.GetID())
481500
p.Remove(ctx, cn, err)
482501
return
483502
}
@@ -492,19 +511,23 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
492511
}
493512
}
494513

514+
fmt.Printf("Conn %d shouldPool: %v, shouldRemove: %v\n", cn.GetID(), shouldPool, shouldRemove)
495515
// If processor says to remove the connection, do so
496516
if shouldRemove {
517+
fmt.Printf("Conn %d shouldRemove, removing...\n", cn.GetID())
497518
p.Remove(ctx, cn, nil)
498519
return
499520
}
500521

501522
// If processor says not to pool the connection, remove it
502523
if !shouldPool {
524+
fmt.Printf("Conn %d !shouldPool, removing...\n", cn.GetID())
503525
p.Remove(ctx, cn, nil)
504526
return
505527
}
506528

507529
if !cn.pooled {
530+
fmt.Printf("Conn %d !cn.pooled, removing...\n", cn.GetID())
508531
p.Remove(ctx, cn, nil)
509532
return
510533
}
@@ -517,15 +540,17 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
517540
p.idleConns = append(p.idleConns, cn)
518541
p.idleConnsLen++
519542
} else {
543+
fmt.Printf("Conn %d p.idleConnsLen >= p.cfg.MaxIdleConns, removing...\n", cn.GetID())
520544
p.removeConn(cn)
521545
shouldCloseConn = true
522546
}
523547

524548
p.connsMu.Unlock()
525549

526-
//p.freeTurn()
550+
p.freeTurn()
527551

528552
if shouldCloseConn {
553+
fmt.Printf("Conn %d shouldCloseConn, closing...\n", cn.GetID())
529554
_ = p.closeConn(cn)
530555
}
531556
}
@@ -552,7 +577,7 @@ func (p *ConnPool) removeConn(cn *Conn) {
552577
if c == cn {
553578
p.conns = append(p.conns[:i], p.conns[i+1:]...)
554579
if cn.pooled {
555-
p.poolSize--
580+
p.poolSize.Add(-1)
556581
// Immediately check for minimum idle connections when a pooled connection is removed
557582
p.checkMinIdleConns()
558583
}
@@ -634,7 +659,7 @@ func (p *ConnPool) Close() error {
634659
}
635660
}
636661
p.conns = nil
637-
p.poolSize = 0
662+
p.poolSize.Store(0)
638663
p.idleConns = nil
639664
p.idleConnsLen = 0
640665
p.connsMu.Unlock()

pubsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,12 @@ func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allo
164164
}
165165

166166
if !cn.IsUsable() || cn.ShouldHandoff() {
167+
fmt.Printf("Connection is not usable, reconnecting...\n")
167168
c.reconnect(ctx, fmt.Errorf("pubsub: connection is not usable"))
168169
}
169170

170171
if isBadConn(err, allowTimeout, c.opt.Addr) {
172+
fmt.Printf("Bad connection, reconnecting...\n")
171173
c.reconnect(ctx, err)
172174
}
173175
}

redis.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
479479
}
480480

481481
// Set the connection initialization function for potential reconnections
482-
cn.SetInitConnFunc(c.createInitConnFunc())
482+
cn.SetInitConnFunc(c.initConn)
483483

484484
return nil
485485
}
@@ -624,13 +624,6 @@ func (c *baseClient) context(ctx context.Context) context.Context {
624624
return context.Background()
625625
}
626626

627-
// createInitConnFunc creates a connection initialization function that can be used for reconnections.
628-
func (c *baseClient) createInitConnFunc() func(context.Context, *pool.Conn) error {
629-
return func(ctx context.Context, cn *pool.Conn) error {
630-
return c.initConn(ctx, cn)
631-
}
632-
}
633-
634627
// Close closes the client, releasing any open resources.
635628
//
636629
// It is rare to Close a Client, as the Client is meant to be

0 commit comments

Comments
 (0)