Skip to content

Commit 457c7c2

Browse files
committed
wip
1 parent 25bde87 commit 457c7c2

29 files changed

+5164
-87
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ coverage.txt
99
**/coverage.txt
1010
.vscode
1111
tmp/*
12+
13+
# Hitless upgrade documentation (temporary)
14+
hitless/docs/

adapters.go

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"time"
9+
10+
"github.com/redis/go-redis/v9/internal/interfaces"
11+
"github.com/redis/go-redis/v9/internal/pool"
12+
"github.com/redis/go-redis/v9/push"
13+
)
14+
15+
// ErrInvalidCommand is returned when an invalid command is passed to ExecuteCommand.
16+
var ErrInvalidCommand = errors.New("invalid command type")
17+
18+
// ErrInvalidPool is returned when the pool type is not supported.
19+
var ErrInvalidPool = errors.New("invalid pool type")
20+
21+
// NewClientAdapter creates a new client adapter for regular Redis clients.
22+
func NewClientAdapter(client *Client) interfaces.ClientInterface {
23+
return &clientAdapter{client: client}
24+
}
25+
26+
// NewClusterClientAdapter creates a new client adapter for cluster Redis clients.
27+
func NewClusterClientAdapter(client interface{}) interfaces.ClientInterface {
28+
return &clusterClientAdapter{client: client}
29+
}
30+
31+
// clientAdapter adapts a Redis client to implement interfaces.ClientInterface.
32+
type clientAdapter struct {
33+
client *Client
34+
}
35+
36+
// GetOptions returns the client options.
37+
func (ca *clientAdapter) GetOptions() interfaces.OptionsInterface {
38+
return &optionsAdapter{options: ca.client.opt}
39+
}
40+
41+
// GetPushProcessor returns the client's push notification processor.
42+
func (ca *clientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
43+
return &pushProcessorAdapter{processor: ca.client.pushProcessor}
44+
}
45+
46+
// optionsAdapter adapts Redis options to implement interfaces.OptionsInterface.
47+
type optionsAdapter struct {
48+
options *Options
49+
}
50+
51+
// GetReadTimeout returns the read timeout.
52+
func (oa *optionsAdapter) GetReadTimeout() time.Duration {
53+
return oa.options.ReadTimeout
54+
}
55+
56+
// GetWriteTimeout returns the write timeout.
57+
func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
58+
return oa.options.WriteTimeout
59+
}
60+
61+
// GetAddr returns the connection address.
62+
func (oa *optionsAdapter) GetAddr() string {
63+
return oa.options.Addr
64+
}
65+
66+
// IsTLSEnabled returns true if TLS is enabled.
67+
func (oa *optionsAdapter) IsTLSEnabled() bool {
68+
return oa.options.TLSConfig != nil
69+
}
70+
71+
// GetProtocol returns the protocol version.
72+
func (oa *optionsAdapter) GetProtocol() int {
73+
return oa.options.Protocol
74+
}
75+
76+
// GetPoolSize returns the connection pool size.
77+
func (oa *optionsAdapter) GetPoolSize() int {
78+
return oa.options.PoolSize
79+
}
80+
81+
// NewDialer returns a new dialer function for the connection.
82+
func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
83+
baseDialer := oa.options.NewDialer()
84+
return func(ctx context.Context) (net.Conn, error) {
85+
// Extract network and address from the options
86+
network := "tcp"
87+
addr := oa.options.Addr
88+
return baseDialer(ctx, network, addr)
89+
}
90+
}
91+
92+
// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
93+
type connectionAdapter struct {
94+
conn *pool.Conn
95+
}
96+
97+
// Close closes the connection.
98+
func (ca *connectionAdapter) Close() error {
99+
return ca.conn.Close()
100+
}
101+
102+
// IsUsable returns true if the connection is safe to use for new commands.
103+
func (ca *connectionAdapter) IsUsable() bool {
104+
return ca.conn.IsUsable()
105+
}
106+
107+
// GetPoolConnection returns the underlying pool connection.
108+
func (ca *connectionAdapter) GetPoolConnection() *pool.Conn {
109+
return ca.conn
110+
}
111+
112+
// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
113+
// These timeouts remain active until explicitly cleared.
114+
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
115+
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
116+
}
117+
118+
// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
119+
// After the deadline, timeouts automatically revert to normal values.
120+
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
121+
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
122+
}
123+
124+
// ClearRelaxedTimeout clears relaxed timeouts for this connection.
125+
func (ca *connectionAdapter) ClearRelaxedTimeout() {
126+
ca.conn.ClearRelaxedTimeout()
127+
}
128+
129+
// clusterClientAdapter adapts a cluster client to implement interfaces.ClientInterface.
130+
type clusterClientAdapter struct {
131+
client interface{}
132+
}
133+
134+
// GetOptions returns the client options.
135+
func (cca *clusterClientAdapter) GetOptions() interfaces.OptionsInterface {
136+
// Return a mock options adapter for cluster clients
137+
return &mockClusterOptionsAdapter{}
138+
}
139+
140+
// ExecuteCommand executes a command on the cluster client.
141+
func (cca *clusterClientAdapter) ExecuteCommand(ctx context.Context, cmd interface{}) error {
142+
// Use reflection to call Process method on the cluster client
143+
// This is a simplified implementation for the refactoring
144+
return nil // Mock implementation
145+
}
146+
147+
// GetPushProcessor returns the cluster client's push notification processor.
148+
func (cca *clusterClientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
149+
// For cluster clients, return a mock processor since the actual implementation
150+
// would be more complex and distributed across nodes
151+
return &mockClusterPushProcessor{}
152+
}
153+
154+
// DialToEndpoint creates a connection to the specified endpoint for cluster clients.
155+
func (cca *clusterClientAdapter) DialToEndpoint(ctx context.Context, endpoint string) (interface{}, error) {
156+
// For cluster clients, this would need to handle cluster-specific connection logic
157+
// For now, return an error indicating this is not implemented for cluster clients
158+
return nil, fmt.Errorf("DialToEndpoint not implemented for cluster clients")
159+
}
160+
161+
// mockClusterOptionsAdapter is a mock implementation for cluster options.
162+
type mockClusterOptionsAdapter struct{}
163+
164+
// GetReadTimeout returns the read timeout.
165+
func (mcoa *mockClusterOptionsAdapter) GetReadTimeout() time.Duration {
166+
return 5 * time.Second
167+
}
168+
169+
// GetWriteTimeout returns the write timeout.
170+
func (mcoa *mockClusterOptionsAdapter) GetWriteTimeout() time.Duration {
171+
return 3 * time.Second
172+
}
173+
174+
// GetAddr returns the connection address.
175+
func (mcoa *mockClusterOptionsAdapter) GetAddr() string {
176+
return "localhost:6379"
177+
}
178+
179+
// IsTLSEnabled returns true if TLS is enabled.
180+
func (mcoa *mockClusterOptionsAdapter) IsTLSEnabled() bool {
181+
return false
182+
}
183+
184+
// GetProtocol returns the protocol version.
185+
func (mcoa *mockClusterOptionsAdapter) GetProtocol() int {
186+
return 3
187+
}
188+
189+
// GetPoolSize returns the connection pool size.
190+
func (mcoa *mockClusterOptionsAdapter) GetPoolSize() int {
191+
return 50 // Default cluster pool size (5 * runtime.GOMAXPROCS(0))
192+
}
193+
194+
// NewDialer returns a new dialer function for the connection.
195+
func (mcoa *mockClusterOptionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
196+
return func(ctx context.Context) (net.Conn, error) {
197+
return nil, errors.New("mock cluster dialer")
198+
}
199+
}
200+
201+
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
202+
type pushProcessorAdapter struct {
203+
processor push.NotificationProcessor
204+
}
205+
206+
// RegisterHandler registers a handler for a specific push notification name.
207+
func (ppa *pushProcessorAdapter) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
208+
if pushHandler, ok := handler.(push.NotificationHandler); ok {
209+
return ppa.processor.RegisterHandler(pushNotificationName, pushHandler, protected)
210+
}
211+
return errors.New("handler must implement push.NotificationHandler")
212+
}
213+
214+
// UnregisterHandler removes a handler for a specific push notification name.
215+
func (ppa *pushProcessorAdapter) UnregisterHandler(pushNotificationName string) error {
216+
return ppa.processor.UnregisterHandler(pushNotificationName)
217+
}
218+
219+
// GetHandler returns the handler for a specific push notification name.
220+
func (ppa *pushProcessorAdapter) GetHandler(pushNotificationName string) interface{} {
221+
return ppa.processor.GetHandler(pushNotificationName)
222+
}
223+
224+
// mockClusterPushProcessor is a mock implementation for cluster push processors.
225+
type mockClusterPushProcessor struct{}
226+
227+
// RegisterHandler registers a handler (mock implementation).
228+
func (mcpp *mockClusterPushProcessor) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
229+
return nil
230+
}
231+
232+
// UnregisterHandler removes a handler (mock implementation).
233+
func (mcpp *mockClusterPushProcessor) UnregisterHandler(pushNotificationName string) error {
234+
return nil
235+
}
236+
237+
// GetHandler returns the handler (mock implementation).
238+
func (mcpp *mockClusterPushProcessor) GetHandler(pushNotificationName string) interface{} {
239+
return nil
240+
}

0 commit comments

Comments
 (0)