Skip to content

Commit dadcdcf

Browse files
committed
fix: Blocking commands respect canceled context
1 parent 345ad75 commit dadcdcf

File tree

6 files changed

+127
-6
lines changed

6 files changed

+127
-6
lines changed

cluster.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,19 @@ func (c *ClusterClient) processPipelineNode(
12911291
defer func() {
12921292
node.Client.releaseConn(ctx, cn, processErr)
12931293
}()
1294-
processErr = c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1294+
1295+
errCh := make(chan error, 1)
1296+
1297+
go func() {
1298+
errCh <- c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1299+
}()
1300+
1301+
select {
1302+
case processErr = <-errCh:
1303+
case <-ctx.Done():
1304+
_ = cn.Close()
1305+
processErr = ctx.Err()
1306+
}
12951307

12961308
return processErr
12971309
})
@@ -1472,7 +1484,19 @@ func (c *ClusterClient) processTxPipelineNode(
14721484
defer func() {
14731485
node.Client.releaseConn(ctx, cn, processErr)
14741486
}()
1475-
processErr = c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1487+
1488+
errCh := make(chan error, 1)
1489+
1490+
go func() {
1491+
errCh <- c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1492+
}()
1493+
1494+
select {
1495+
case processErr = <-errCh:
1496+
case <-ctx.Done():
1497+
_ = cn.Close()
1498+
processErr = ctx.Err()
1499+
}
14761500

14771501
return processErr
14781502
})

commands_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4843,6 +4843,24 @@ var _ = Describe("Commands", func() {
48434843
Expect(err).To(Equal(redis.Nil))
48444844
})
48454845

4846+
Describe("canceled context", func() {
4847+
It("should unblock XRead", func() {
4848+
ctx2, cancel := context.WithCancel(ctx)
4849+
errCh := make(chan error, 1)
4850+
go func() {
4851+
errCh <- client.XRead(ctx2, &redis.XReadArgs{
4852+
Streams: []string{"stream", "$"},
4853+
}).Err()
4854+
}()
4855+
4856+
var gotErr error
4857+
Consistently(errCh).ShouldNot(Receive(&gotErr), "Received %v", gotErr)
4858+
cancel()
4859+
Eventually(errCh).Should(Receive(&gotErr))
4860+
Expect(gotErr).To(HaveOccurred())
4861+
})
4862+
})
4863+
48464864
Describe("group", func() {
48474865
BeforeEach(func() {
48484866
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()
@@ -5023,6 +5041,26 @@ var _ = Describe("Commands", func() {
50235041
Expect(err).NotTo(HaveOccurred())
50245042
Expect(n).To(Equal(int64(2)))
50255043
})
5044+
5045+
Describe("canceled context", func() {
5046+
It("should unblock XReadGroup", func() {
5047+
ctx2, cancel := context.WithCancel(ctx)
5048+
errCh := make(chan error, 1)
5049+
go func() {
5050+
errCh <- client.XReadGroup(ctx2, &redis.XReadGroupArgs{
5051+
Group: "group",
5052+
Consumer: "consumer",
5053+
Streams: []string{"stream", ">"},
5054+
}).Err()
5055+
}()
5056+
5057+
var gotErr error
5058+
Consistently(errCh).ShouldNot(Receive(&gotErr), "Received %v", gotErr)
5059+
cancel()
5060+
Eventually(errCh).Should(Receive(&gotErr))
5061+
Expect(gotErr).To(HaveOccurred())
5062+
})
5063+
})
50265064
})
50275065

50285066
Describe("xinfo", func() {

internal_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,4 +351,21 @@ var _ = Describe("withConn", func() {
351351
Expect(newConn).NotTo(Equal(conn))
352352
Expect(client.connPool.Len()).To(Equal(1))
353353
})
354+
355+
It("should remove the connection from the pool if the context is canceled", func() {
356+
var conn *pool.Conn
357+
358+
ctx2, cancel := context.WithCancel(ctx)
359+
cancel()
360+
361+
client.withConn(ctx2, func(ctx context.Context, c *pool.Conn) error {
362+
conn = c
363+
return nil
364+
})
365+
366+
newConn, err := client.connPool.Get(ctx)
367+
Expect(err).To(BeNil())
368+
Expect(newConn).NotTo(Equal(conn))
369+
Expect(client.connPool.Len()).To(Equal(1))
370+
})
354371
})

pubsub.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,9 +432,19 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
432432
return nil, err
433433
}
434434

435-
err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
436-
return c.cmd.readReply(rd)
437-
})
435+
errCh := make(chan error, 1)
436+
437+
go func() {
438+
errCh <- cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
439+
return c.cmd.readReply(rd)
440+
})
441+
}()
442+
443+
select {
444+
case err = <-errCh:
445+
case <-ctx.Done():
446+
err = ctx.Err()
447+
}
438448

439449
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
440450

pubsub_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis_test
22

33
import (
4+
"context"
45
"io"
56
"net"
67
"sync"
@@ -567,4 +568,24 @@ var _ = Describe("PubSub", func() {
567568
Expect(msg.Channel).To(Equal("mychannel"))
568569
Expect(msg.Payload).To(Equal(text))
569570
})
571+
572+
Describe("canceled context", func() {
573+
It("should unblock ReceiveMessage", func() {
574+
pubsub := client.Subscribe(ctx, "mychannel")
575+
defer pubsub.Close()
576+
577+
ctx2, cancel := context.WithCancel(ctx)
578+
errCh := make(chan error, 1)
579+
go func() {
580+
_, err := pubsub.ReceiveMessage(ctx2)
581+
errCh <- err
582+
}()
583+
584+
var gotErr error
585+
Consistently(errCh).ShouldNot(Receive(&gotErr), "Received %v", gotErr)
586+
cancel()
587+
Eventually(errCh).Should(Receive(&gotErr))
588+
Expect(gotErr).To(HaveOccurred())
589+
})
590+
})
570591
})

redis.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,18 @@ func (c *baseClient) withConn(
347347
c.releaseConn(ctx, cn, fnErr)
348348
}()
349349

350-
fnErr = fn(ctx, cn)
350+
errCh := make(chan error, 1)
351+
352+
go func() {
353+
errCh <- fn(ctx, cn)
354+
}()
355+
356+
select {
357+
case fnErr = <-errCh:
358+
case <-ctx.Done():
359+
_ = c.connPool.CloseConn(cn)
360+
fnErr = ctx.Err()
361+
}
351362

352363
return fnErr
353364
}

0 commit comments

Comments
 (0)