From 0761b27368341764639c35be7081eea2642a5468 Mon Sep 17 00:00:00 2001 From: monkey92t Date: Wed, 5 Apr 2023 23:46:58 +0800 Subject: [PATCH 1/2] feat: cache init Signed-off-by: monkey92t --- cache.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 cache.go diff --git a/cache.go b/cache.go new file mode 100644 index 000000000..73145daf8 --- /dev/null +++ b/cache.go @@ -0,0 +1,107 @@ +package redis + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" +) + +type Cache interface{} + +type cache struct { + // cluster? sentinel? + conn *Conn +} + +func newCache() Cache { + // ? + return &cache{} +} + +// ------------------------------------------------------------------------------------------ + +// extension method + +func (c *Conn) readReply(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error { + return c.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error { + return conn.WithReader(ctx, timeout, fn) + }) +} + +// ------------------------------------------------------------------------------------------ + +// Client-side caching command. + +type trackingArgs struct { + redirect int + prefixes []string + broadcast bool + optIn bool + optOut bool + noLoop bool +} + +func (c *cache) clientTracking(ctx context.Context, t *trackingArgs) *StringCmd { + args := make([]any, 0, 7+len(t.prefixes)) + args = append(args, "CLIENT", "TRACKING", "ON") + if t.redirect > 0 { + args = append(args, "REDIRECT", t.redirect) + } + if len(t.prefixes) > 0 { + for _, prefix := range t.prefixes { + args = append(args, "PREFIX", prefix) + } + } + if t.optIn { + args = append(args, "OPTIN") + } + if t.optOut { + args = append(args, "OPTOUT") + } + if t.noLoop { + args = append(args, "NOLOOP") + } + cmd := NewStringCmd(ctx, args...) + _ = c.conn.Process(ctx, cmd) + return cmd +} + +func (c *cache) trackingClose(ctx context.Context) error { + return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "TRACKING", "OFF")) +} + +func (c *cache) cachingYes(ctx context.Context) error { + return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "YES")) +} + +func (c *cache) cachingNo(ctx context.Context) error { + return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "NO")) +} + +// ------------------------------------------------------------------------------------ + +// invalidatePush check if the data packet is a key-invalidate push and return the invalid key. +func (c *cache) invalidatePush(v []any) ([]string, bool) { + if len(v) != 2 { + return nil, false + } + if s, ok := v[0].(string); !ok || s != "invalidate" { + return nil, false + } + slice, ok := v[1].([]any) + if !ok { + return nil, false + } + + keys := make([]string, 0, len(slice)) + for i := 0; i < len(slice); i++ { + if s, ok := slice[i].(string); ok { + keys = append(keys, s) + } + } + return keys, true +} + +// ------------------------------------- Broadcasting ------------------------------------- From e9158aaee52bc60845ce9567b7ce53f3fd20e3aa Mon Sep 17 00:00:00 2001 From: monkey92t Date: Mon, 24 Apr 2023 22:16:43 +0800 Subject: [PATCH 2/2] add Broadcasting listen and read push Signed-off-by: monkey92t --- cache.go | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 171 insertions(+), 15 deletions(-) diff --git a/cache.go b/cache.go index 73145daf8..fbca1501a 100644 --- a/cache.go +++ b/cache.go @@ -2,8 +2,13 @@ package redis import ( "context" + "fmt" + "net" + "strconv" + "sync/atomic" "time" + "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" ) @@ -11,8 +16,13 @@ import ( type Cache interface{} type cache struct { + client *Client + // cluster? sentinel? - conn *Conn + conn *Conn + prefix []string + + closed int32 // atomic } func newCache() Cache { @@ -82,26 +92,172 @@ func (c *cache) cachingNo(ctx context.Context) error { // ------------------------------------------------------------------------------------ -// invalidatePush check if the data packet is a key-invalidate push and return the invalid key. -func (c *cache) invalidatePush(v []any) ([]string, bool) { - if len(v) != 2 { - return nil, false +// readInvalidate To read the expired message push from redis-server, +// we only read for invalidate messages, and consider any other data that is read as an error. +func (c *cache) readInvalidate(rd *proto.Reader) ([]string, error) { + line, err := rd.ReadLine() + if err != nil { + return nil, err } - if s, ok := v[0].(string); !ok || s != "invalidate" { - return nil, false + + if line[0] != proto.RespPush { + return nil, fmt.Errorf("invalid data-%s", string(line)) } - slice, ok := v[1].([]any) - if !ok { - return nil, false + + n, err := strconv.Atoi(string(line[1:])) + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d elements in the map, wanted %d", n, 2) } - keys := make([]string, 0, len(slice)) - for i := 0; i < len(slice); i++ { - if s, ok := slice[i].(string); ok { - keys = append(keys, s) + // read `invalidate` + s, err := rd.ReadString() + if err != nil { + return nil, err + } + if s != "invalidate" { + return nil, fmt.Errorf("not a client-side caching push message, data-%s", s) + } + + n, err = rd.ReadArrayLen() + if err != nil { + return nil, err + } + + keys := make([]string, 0, n) + for i := 0; i < n; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err } + keys = append(keys, key) } - return keys, true + + return keys, nil } // ------------------------------------- Broadcasting ------------------------------------- + +func (c *cache) listen(timeout time.Duration) { + ctx := context.Background() + defer func() { + if err := recover(); err != nil { + internal.Logger.Printf(ctx, "redis cache: panic - %v", err) + } + }() + + if timeout == 0 { + timeout = 30 * time.Second + } + internal.Logger.Printf(ctx, "redis cache: listen working, read timeout-%d second", int(timeout/time.Second)) + + // state, 0-normal, 1-need init track + const ( + normal = 0 + bad = 1 + ) + var state = normal + for { + if atomic.LoadInt32(&c.closed) == 1 { + _ = c.conn.Close() + internal.Logger.Printf(ctx, "redis cache: close, quit listen") + return + } + + if state == bad { + internal.Logger.Printf(ctx, "redis cache: state bad") + if err := c.initTrack(ctx); err != nil { + internal.Logger.Printf(ctx, "redis cache: listen init track error-%s", err.Error()) + time.Sleep(1 * time.Second) + continue + } + } + + if err := c.conn.Ping(ctx).Err(); err != nil { + internal.Logger.Printf(ctx, "redis cache: listen ping error-%s", err.Error()) + state = bad + continue + } + state = normal + + var keys []string + err := c.conn.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error { + return conn.WithReader(ctx, timeout, func(rd *proto.Reader) (err error) { + keys, err = c.readInvalidate(rd) + + if err == nil { + return nil + } + + // The timeout error is considered normal, and it is triggered when we fail + // to receive a notification. We handle it as nil. + // We cannot return the timeout error, as go-redis would consider it a network + // problem and close the network connection. + if isNetTimeout(err) { + err = nil + return err + } + + // We only listen for redis-push notifications, so under normal circumstances, + // we should not receive any redis-error notifications. + // If we do, we need to handle them as errors; otherwise, + // go-redis may consider redis errors as normal occurrences. + if isRedisError(err) { + err = fmt.Errorf("redis cache: unexpected response redis-error-msg-%s", err.Error()) + } + + return err + }) + }) + + // under normal circumstances, we should not receive any errors, including redis errors. + if err != nil { + state = bad + + internal.Logger.Printf(ctx, "redis cache: read push data error-%s", err.Error()) + continue + } + + // it's possible that we may not receive any notifications for keys. + if len(keys) > 0 { + // handle keys + } + } +} + +func (c *cache) initTrack(ctx context.Context) error { + internal.Logger.Printf(ctx, "redis cache: init track") + if c.conn != nil { + _ = c.conn.Close() + } + c.conn = c.client.Conn() + + args := make([]any, 0, 3+2*len(c.prefix)+1) + args = append(args, "CLIENT", "TRACKING", "ON") + for _, prefix := range c.prefix { + args = append(args, "PREFIX", prefix) + } + args = append(args, "BCAST") + cmd := NewStringCmd(ctx, args...) + + if err := c.conn.Process(ctx, cmd); err != nil { + _ = c.conn.Close() + return err + } + + return nil +} + +// isNetTimeout check err == net timeout +func isNetTimeout(err error) bool { + if err == nil { + return false + } + netErr, ok := err.(net.Error) + if !ok { + return false + } + return netErr.Timeout() +}