Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extra/rediscensus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
2 changes: 1 addition & 1 deletion extra/rediscmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
2 changes: 1 addition & 1 deletion extra/redisotel/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
2 changes: 1 addition & 1 deletion extra/redisprometheus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ require (
)

retract (
v9.5.3 // This version was accidentally released.
v9.7.2 // This version was accidentally released.
v9.5.3 // This version was accidentally released.
)
36 changes: 32 additions & 4 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -571,6 +572,9 @@ type channel struct {
chanSize int
chanSendTimeout time.Duration
checkInterval time.Duration

subscriptions map[string]struct{}
subscriptionsMu sync.RWMutex
}

func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
Expand All @@ -580,6 +584,8 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel {
chanSize: 100,
chanSendTimeout: time.Minute,
checkInterval: 3 * time.Second,

subscriptions: make(map[string]struct{}),
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -618,6 +624,20 @@ func (c *channel) initHealthCheck() {
}()
}

// Helper function to format subscription information
func (c *channel) getSubscriptionInfo() string {
if len(c.subscriptions) == 0 {
return "none"
}

subs := make([]string, 0, len(c.subscriptions))
for sub := range c.subscriptions {
subs = append(subs, sub)
}
sort.Strings(subs) // Sort for consistent output
return strings.Join(subs, ", ")
}

// initMsgChan must be in sync with initAllChan.
func (c *channel) initMsgChan() {
ctx := context.TODO()
Expand Down Expand Up @@ -663,9 +683,13 @@ func (c *channel) initMsgChan() {
<-timer.C
}
case <-timer.C:
c.subscriptionsMu.RLock()
subInfo := c.getSubscriptionInfo()
c.subscriptionsMu.RUnlock()

internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s",
c.chanSendTimeout, subInfo)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
Expand Down Expand Up @@ -717,9 +741,13 @@ func (c *channel) initAllChan() {
<-timer.C
}
case <-timer.C:
c.subscriptionsMu.RLock()
subInfo := c.getSubscriptionInfo()
c.subscriptionsMu.RUnlock()

internal.Logger.Printf(
ctx, "redis: %s channel is full for %s (message is dropped)",
c, c.chanSendTimeout)
ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s",
c.chanSendTimeout, subInfo)
}
default:
internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)
Expand Down
Loading