diff --git a/src/main/java/io/lettuce/core/failover/CircuitBreaker.java b/src/main/java/io/lettuce/core/failover/CircuitBreaker.java new file mode 100644 index 0000000000..6008292d07 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/CircuitBreaker.java @@ -0,0 +1,193 @@ +package io.lettuce.core.failover; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.failover.api.CircuitBreakerStateListener; +import io.lettuce.core.failover.metrics.CircuitBreakerMetrics; +import io.lettuce.core.failover.metrics.CircuitBreakerMetricsImpl; + +/** + * Circuit breaker for tracking command metrics and managing circuit breaker state. Wraps CircuitBreakerMetrics and exposes it + * via {@link #getMetrics()}. + * + * @author Ali Takavci + * @since 7.1 + */ +public class CircuitBreaker { + + private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class); + + private final CircuitBreakerMetrics metrics; + + private final CircuitBreakerConfig config; + + private volatile State currentState = State.CLOSED; + + private Predicate exceptionsPredicate; + + private final Set listeners = ConcurrentHashMap.newKeySet(); + + /** + * Create a circuit breaker instance. + */ + public CircuitBreaker(CircuitBreakerConfig config) { + this.metrics = new CircuitBreakerMetricsImpl(); + this.config = config; + this.exceptionsPredicate = createExceptionsPredicate(config.trackedExceptions); + } + + /** + * Get the metrics tracked by this circuit breaker. + * + * @return the circuit breaker metrics + */ + public CircuitBreakerMetrics getMetrics() { + return metrics; + } + + @Override + public String toString() { + return "CircuitBreaker{" + "metrics=" + metrics + ", config=" + config + '}'; + } + + public boolean isCircuitBreakerTrackedException(Throwable error) { + return exceptionsPredicate.test(error); + } + + private static Predicate createExceptionsPredicate(Set> trackedExceptions) { + return throwable -> { + Class errorClass = throwable.getClass(); + for (Class trackedException : trackedExceptions) { + if (trackedException.isAssignableFrom(errorClass)) { + return true; + } + } + return false; + }; + } + + public void evaluateMetrics() { + boolean evaluationResult = metrics.getSnapshot().getFailureRate() >= config.getFailureRateThreshold() + && metrics.getSnapshot().getFailureCount() >= config.getMinimumNumberOfFailures(); + if (evaluationResult) { + stateTransitionTo(State.OPEN); + } + } + + private void stateTransitionTo(State newState) { + State previousState = this.currentState; + if (previousState != newState) { + this.currentState = newState; + fireStateChanged(previousState, newState); + } + } + + public State getCurrentState() { + return currentState; + } + + /** + * Add a listener for circuit breaker state change events. + * + * @param listener the listener to add, must not be {@code null} + */ + public void addListener(CircuitBreakerStateListener listener) { + listeners.add(listener); + } + + /** + * Remove a listener for circuit breaker state change events. + * + * @param listener the listener to remove, must not be {@code null} + */ + public void removeListener(CircuitBreakerStateListener listener) { + listeners.remove(listener); + } + + /** + * Fire a state change event to all registered listeners. + * + * @param previousState the previous state + * @param newState the new state + */ + private void fireStateChanged(State previousState, State newState) { + CircuitBreakerStateChangeEvent event = new CircuitBreakerStateChangeEvent(this, previousState, newState); + for (CircuitBreakerStateListener listener : listeners) { + try { + listener.onCircuitBreakerStateChange(event); + } catch (Exception e) { + // Ignore listener exceptions to prevent one bad listener from affecting others + log.error("Error notifying listener " + listener + " of state change " + event, e); + } + } + } + + public static enum State { + CLOSED, OPEN + } + + public static class CircuitBreakerConfig { + + private final static float DEFAULT_FAILURE_RATE_THRESHOLD = 10; + + private final static int DEFAULT_MINIMUM_NUMBER_OF_FAILURES = 1000; + + private final static Set> DEFAULT_TRACKED_EXCEPTIONS = new HashSet<>(Arrays.asList( + + // Connection failures + RedisConnectionException.class, // Connection establishment failures + IOException.class, // Network I/O failures (includes ClosedChannelException) + ConnectException.class, // Connection refused, etc. + + // Timeout failures + RedisCommandTimeoutException.class, // Command execution timeout + TimeoutException.class // Generic timeout + + )); + + public static final CircuitBreakerConfig DEFAULT = new CircuitBreakerConfig(); + + private final Set> trackedExceptions; + + private final float failureThreshold; + + private final int minimumNumberOfFailures; + + private CircuitBreakerConfig() { + this(DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_MINIMUM_NUMBER_OF_FAILURES, DEFAULT_TRACKED_EXCEPTIONS); + } + + public CircuitBreakerConfig(float failureThreshold, int minimumNumberOfFailures, + Set> trackedExceptions) { + this.trackedExceptions = trackedExceptions; + this.failureThreshold = failureThreshold; + this.minimumNumberOfFailures = minimumNumberOfFailures; + } + + public Set> getTrackedExceptions() { + return trackedExceptions; + } + + public float getFailureRateThreshold() { + return failureThreshold; + } + + public int getMinimumNumberOfFailures() { + return minimumNumberOfFailures; + } + + } + +} diff --git a/src/main/java/io/lettuce/core/failover/CircuitBreakerStateChangeEvent.java b/src/main/java/io/lettuce/core/failover/CircuitBreakerStateChangeEvent.java new file mode 100644 index 0000000000..0724b8d225 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/CircuitBreakerStateChangeEvent.java @@ -0,0 +1,76 @@ +package io.lettuce.core.failover; + +/** + * Event representing a circuit breaker state change. + * + * @author Ali Takavci + * @since 7.1 + */ +public class CircuitBreakerStateChangeEvent { + + private final CircuitBreaker circuitBreaker; + + private final CircuitBreaker.State previousState; + + private final CircuitBreaker.State newState; + + private final long timestamp; + + /** + * Create a new circuit breaker state change event. + * + * @param circuitBreaker the circuit breaker instance + * @param previousState the previous state + * @param newState the new state + */ + public CircuitBreakerStateChangeEvent(CircuitBreaker circuitBreaker, CircuitBreaker.State previousState, + CircuitBreaker.State newState) { + this.circuitBreaker = circuitBreaker; + this.previousState = previousState; + this.newState = newState; + this.timestamp = System.currentTimeMillis(); + } + + /** + * Get the circuit breaker instance that changed state. + * + * @return the circuit breaker instance + */ + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; + } + + /** + * Get the previous state before the transition. + * + * @return the previous state + */ + public CircuitBreaker.State getPreviousState() { + return previousState; + } + + /** + * Get the new state after the transition. + * + * @return the new state + */ + public CircuitBreaker.State getNewState() { + return newState; + } + + /** + * Get the timestamp when the state change occurred. + * + * @return the timestamp in milliseconds since epoch + */ + public long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return "CircuitBreakerStateChangeEvent{" + "previousState=" + previousState + ", newState=" + newState + ", timestamp=" + + timestamp + '}'; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/DatabaseConfig.java b/src/main/java/io/lettuce/core/failover/DatabaseConfig.java new file mode 100644 index 0000000000..a91cc4d4d1 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/DatabaseConfig.java @@ -0,0 +1,129 @@ +package io.lettuce.core.failover; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisURI; +import io.lettuce.core.failover.CircuitBreaker.CircuitBreakerConfig; +import io.lettuce.core.internal.LettuceAssert; + +/** + * Configuration for a database in a multi-database client. Holds the Redis URI, weight for load balancing, and client options. + * + * @author Ali Takavci + * @since 7.1 + */ +public class DatabaseConfig { + + private final RedisURI redisURI; + + private final float weight; + + private final ClientOptions clientOptions; + + private final CircuitBreakerConfig circuitBreakerConfig; + + /** + * Create a new database configuration. + * + * @param redisURI the Redis URI, must not be {@code null} + * @param weight the weight for load balancing, must be greater than 0 + * @param clientOptions the client options, can be {@code null} to use defaults + * @param circuitBreakerConfig the circuit breaker configuration, can be {@code null} to use defaults + */ + public DatabaseConfig(RedisURI redisURI, float weight, ClientOptions clientOptions, + CircuitBreakerConfig circuitBreakerConfig) { + LettuceAssert.notNull(redisURI, "RedisURI must not be null"); + LettuceAssert.isTrue(weight > 0, "Weight must be greater than 0"); + + this.redisURI = redisURI; + this.weight = weight; + this.clientOptions = clientOptions; + this.circuitBreakerConfig = circuitBreakerConfig != null ? circuitBreakerConfig : CircuitBreakerConfig.DEFAULT; + } + + /** + * Create a new database configuration with default client options. + * + * @param redisURI the Redis URI, must not be {@code null} + * @param weight the weight for load balancing, must be greater than 0 + * @param clientOptions the client options, can be {@code null} to use defaults + */ + public DatabaseConfig(RedisURI redisURI, float weight, ClientOptions clientOptions) { + this(redisURI, weight, clientOptions, null); + } + + /** + * Create a new database configuration with default client options. + * + * @param redisURI the Redis URI, must not be {@code null} + * @param weight the weight for load balancing, must be greater than 0 + */ + public DatabaseConfig(RedisURI redisURI, float weight) { + this(redisURI, weight, null, null); + } + + /** + * Get the Redis URI. + * + * @return the Redis URI + */ + public RedisURI getRedisURI() { + return redisURI; + } + + /** + * Get the weight for load balancing. + * + * @return the weight + */ + public float getWeight() { + return weight; + } + + /** + * Get the client options. + * + * @return the client options, can be {@code null} + */ + public ClientOptions getClientOptions() { + return clientOptions; + } + + /** + * Get the circuit breaker configuration. + * + * @return the circuit breaker configuration + */ + public CircuitBreakerConfig getCircuitBreakerConfig() { + return circuitBreakerConfig; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof DatabaseConfig)) + return false; + + DatabaseConfig that = (DatabaseConfig) o; + + if (Float.compare(that.weight, weight) != 0) + return false; + if (!redisURI.equals(that.redisURI)) + return false; + return clientOptions != null ? clientOptions.equals(that.clientOptions) : that.clientOptions == null; + } + + @Override + public int hashCode() { + int result = redisURI.hashCode(); + result = 31 * result + (weight != +0.0f ? Float.floatToIntBits(weight) : 0); + result = 31 * result + (clientOptions != null ? clientOptions.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "DatabaseConfig{" + "redisURI=" + redisURI + ", weight=" + weight + ", clientOptions=" + clientOptions + '}'; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/DatabaseConnectionFactory.java b/src/main/java/io/lettuce/core/failover/DatabaseConnectionFactory.java new file mode 100644 index 0000000000..5f78da6bd4 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/DatabaseConnectionFactory.java @@ -0,0 +1,28 @@ +package io.lettuce.core.failover; + +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.RedisCodec; + +/** + * Factory interface for creating database connections in a multi-database client. + * + * @param Connection type + * @param Key type + * @param Value type + * + * @author Ali Takavci + * @since 7.1 + */ +@FunctionalInterface +interface DatabaseConnectionFactory, K, V> { + + /** + * Create a new database connection for the given configuration. + * + * @param config the database configuration + * @param codec the codec to use for encoding/decoding + * @return a new RedisDatabase instance + */ + RedisDatabase createDatabase(DatabaseConfig config, RedisCodec codec); + +} diff --git a/src/main/java/io/lettuce/core/failover/DatabaseEndpoint.java b/src/main/java/io/lettuce/core/failover/DatabaseEndpoint.java new file mode 100644 index 0000000000..ae69be81c8 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/DatabaseEndpoint.java @@ -0,0 +1,43 @@ +package io.lettuce.core.failover; + +import java.util.Collection; + +import io.lettuce.core.RedisException; +import io.lettuce.core.protocol.RedisCommand; + +/** + * Database endpoint interface for multi-database failover with circuit breaker metrics tracking. + * + * @author Ali Takavci + * @since 7.1 + */ +interface DatabaseEndpoint { + + Collection> drainCommands(); + + RedisCommand write(RedisCommand command); + + /** + * Set the circuit breaker for this endpoint. Must be called before any commands are written. + * + * @param circuitBreaker the circuit breaker instance + */ + void setCircuitBreaker(CircuitBreaker circuitBreaker); + + default void handOverCommandQueue(DatabaseEndpoint target) { + Collection> commands = this.drainCommands(); + + for (RedisCommand queuedCommand : commands) { + if (queuedCommand == null || queuedCommand.isCancelled()) { + continue; + } + + try { + target.write(queuedCommand); + } catch (RedisException e) { + queuedCommand.completeExceptionally(e); + } + } + } + +} diff --git a/src/main/java/io/lettuce/core/failover/DatabaseEndpointImpl.java b/src/main/java/io/lettuce/core/failover/DatabaseEndpointImpl.java new file mode 100644 index 0000000000..c3ff5ae5c2 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/DatabaseEndpointImpl.java @@ -0,0 +1,93 @@ +package io.lettuce.core.failover; + +import java.util.Collection; +import java.util.List; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.protocol.CompleteableCommand; +import io.lettuce.core.protocol.DefaultEndpoint; +import io.lettuce.core.protocol.RedisCommand; +import io.lettuce.core.resource.ClientResources; + +/** + * Database endpoint implementation for multi-database failover with circuit breaker metrics tracking. Extends DefaultEndpoint + * and tracks command successes and failures. + * + * @author Ali Takavci + * @since 7.1 + */ +class DatabaseEndpointImpl extends DefaultEndpoint implements DatabaseEndpoint { + + private CircuitBreaker circuitBreaker; + + public DatabaseEndpointImpl(ClientOptions clientOptions, ClientResources clientResources) { + super(clientOptions, clientResources); + } + + /** + * Set the circuit breaker for this endpoint. Must be called before any commands are written. + * + * @param circuitBreaker the circuit breaker instance + */ + @Override + public void setCircuitBreaker(CircuitBreaker circuitBreaker) { + this.circuitBreaker = circuitBreaker; + } + + /** + * Get the circuit breaker for this endpoint. + * + * @return the circuit breaker instance + */ + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; + } + + @Override + public RedisCommand write(RedisCommand command) { + RedisCommand result = super.write(command); + + // Attach completion callback to track success/failure + if (circuitBreaker != null && result instanceof CompleteableCommand) { + @SuppressWarnings("unchecked") + CompleteableCommand completeable = (CompleteableCommand) result; + completeable.onComplete(this::handleFailure); + } + + return result; + } + + @Override + public Collection> write(Collection> commands) { + // Delegate to parent + Collection> result = super.write(commands); + + // Attach completion callbacks to track success/failure for each command + if (circuitBreaker != null) { + for (RedisCommand command : result) { + if (command instanceof CompleteableCommand) { + @SuppressWarnings("unchecked") + CompleteableCommand completeable = (CompleteableCommand) command; + completeable.onComplete(this::handleFailure); + } + } + } + + return result; + } + + private void handleFailure(Object output, Throwable error) { + if (error != null && circuitBreaker.isCircuitBreakerTrackedException(error)) { + circuitBreaker.getMetrics().recordFailure(); + circuitBreaker.evaluateMetrics(); + } else { + circuitBreaker.getMetrics().recordSuccess(); + } + } + + @Override + public List> drainCommands() { + return super.drainCommands(); + } + +} diff --git a/src/main/java/io/lettuce/core/failover/DatabasePubSubEndpointImpl.java b/src/main/java/io/lettuce/core/failover/DatabasePubSubEndpointImpl.java new file mode 100644 index 0000000000..da96e2c57f --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/DatabasePubSubEndpointImpl.java @@ -0,0 +1,94 @@ +package io.lettuce.core.failover; + +import java.util.Collection; +import java.util.List; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.protocol.CompleteableCommand; +import io.lettuce.core.protocol.RedisCommand; +import io.lettuce.core.pubsub.PubSubEndpoint; +import io.lettuce.core.resource.ClientResources; + +/** + * Database PubSub endpoint implementation for multi-database failover with circuit breaker metrics tracking. Extends + * PubSubEndpoint and tracks command successes and failures. + * + * @author Ali Takavci + * @since 7.1 + */ +class DatabasePubSubEndpointImpl extends PubSubEndpoint implements DatabaseEndpoint { + + private CircuitBreaker circuitBreaker; + + public DatabasePubSubEndpointImpl(ClientOptions clientOptions, ClientResources clientResources) { + super(clientOptions, clientResources); + } + + /** + * Set the circuit breaker for this endpoint. Must be called before any commands are written. + * + * @param circuitBreaker the circuit breaker instance + */ + @Override + public void setCircuitBreaker(CircuitBreaker circuitBreaker) { + this.circuitBreaker = circuitBreaker; + } + + /** + * Get the circuit breaker for this endpoint. + * + * @return the circuit breaker instance + */ + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; + } + + @Override + public RedisCommand write(RedisCommand command) { + // Delegate to parent + RedisCommand result = super.write(command); + + // Attach completion callback to track success/failure + if (circuitBreaker != null && result instanceof CompleteableCommand) { + @SuppressWarnings("unchecked") + CompleteableCommand completeable = (CompleteableCommand) result; + completeable.onComplete(this::handleFailure); + } + + return result; + } + + @Override + public Collection> write(Collection> commands) { + // Delegate to parent + Collection> result = super.write(commands); + + // Attach completion callbacks to track success/failure for each command + if (circuitBreaker != null) { + for (RedisCommand command : result) { + if (command instanceof CompleteableCommand) { + @SuppressWarnings("unchecked") + CompleteableCommand completeable = (CompleteableCommand) command; + completeable.onComplete(this::handleFailure); + } + } + } + + return result; + } + + private void handleFailure(Object output, Throwable error) { + if (error != null && circuitBreaker.isCircuitBreakerTrackedException(error)) { + circuitBreaker.getMetrics().recordFailure(); + circuitBreaker.evaluateMetrics(); + } else { + circuitBreaker.getMetrics().recordSuccess(); + } + } + + @Override + public List> drainCommands() { + return super.drainCommands(); + } + +} diff --git a/src/main/java/io/lettuce/core/failover/MultiDbClient.java b/src/main/java/io/lettuce/core/failover/MultiDbClient.java new file mode 100644 index 0000000000..b0a865cb9e --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/MultiDbClient.java @@ -0,0 +1,44 @@ +package io.lettuce.core.failover; + +import java.util.Collection; +import io.lettuce.core.RedisURI; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.failover.api.BaseRedisClient; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.core.failover.api.StatefulRedisMultiDbPubSubConnection; +import io.lettuce.core.resource.ClientResources; + +/** + * @author Ali Takavci + * @since 7.1 + */ +public interface MultiDbClient extends BaseRedisClient { + + public static MultiDbClient create(Collection databaseConfigs) { + if (databaseConfigs == null || databaseConfigs.isEmpty()) { + throw new IllegalArgumentException("Database configs must not be empty"); + } + return new MultiDbClientImpl(databaseConfigs); + } + + public static MultiDbClient create(ClientResources resources, Collection databaseConfigs) { + if (resources == null) { + throw new IllegalArgumentException("Client resources must not be null"); + } + if (databaseConfigs == null || databaseConfigs.isEmpty()) { + throw new IllegalArgumentException("Database configs must not be empty"); + } + return new MultiDbClientImpl(resources, databaseConfigs); + } + + Collection getRedisURIs(); + + StatefulRedisMultiDbConnection connect(RedisCodec codec); + + StatefulRedisMultiDbConnection connect(); + + StatefulRedisMultiDbPubSubConnection connectPubSub(RedisCodec codec); + + StatefulRedisMultiDbPubSubConnection connectPubSub(); + +} diff --git a/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java b/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java new file mode 100644 index 0000000000..1aae628122 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java @@ -0,0 +1,165 @@ +package io.lettuce.core.failover; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.lettuce.core.Delegating; +import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.StatefulRedisConnectionImpl; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.core.failover.api.StatefulRedisMultiDbPubSubConnection; +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.DefaultEndpoint; +import io.lettuce.core.pubsub.PubSubEndpoint; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.resource.ClientResources; + +/** + * Failover-aware client that composes multiple standalone Redis endpoints and returns a single Stateful connection wrapper + * which can switch the active endpoint without requiring users to recreate command objects. + * + * Standalone-only POC. Not for Sentinel/Cluster. + * + * @author Ali Takavci + * @since 7.1 + */ +class MultiDbClientImpl extends RedisClient implements MultiDbClient { + + private static final RedisURI EMPTY_URI = new RedisURI(); + + private final Map databaseConfigs; + + MultiDbClientImpl(Collection databaseConfigs) { + this(null, databaseConfigs); + } + + MultiDbClientImpl(ClientResources clientResources, Collection databaseConfigs) { + super(clientResources, EMPTY_URI); + + if (databaseConfigs == null || databaseConfigs.isEmpty()) { + this.databaseConfigs = new ConcurrentHashMap<>(); + } else { + this.databaseConfigs = new ConcurrentHashMap<>(databaseConfigs.size()); + for (DatabaseConfig config : databaseConfigs) { + LettuceAssert.notNull(config, "DatabaseConfig must not be null"); + LettuceAssert.notNull(config.getRedisURI(), "RedisURI must not be null"); + this.databaseConfigs.put(config.getRedisURI(), config); + } + } + } + + /** + * Open a new connection to a Redis server that treats keys and values as UTF-8 strings. + * + * @return A new stateful Redis connection + */ + public StatefulRedisMultiDbConnection connect() { + return connect(newStringStringCodec()); + } + + @Override + public Collection getRedisURIs() { + return databaseConfigs.keySet(); + } + + public StatefulRedisMultiDbConnection connect(RedisCodec codec) { + + if (codec == null) { + throw new IllegalArgumentException("codec must not be null"); + } + + Map>> databases = new ConcurrentHashMap<>(databaseConfigs.size()); + for (Map.Entry entry : databaseConfigs.entrySet()) { + RedisURI uri = entry.getKey(); + DatabaseConfig config = entry.getValue(); + + // HACK: looks like repeating the implementation all around 'RedisClient.connect' is an overkill. + // connections.put(uri, connect(codec, uri)); + // Instead we will use it from delegate + RedisDatabase> database = createRedisDatabase(config, codec); + + databases.put(uri, database); + } + + // Provide a connection factory for dynamic database addition + return new StatefulRedisMultiDbConnectionImpl, K, V>(databases, getResources(), codec, + getOptions().getJsonParser(), this::createRedisDatabase); + } + + private RedisDatabase> createRedisDatabase(DatabaseConfig config, + RedisCodec codec) { + RedisURI uri = config.getRedisURI(); + StatefulRedisConnection connection = connect(codec, uri); + DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection); + RedisDatabase> database = new RedisDatabase<>( + new RedisDatabase.RedisDatabaseConfig(uri, config.getWeight(), config.getCircuitBreakerConfig()), connection, + databaseEndpoint); + + return database; + } + + /** + * Open a new connection to a Redis server that treats keys and values as UTF-8 strings. + * + * @return A new stateful Redis connection + */ + public StatefulRedisMultiDbPubSubConnection connectPubSub() { + return connectPubSub(newStringStringCodec()); + } + + public StatefulRedisMultiDbPubSubConnection connectPubSub(RedisCodec codec) { + + if (codec == null) { + throw new IllegalArgumentException("codec must not be null"); + } + + Map>> databases = new ConcurrentHashMap<>( + databaseConfigs.size()); + for (Map.Entry entry : databaseConfigs.entrySet()) { + RedisURI uri = entry.getKey(); + DatabaseConfig config = entry.getValue(); + + RedisDatabase> database = createRedisDatabaseWithPubSub(config, codec); + databases.put(uri, database); + } + + // Provide a connection factory for dynamic database addition + return new StatefulRedisMultiDbPubSubConnectionImpl(databases, getResources(), codec, + getOptions().getJsonParser(), this::createRedisDatabaseWithPubSub); + } + + private RedisDatabase> createRedisDatabaseWithPubSub(DatabaseConfig config, + RedisCodec codec) { + RedisURI uri = config.getRedisURI(); + StatefulRedisPubSubConnection connection = connectPubSub(codec, uri); + DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection); + RedisDatabase> database = new RedisDatabase<>( + new RedisDatabase.RedisDatabaseConfig(uri, config.getWeight(), config.getCircuitBreakerConfig()), connection, + databaseEndpoint); + return database; + } + + private DatabaseEndpoint extractDatabaseEndpoint(StatefulRedisConnection connection) { + RedisChannelWriter writer = ((StatefulRedisConnectionImpl) connection).getChannelWriter(); + if (writer instanceof Delegating) { + writer = (RedisChannelWriter) ((Delegating) writer).unwrap(); + } + return (DatabaseEndpoint) writer; + } + + @Override + protected DefaultEndpoint createEndpoint() { + return new DatabaseEndpointImpl(getOptions(), getResources()); + } + + @Override + protected PubSubEndpoint createPubSubEndpoint() { + return new DatabasePubSubEndpointImpl<>(getOptions(), getResources()); + } + +} diff --git a/src/main/java/io/lettuce/core/failover/MultiDbFutureSyncInvocationHandler.java b/src/main/java/io/lettuce/core/failover/MultiDbFutureSyncInvocationHandler.java new file mode 100644 index 0000000000..e5bed5cfb4 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/MultiDbFutureSyncInvocationHandler.java @@ -0,0 +1,125 @@ +package io.lettuce.core.failover; + +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.internal.AbstractInvocationHandler; +import io.lettuce.core.internal.Futures; +import io.lettuce.core.internal.TimeoutProvider; +import io.lettuce.core.protocol.CommandType; +import io.lettuce.core.protocol.ProtocolKeyword; +import io.lettuce.core.protocol.RedisCommand; + +/** + * Invocation-handler to synchronize API calls which use Futures as backend. This class leverages the need to implement a full + * sync class which just delegates every request. + * + * IMPORTANT: This class is copied from io.lettuce.core.FutureSyncInvocationHandler. Any changes made there should be reflected. + * We will need to change how the timeout cases are handled for MultiDb connections, since Futures.awaitOrCancel() does not + * provide actual error types in DefaultEndpoint(DatabaseEndpoint, DatabasePubSubEndpoint) implementations for MultiDb. + * + * @author Ali Takavci + * @since 7.1 + */ +class MultiDbFutureSyncInvocationHandler extends AbstractInvocationHandler { + + private final StatefulConnection connection; + + private final TimeoutProvider timeoutProvider; + + private final Object asyncApi; + + private final MethodTranslator translator; + + MultiDbFutureSyncInvocationHandler(StatefulConnection connection, Object asyncApi, Class[] interfaces) { + this.connection = connection; + this.timeoutProvider = new TimeoutProvider(() -> connection.getOptions().getTimeoutOptions(), + () -> connection.getTimeout().toNanos()); + this.asyncApi = asyncApi; + this.translator = MethodTranslator.of(asyncApi.getClass(), interfaces); + } + + @Override + protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable { + + try { + + Method targetMethod = this.translator.get(method); + Object result = targetMethod.invoke(asyncApi, args); + + if (result instanceof RedisFuture) { + + RedisFuture command = (RedisFuture) result; + + if (!isTxControlMethod(method.getName(), args) && isTransactionActive(connection)) { + return null; + } + + long timeout = getTimeoutNs(command); + + return Futures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS); + } + + return result; + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } + } + + private long getTimeoutNs(RedisFuture command) { + + if (command instanceof RedisCommand) { + return timeoutProvider.getTimeoutNs((RedisCommand) command); + } + + return connection.getTimeout().toNanos(); + } + + private static boolean isTransactionActive(StatefulConnection connection) { + return connection instanceof StatefulRedisConnection && ((StatefulRedisConnection) connection).isMulti(); + } + + private static boolean isTxControlMethod(String methodName, Object[] args) { + + if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard") + || methodName.equals("watch")) { + return true; + } + + if (methodName.equals("dispatch") && args.length > 0 && args[0] instanceof ProtocolKeyword) { + + ProtocolKeyword keyword = (ProtocolKeyword) args[0]; + if (keyword.toString().equals(CommandType.MULTI.name()) || keyword.toString().equals(CommandType.EXEC.name()) + || keyword.toString().equals(CommandType.DISCARD.name())) { + return true; + } + } + + return false; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/RedisDatabase.java b/src/main/java/io/lettuce/core/failover/RedisDatabase.java new file mode 100644 index 0000000000..ae73f9f439 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/RedisDatabase.java @@ -0,0 +1,78 @@ +package io.lettuce.core.failover; + +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.failover.CircuitBreaker.CircuitBreakerConfig; + +/** + * Represents a Redis database with a weight and a connection. + * + * @param Connection type. + * + * @author Ali Takavci + * @since 7.1 + */ +public class RedisDatabase> { + + private final float weight; + + private final C connection; + + private final RedisURI redisURI; + + private final DatabaseEndpoint databaseEndpoint; + + private final CircuitBreaker circuitBreaker; + + public RedisDatabase(RedisDatabaseConfig config, C connection, DatabaseEndpoint databaseEndpoint) { + this.redisURI = config.redisURI; + this.weight = config.weight; + this.connection = connection; + this.databaseEndpoint = databaseEndpoint; + this.circuitBreaker = new CircuitBreaker(config.circuitBreakerConfig); + databaseEndpoint.setCircuitBreaker(circuitBreaker); + } + + public float getWeight() { + return weight; + } + + public C getConnection() { + return connection; + } + + public RedisURI getRedisURI() { + return redisURI; + } + + public DatabaseEndpoint getDatabaseEndpoint() { + return databaseEndpoint; + } + + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; + } + + public static class RedisDatabaseConfig { + + private RedisURI redisURI; + + private float weight; + + private CircuitBreakerConfig circuitBreakerConfig; + + public RedisDatabaseConfig(RedisURI redisURI, float weight) { + this.redisURI = redisURI; + this.weight = weight; + this.circuitBreakerConfig = CircuitBreakerConfig.DEFAULT; + } + + public RedisDatabaseConfig(RedisURI redisURI, float weight, CircuitBreakerConfig circuitBreakerConfig) { + this.redisURI = redisURI; + this.weight = weight; + this.circuitBreakerConfig = circuitBreakerConfig; + } + + } + +} diff --git a/src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbConnectionImpl.java b/src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbConnectionImpl.java new file mode 100644 index 0000000000..f83572bb5a --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbConnectionImpl.java @@ -0,0 +1,332 @@ +package io.lettuce.core.failover; + +import java.lang.reflect.Proxy; +import java.time.Duration; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisAsyncCommandsImpl; +import io.lettuce.core.RedisConnectionStateListener; +import io.lettuce.core.RedisReactiveCommandsImpl; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.reactive.RedisReactiveCommands; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.api.sync.RedisClusterCommands; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.core.internal.AbstractInvocationHandler; +import io.lettuce.core.json.JsonParser; +import io.lettuce.core.protocol.RedisCommand; +import io.lettuce.core.resource.ClientResources; + +/** + * Stateful connection wrapper that holds multiple underlying connections and delegates to the currently active one. Command + * interfaces (sync/async/reactive) are dynamic proxies that always target the current active connection at invocation time so + * they remain valid across switches. + * + * @author Ali Takavci + * @since 7.1 + */ +public class StatefulRedisMultiDbConnectionImpl, K, V> + implements StatefulRedisMultiDbConnection { + + protected final Map> databases; + + protected RedisDatabase current; + + protected final RedisCommands sync; + + protected final RedisAsyncCommandsImpl async; + + protected final RedisReactiveCommandsImpl reactive; + + protected final RedisCodec codec; + + protected final Supplier parser; + + protected final Set pushListeners = ConcurrentHashMap.newKeySet(); + + protected final Set connectionStateListeners = ConcurrentHashMap.newKeySet(); + + protected final DatabaseConnectionFactory connectionFactory; + + public StatefulRedisMultiDbConnectionImpl(Map> connections, ClientResources resources, + RedisCodec codec, Supplier parser, DatabaseConnectionFactory connectionFactory) { + if (connections == null || connections.isEmpty()) { + throw new IllegalArgumentException("connections must not be empty"); + } + this.databases = new ConcurrentHashMap<>(connections); + this.codec = codec; + this.parser = parser; + this.connectionFactory = connectionFactory; + this.current = connections.values().stream().max(Comparator.comparingDouble(RedisDatabase::getWeight)).get(); + + this.async = newRedisAsyncCommandsImpl(); + this.sync = newRedisSyncCommandsImpl(); + this.reactive = newRedisReactiveCommandsImpl(); + + databases.values().forEach(db -> db.getCircuitBreaker().addListener(this::onCircuitBreakerStateChange)); + } + + private void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) { + if (event.getCircuitBreaker() == current.getCircuitBreaker() && event.getNewState() == CircuitBreaker.State.OPEN) { + failoverFrom(current); + } + } + + private void failoverFrom(RedisDatabase fromDb) { + RedisDatabase healthyDatabase = getHealthyDatabase(fromDb); + if (healthyDatabase != null) { + switchToDatabase(healthyDatabase.getRedisURI()); + } else { + // No healthy database found, stay on the current one + // TODO: manage max attempts to failover + } + } + + private RedisDatabase getHealthyDatabase(RedisDatabase current) { + return databases.values().stream().filter(db -> db != current) + .filter(db -> db.getCircuitBreaker().getCurrentState() == CircuitBreaker.State.CLOSED) + .max(Comparator.comparingDouble(RedisDatabase::getWeight)).get(); + } + + @Override + public RedisAsyncCommands async() { + return async; + } + + /** + * Create a new instance of {@link RedisCommands}. Can be overriden to extend. + * + * @return a new instance + */ + protected RedisCommands newRedisSyncCommandsImpl() { + return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class); + } + + @SuppressWarnings("unchecked") + protected T syncHandler(Object asyncApi, Class... interfaces) { + AbstractInvocationHandler h = new MultiDbFutureSyncInvocationHandler(this, asyncApi, interfaces); + return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h); + } + + /** + * Create a new instance of {@link RedisAsyncCommandsImpl}. Can be overriden to extend. + * + * @return a new instance + */ + protected RedisAsyncCommandsImpl newRedisAsyncCommandsImpl() { + return new RedisAsyncCommandsImpl<>(this, codec, parser); + } + + @Override + public RedisReactiveCommands reactive() { + return reactive; + } + + /** + * Create a new instance of {@link RedisReactiveCommandsImpl}. Can be overriden to extend. + * + * @return a new instance + */ + protected RedisReactiveCommandsImpl newRedisReactiveCommandsImpl() { + return new RedisReactiveCommandsImpl<>(this, codec, parser); + } + + @Override + public RedisCommands sync() { + return sync; + } + + @Override + public void addListener(RedisConnectionStateListener listener) { + connectionStateListeners.add(listener); + current.getConnection().addListener(listener); + } + + @Override + public void removeListener(RedisConnectionStateListener listener) { + connectionStateListeners.remove(listener); + current.getConnection().removeListener(listener); + } + + @Override + public void setTimeout(Duration timeout) { + databases.values().forEach(db -> db.getConnection().setTimeout(timeout)); + } + + @Override + public Duration getTimeout() { + return current.getConnection().getTimeout(); + } + + @Override + public RedisCommand dispatch(RedisCommand command) { + return current.getConnection().dispatch(command); + } + + @Override + public Collection> dispatch(Collection> commands) { + return current.getConnection().dispatch(commands); + } + + @Override + public void close() { + databases.values().forEach(db -> db.getConnection().close()); + } + + @Override + public CompletableFuture closeAsync() { + return CompletableFuture.allOf(databases.values().stream().map(db -> db.getConnection()) + .map(StatefulConnection::closeAsync).toArray(CompletableFuture[]::new)); + } + + @Override + public boolean isOpen() { + return current.getConnection().isOpen(); + } + + @Override + public ClientOptions getOptions() { + return current.getConnection().getOptions(); + } + + @Override + public ClientResources getResources() { + return current.getConnection().getResources(); + } + + @Override + public void setAutoFlushCommands(boolean autoFlush) { + databases.values().forEach(db -> db.getConnection().setAutoFlushCommands(autoFlush)); + } + + @Override + public void flushCommands() { + current.getConnection().flushCommands(); + } + + @Override + public boolean isMulti() { + return current.getConnection().isMulti(); + } + + @Override + public void addListener(PushListener listener) { + pushListeners.add(listener); + current.getConnection().addListener(listener); + } + + @Override + public void removeListener(PushListener listener) { + pushListeners.remove(listener); + current.getConnection().removeListener(listener); + } + + @Override + public RedisCodec getCodec() { + return codec; + } + + @Override + public RedisURI getCurrentEndpoint() { + return current.getRedisURI(); + } + + @Override + public Iterable getEndpoints() { + return databases.keySet(); + } + + @Override + public void switchToDatabase(RedisURI redisURI) { + RedisDatabase fromDb = current; + RedisDatabase toDb = databases.get(redisURI); + if (fromDb == null || toDb == null) { + throw new UnsupportedOperationException("Cannot initiate switch without a current and target database!"); + } + current = toDb; + connectionStateListeners.forEach(listener -> { + toDb.getConnection().addListener(listener); + fromDb.getConnection().removeListener(listener); + }); + pushListeners.forEach(listener -> { + toDb.getConnection().addListener(listener); + fromDb.getConnection().removeListener(listener); + }); + fromDb.getDatabaseEndpoint().handOverCommandQueue(toDb.getDatabaseEndpoint()); + } + + @Override + public CircuitBreaker getCircuitBreaker(RedisURI endpoint) { + RedisDatabase database = databases.get(endpoint); + if (database == null) { + throw new IllegalArgumentException("Unknown endpoint: " + endpoint); + } + return database.getCircuitBreaker(); + } + + @Override + public void addDatabase(RedisURI redisURI, float weight) { + addDatabase(new DatabaseConfig(redisURI, weight)); + } + + @Override + public void addDatabase(DatabaseConfig databaseConfig) { + if (databaseConfig == null) { + throw new IllegalArgumentException("DatabaseConfig must not be null"); + } + + if (connectionFactory == null) { + throw new UnsupportedOperationException( + "Adding databases dynamically is not supported. Connection was created without a DatabaseConnectionFactory."); + } + + RedisURI redisURI = databaseConfig.getRedisURI(); + if (databases.containsKey(redisURI)) { + throw new IllegalArgumentException("Database already exists: " + redisURI); + } + + // Create new database connection using the factory + RedisDatabase database = connectionFactory.createDatabase(databaseConfig, codec); + + // Add listeners to the new connection if it's the current one + // (though it won't be current initially since we're just adding it) + databases.put(redisURI, database); + + database.getCircuitBreaker().addListener(this::onCircuitBreakerStateChange); + } + + @Override + public void removeDatabase(RedisURI redisURI) { + if (redisURI == null) { + throw new IllegalArgumentException("RedisURI must not be null"); + } + + RedisDatabase database = databases.get(redisURI); + if (database == null) { + throw new IllegalArgumentException("Database not found: " + redisURI); + } + + if (current.getRedisURI().equals(redisURI)) { + throw new UnsupportedOperationException("Cannot remove the currently active database: " + redisURI); + } + + // Remove the database and close its connection + databases.remove(redisURI); + database.getConnection().close(); + database.getCircuitBreaker().removeListener(this::onCircuitBreakerStateChange); + } + +} diff --git a/src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbPubSubConnectionImpl.java new file mode 100644 index 0000000000..527b3afd65 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbPubSubConnectionImpl.java @@ -0,0 +1,147 @@ +package io.lettuce.core.failover; + +import java.lang.reflect.Array; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.Supplier; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.failover.api.StatefulRedisMultiDbPubSubConnection; +import io.lettuce.core.json.JsonParser; +import io.lettuce.core.pubsub.PubSubEndpoint; +import io.lettuce.core.pubsub.RedisPubSubAsyncCommandsImpl; +import io.lettuce.core.pubsub.RedisPubSubListener; +import io.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; +import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.lettuce.core.resource.ClientResources; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * @author Ali Takavci + * @since 7.1 + */ +public class StatefulRedisMultiDbPubSubConnectionImpl + extends StatefulRedisMultiDbConnectionImpl, K, V> + implements StatefulRedisMultiDbPubSubConnection { + + private final Set> pubSubListeners = ConcurrentHashMap.newKeySet(); + + public StatefulRedisMultiDbPubSubConnectionImpl( + Map>> connections, ClientResources resources, + RedisCodec codec, Supplier parser, + DatabaseConnectionFactory, K, V> connectionFactory) { + super(connections, resources, codec, parser, connectionFactory); + } + + @Override + public void addListener(RedisPubSubListener listener) { + pubSubListeners.add(listener); + current.getConnection().addListener(listener); + } + + @Override + public void removeListener(RedisPubSubListener listener) { + pubSubListeners.remove(listener); + current.getConnection().removeListener(listener); + } + + @Override + public RedisPubSubAsyncCommands async() { + return (RedisPubSubAsyncCommands) async; + } + + @Override + protected RedisPubSubAsyncCommandsImpl newRedisAsyncCommandsImpl() { + return new RedisPubSubAsyncCommandsImpl<>(this, codec); + } + + @Override + public RedisPubSubCommands sync() { + return (RedisPubSubCommands) sync; + } + + @Override + protected RedisPubSubCommands newRedisSyncCommandsImpl() { + return syncHandler(async(), RedisPubSubCommands.class); + } + + @Override + public RedisPubSubReactiveCommands reactive() { + return (RedisPubSubReactiveCommands) reactive; + } + + @Override + protected RedisPubSubReactiveCommandsImpl newRedisReactiveCommandsImpl() { + return new RedisPubSubReactiveCommandsImpl<>(this, codec); + } + + @Override + public void switchToDatabase(RedisURI redisURI) { + + RedisDatabase> fromDb = current; + super.switchToDatabase(redisURI); + pubSubListeners.forEach(listener -> { + current.getConnection().addListener(listener); + fromDb.getConnection().removeListener(listener); + }); + + moveSubscriptions(fromDb, current); + } + + @SuppressWarnings("unchecked") + public void moveSubscriptions(RedisDatabase> fromDb, + RedisDatabase> toDb) { + + PubSubEndpoint fromEndpoint = (PubSubEndpoint) fromDb.getDatabaseEndpoint(); + StatefulRedisPubSubConnection fromConn = (StatefulRedisPubSubConnection) fromDb.getConnection(); + + if (fromEndpoint.hasChannelSubscriptions()) { + K[] channels = toArray(fromEndpoint.getChannels()); + moveSubscriptions(channels, async()::subscribe, fromConn.async()::unsubscribe); + } + + if (fromEndpoint.hasShardChannelSubscriptions()) { + K[] shardChannels = toArray(fromEndpoint.getShardChannels()); + moveSubscriptions(shardChannels, async()::ssubscribe, fromConn.async()::sunsubscribe); + } + + if (fromEndpoint.hasPatternSubscriptions()) { + K[] patterns = toArray(fromEndpoint.getPatterns()); + moveSubscriptions(patterns, async()::psubscribe, fromConn.async()::punsubscribe); + } + } + + private void moveSubscriptions(K[] channels, Function> subscribeFunc, + Function> unsubscribeFunc) { + // Re-subscribe to new endpoint + RedisFuture subscribeFuture = subscribeFunc.apply(channels); + handlePubSubCommandError(subscribeFuture, "Re-subscribe failed: "); + + // Unsubscribe from old endpoint on best effort basis + RedisFuture unsubscribeFuture = unsubscribeFunc.apply(channels); + handlePubSubCommandError(unsubscribeFuture, "Unsubscribe from old endpoint failed (best effort): "); + } + + private void handlePubSubCommandError(RedisFuture future, String message) { + future.exceptionally(throwable -> { + InternalLoggerFactory.getInstance(getClass()).warn(message + future.getError()); + return null; + }); + } + + @SuppressWarnings("unchecked") + private T[] toArray(Collection c) { + Class cls = (Class) c.iterator().next().getClass(); + T[] array = (T[]) Array.newInstance(cls, c.size()); + return c.toArray(array); + } + +} diff --git a/src/main/java/io/lettuce/core/failover/api/BaseRedisClient.java b/src/main/java/io/lettuce/core/failover/api/BaseRedisClient.java new file mode 100644 index 0000000000..0280ddae31 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/api/BaseRedisClient.java @@ -0,0 +1,134 @@ +package io.lettuce.core.failover.api; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisConnectionStateListener; +import io.lettuce.core.event.command.CommandListener; +import io.lettuce.core.resource.ClientResources; +import io.netty.util.concurrent.EventExecutorGroup; + +/** + * @author Ali Takavci + * @since 7.1 + */ +// This is same interface that AbstractRedisClient expose as public +public interface BaseRedisClient extends AutoCloseable { + + /** + * Returns the {@link ClientOptions} which are valid for that client. Connections inherit the current options at the moment + * the connection is created. Changes to options will not affect existing connections. + * + * @return the {@link ClientOptions} for this client + */ + ClientOptions getOptions(); + + /** + * Returns the {@link ClientResources} which are used with that client. + * + * @return the {@link ClientResources} for this client. + * @since 6.0 + * + */ + ClientResources getResources(); + + /** + * Add a listener for the RedisConnectionState. The listener is notified every time a connect/disconnect/IO exception + * happens. The listeners are not bound to a specific connection, so every time a connection event happens on any + * connection, the listener will be notified. The corresponding netty channel handler (async connection) is passed on the + * event. + * + * @param listener must not be {@code null}. + */ + void addListener(RedisConnectionStateListener listener); + + /** + * Removes a listener. + * + * @param listener must not be {@code null}. + */ + void removeListener(RedisConnectionStateListener listener); + + /** + * Add a listener for Redis Command events. The listener is notified on each command start/success/failure. + * + * @param listener must not be {@code null}. + * @since 6.1 + */ + void addListener(CommandListener listener); + + /** + * Removes a listener. + * + * @param listener must not be {@code null}. + * @since 6.1 + */ + void removeListener(CommandListener listener); + + /** + * Shutdown this client and close all open connections once this method is called. Once all connections are closed, the + * associated {@link ClientResources} are shut down/released gracefully considering quiet time and the shutdown timeout. The + * client should be discarded after calling shutdown. The shutdown is executed without quiet time and a timeout of 2 + * {@link TimeUnit#SECONDS}. + * + * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + */ + void shutdown(); + + @Override + void close(); + + /** + * Shutdown this client and close all open connections once this method is called. Once all connections are closed, the + * associated {@link ClientResources} are shut down/released gracefully considering quiet time and the shutdown timeout. The + * client should be discarded after calling shutdown. + * + * @param quietPeriod the quiet period to allow the executor gracefully shut down. + * @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was + * submitted during the quiet period. + * @since 5.0 + * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + */ + void shutdown(Duration quietPeriod, Duration timeout); + + /** + * Shutdown this client and close all open connections once this method is called. Once all connections are closed, the + * associated {@link ClientResources} are shut down/released gracefully considering quiet time and the shutdown timeout. The + * client should be discarded after calling shutdown. + * + * @param quietPeriod the quiet period to allow the executor gracefully shut down. + * @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was + * submitted during the quiet period. + * @param timeUnit the unit of {@code quietPeriod} and {@code timeout}. + * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + */ + void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit); + + /** + * Shutdown this client and close all open connections asynchronously. Once all connections are closed, the associated + * {@link ClientResources} are shut down/released gracefully considering quiet time and the shutdown timeout. The client + * should be discarded after calling shutdown. The shutdown is executed without quiet time and a timeout of 2 + * {@link TimeUnit#SECONDS}. + * + * @since 4.4 + * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + */ + CompletableFuture shutdownAsync(); + + /** + * Shutdown this client and close all open connections asynchronously. Once all connections are closed, the associated + * {@link ClientResources} are shut down/released gracefully considering quiet time and the shutdown timeout. The client + * should be discarded after calling shutdown. + * + * @param quietPeriod the quiet period to allow the executor gracefully shut down. + * @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was + * submitted during the quiet period. + * @param timeUnit the unit of {@code quietPeriod} and {@code timeout}. + * @since 4.4 + * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + */ + CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit); + +} diff --git a/src/main/java/io/lettuce/core/failover/api/BaseRedisMultiDbConnection.java b/src/main/java/io/lettuce/core/failover/api/BaseRedisMultiDbConnection.java new file mode 100644 index 0000000000..3480eee37e --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/api/BaseRedisMultiDbConnection.java @@ -0,0 +1,70 @@ +package io.lettuce.core.failover.api; + +import io.lettuce.core.RedisURI; +import io.lettuce.core.failover.CircuitBreaker; +import io.lettuce.core.failover.DatabaseConfig; + +/** + * @author Ali Takavci + * @since 7.1 + */ +public interface BaseRedisMultiDbConnection { + + /** + * Switch to a different database. + * + * @param redisURI the Redis URI of the database to switch to, must not be {@code null} + * @throws IllegalArgumentException if the database does not exist + */ + void switchToDatabase(RedisURI redisURI); + + /** + * Get the current database endpoint. + * + * @return the current database endpoint + */ + RedisURI getCurrentEndpoint(); + + /** + * Get all available database endpoints. + * + * @return an iterable of all database endpoints + */ + Iterable getEndpoints(); + + /** + * Get the circuit breaker for a specific endpoint. + * + * @param endpoint the Redis endpoint URI + * @return the circuit breaker for the endpoint + * @throws IllegalArgumentException if the endpoint is not known + */ + CircuitBreaker getCircuitBreaker(RedisURI endpoint); + + /** + * Add a new database to the multi-database connection. + * + * @param redisURI the Redis URI for the new database, must not be {@code null} + * @param weight the weight for load balancing, must be greater than 0 + * @throws IllegalArgumentException if the database already exists or parameters are invalid + */ + void addDatabase(RedisURI redisURI, float weight); + + /** + * Add a new database to the multi-database connection. + * + * @param databaseConfig the database configuration, must not be {@code null} + * @throws IllegalArgumentException if the database already exists or configuration is invalid + */ + void addDatabase(DatabaseConfig databaseConfig); + + /** + * Remove a database from the multi-database connection. + * + * @param redisURI the Redis URI of the database to remove, must not be {@code null} + * @throws IllegalArgumentException if the database does not exist + * @throws UnsupportedOperationException if attempting to remove the currently active database + */ + void removeDatabase(RedisURI redisURI); + +} diff --git a/src/main/java/io/lettuce/core/failover/api/CircuitBreakerStateListener.java b/src/main/java/io/lettuce/core/failover/api/CircuitBreakerStateListener.java new file mode 100644 index 0000000000..9817466e01 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/api/CircuitBreakerStateListener.java @@ -0,0 +1,20 @@ +package io.lettuce.core.failover.api; + +import io.lettuce.core.failover.CircuitBreakerStateChangeEvent; + +/** + * Listener interface for circuit breaker state change events. + * + * @author Ali Takavci + * @since 7.1 + */ +public interface CircuitBreakerStateListener { + + /** + * Event handler for circuit breaker state change events. + * + * @param event the state change event containing previous state, new state, and the circuit breaker instance + */ + void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event); + +} diff --git a/src/main/java/io/lettuce/core/failover/api/StatefulRedisMultiDbConnection.java b/src/main/java/io/lettuce/core/failover/api/StatefulRedisMultiDbConnection.java new file mode 100644 index 0000000000..6fa1c54e44 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/api/StatefulRedisMultiDbConnection.java @@ -0,0 +1,17 @@ +package io.lettuce.core.failover.api; + +import io.lettuce.core.api.StatefulRedisConnection; + +/** + * Stateful multi-database Redis connection that supports failover between multiple endpoints. Each endpoint has its own circuit + * breaker for tracking command metrics. + * + * @param Key type + * @param Value type + * + * @author Ali Takavci + * @since 7.1 + */ +public interface StatefulRedisMultiDbConnection extends StatefulRedisConnection, BaseRedisMultiDbConnection { + +} diff --git a/src/main/java/io/lettuce/core/failover/api/StatefulRedisMultiDbPubSubConnection.java b/src/main/java/io/lettuce/core/failover/api/StatefulRedisMultiDbPubSubConnection.java new file mode 100644 index 0000000000..a85f35b8fd --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/api/StatefulRedisMultiDbPubSubConnection.java @@ -0,0 +1,18 @@ +package io.lettuce.core.failover.api; + +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; + +/** + * Stateful multi-database Redis PubSub connection that supports failover between multiple endpoints. Each endpoint has its own + * circuit breaker for tracking command metrics. + * + * @param Key type + * @param Value type + * + * @author Ali Takavci + * @since 7.1 + */ +public interface StatefulRedisMultiDbPubSubConnection + extends StatefulRedisPubSubConnection, BaseRedisMultiDbConnection { + +} diff --git a/src/main/java/io/lettuce/core/failover/metrics/CircuitBreakerMetrics.java b/src/main/java/io/lettuce/core/failover/metrics/CircuitBreakerMetrics.java new file mode 100644 index 0000000000..00b06421cf --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/metrics/CircuitBreakerMetrics.java @@ -0,0 +1,40 @@ +package io.lettuce.core.failover.metrics; + +/** + * Interface for circuit breaker metrics tracking successes and failures within a time-based sliding window. Thread-safe and + * lock-free using atomic operations. + * + *

+ * This interface defines the contract for tracking metrics over a configurable time period. Old data outside the window is + * automatically expired and cleaned up. + *

+ * + * @author Ali Takavci + * @since 7.1 + */ +public interface CircuitBreakerMetrics { + + /** + * Record a successful command execution. Lock-free operation. + */ + void recordSuccess(); + + /** + * Record a failed command execution. Lock-free operation. + */ + void recordFailure(); + + /** + * Get a snapshot of the current metrics within the time window. Use the snapshot to access success count, failure count, + * total count, and failure rate. + * + * @return an immutable snapshot of current metrics + */ + MetricsSnapshot getSnapshot(); + + /** + * Reset all metrics to zero. + */ + void reset(); + +} diff --git a/src/main/java/io/lettuce/core/failover/metrics/CircuitBreakerMetricsImpl.java b/src/main/java/io/lettuce/core/failover/metrics/CircuitBreakerMetricsImpl.java new file mode 100644 index 0000000000..8fe88df0e3 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/metrics/CircuitBreakerMetricsImpl.java @@ -0,0 +1,88 @@ +package io.lettuce.core.failover.metrics; + +/** + * Lock-free, thread-safe implementation of circuit breaker metrics using a time-based sliding window. Tracks successes and + * failures within a configurable time period. + * + *

+ * This implementation uses a lock-free sliding window mechanism to track metrics over a configurable time period (default: 2 + * seconds). Old data outside the window is automatically expired and cleaned up. + *

+ * + * @author Ali Takavci + * @since 7.1 + */ +public class CircuitBreakerMetricsImpl implements CircuitBreakerMetrics { + + /** + * Default window duration: 2 seconds. + */ + private static final long DEFAULT_WINDOW_DURATION_MS = 2_000; + + /** + * Default bucket duration: 1 second. + */ + private static final long DEFAULT_BUCKET_DURATION_MS = 1_000; + + /** + * Lock-free sliding window metrics implementation. + */ + private final SlidingWindowMetrics slidingWindow; + + /** + * Create metrics instance with default configuration (2 second window, 1 second buckets). + */ + public CircuitBreakerMetricsImpl() { + this(DEFAULT_WINDOW_DURATION_MS, DEFAULT_BUCKET_DURATION_MS); + } + + /** + * Create metrics instance with custom window configuration. + * + * @param windowDurationMs the window duration in milliseconds + * @param bucketDurationMs the bucket duration in milliseconds + */ + public CircuitBreakerMetricsImpl(long windowDurationMs, long bucketDurationMs) { + this.slidingWindow = new LockFreeSlidingWindowMetrics(windowDurationMs, bucketDurationMs); + } + + /** + * Record a successful command execution. Lock-free operation. + */ + @Override + public void recordSuccess() { + slidingWindow.recordSuccess(); + } + + /** + * Record a failed command execution. Lock-free operation. + */ + @Override + public void recordFailure() { + slidingWindow.recordFailure(); + } + + /** + * Get a snapshot of the current metrics within the time window. + * + * @return an immutable snapshot of current metrics + */ + @Override + public MetricsSnapshot getSnapshot() { + return slidingWindow.getSnapshot(); + } + + /** + * Reset all metrics to zero. + */ + @Override + public void reset() { + slidingWindow.reset(); + } + + @Override + public String toString() { + return "CircuitBreakerMetrics{" + slidingWindow.getSnapshot() + '}'; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/metrics/LockFreeSlidingWindowMetrics.java b/src/main/java/io/lettuce/core/failover/metrics/LockFreeSlidingWindowMetrics.java new file mode 100644 index 0000000000..46a01ac6ee --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/metrics/LockFreeSlidingWindowMetrics.java @@ -0,0 +1,185 @@ +package io.lettuce.core.failover.metrics; + +/** + * Lock-free, thread-safe implementation of sliding window metrics using atomic operations and a ring buffer of time buckets. + * + *

+ * This implementation uses: + *

    + *
  • Fixed-size ring buffer of {@link TimeWindowBucket} objects
  • + *
  • Atomic operations for lock-free updates
  • + *
  • Time-based bucketing for efficient expiration
  • + *
  • Lazy expiration: old data is ignored during queries, not explicitly cleaned
  • + *
+ *

+ * + *

+ * Memory overhead: ~160 bytes per instance (2 buckets × 40 bytes each) + *

+ * + *

+ * Thread-safety: All operations are lock-free and thread-safe. Multiple threads can record events and query metrics + * concurrently without contention. + *

+ * + * @author Ali Takavci + * @since 7.1 + */ +public class LockFreeSlidingWindowMetrics implements SlidingWindowMetrics { + + /** + * Default window duration: 2 seconds. + */ + private static final long DEFAULT_WINDOW_DURATION_MS = 2_000; + + /** + * Default bucket duration: 1 second. + */ + private static final long DEFAULT_BUCKET_DURATION_MS = 1_000; + + /** + * Minimum bucket duration: 1 second. + */ + private static final long MIN_BUCKET_DURATION_MS = 1_000; + + /** + * Window duration in milliseconds. + */ + private final long windowDurationMs; + + /** + * Bucket duration in milliseconds. + */ + private final long bucketDurationMs; + + /** + * Number of buckets in the ring buffer. + */ + private final int bucketCount; + + /** + * Ring buffer of time window buckets. Fixed-size, reused for lock-free operation. + */ + private final TimeWindowBucket[] ringBuffer; + + /** + * Create a new lock-free sliding window metrics with default configuration (2 seconds, 1 second buckets). + */ + public LockFreeSlidingWindowMetrics() { + this(DEFAULT_WINDOW_DURATION_MS, DEFAULT_BUCKET_DURATION_MS); + } + + /** + * Create a new lock-free sliding window metrics with custom configuration. + * + * @param windowDurationMs the window duration in milliseconds (must be >= bucketDurationMs) + * @param bucketDurationMs the bucket duration in milliseconds (must be >= 1000) + * @throws IllegalArgumentException if configuration is invalid + */ + public LockFreeSlidingWindowMetrics(long windowDurationMs, long bucketDurationMs) { + if (bucketDurationMs < MIN_BUCKET_DURATION_MS) { + throw new IllegalArgumentException( + "Bucket duration must be at least " + MIN_BUCKET_DURATION_MS + "ms, got: " + bucketDurationMs); + } + if (windowDurationMs < bucketDurationMs) { + throw new IllegalArgumentException("Window duration must be >= bucket duration. Window: " + windowDurationMs + + "ms, Bucket: " + bucketDurationMs + "ms"); + } + + this.windowDurationMs = windowDurationMs; + this.bucketDurationMs = bucketDurationMs; + this.bucketCount = (int) (windowDurationMs / bucketDurationMs); + this.ringBuffer = new TimeWindowBucket[bucketCount]; + + // Initialize all buckets + long currentTime = System.currentTimeMillis(); + for (int i = 0; i < bucketCount; i++) { + ringBuffer[i] = new TimeWindowBucket(currentTime); + } + } + + @Override + public void recordSuccess() { + recordEvent(true); + } + + @Override + public void recordFailure() { + recordEvent(false); + } + + /** + * Record an event (success or failure). Lock-free operation. + * + * @param isSuccess true for success, false for failure + */ + private void recordEvent(boolean isSuccess) { + long currentTimeMs = System.currentTimeMillis(); + TimeWindowBucket bucket = getCurrentBucket(currentTimeMs); + + if (isSuccess) { + bucket.incrementSuccessCount(); + } else { + bucket.incrementFailureCount(); + } + } + + /** + * Get the current bucket for the given time, rotating if necessary. Lock-free operation. + * + * @param currentTimeMs the current time in milliseconds + * @return the current bucket + */ + private TimeWindowBucket getCurrentBucket(long currentTimeMs) { + // Calculate which bucket this time belongs to + int bucketIndex = (int) ((currentTimeMs / bucketDurationMs) % bucketCount); + + // Get the bucket + TimeWindowBucket bucket = ringBuffer[bucketIndex]; + + // Check if bucket needs rotation (is stale) + if (bucket.isStale(currentTimeMs, windowDurationMs)) { + // Reset and update the bucket + bucket.reset(); + bucket.setTimestamp(currentTimeMs); + } + + return bucket; + } + + @Override + public MetricsSnapshot getSnapshot() { + long currentTimeMs = System.currentTimeMillis(); + long windowStart = currentTimeMs - windowDurationMs; + + long totalSuccess = 0; + long totalFailure = 0; + + // Iterate through all buckets and sum valid ones + for (TimeWindowBucket bucket : ringBuffer) { + // Only include buckets within the current window + if (bucket.getTimestamp() >= windowStart) { + totalSuccess += bucket.getSuccessCount(); + totalFailure += bucket.getFailureCount(); + } + } + + return new MetricsSnapshot(totalSuccess, totalFailure, currentTimeMs); + } + + @Override + public void reset() { + for (TimeWindowBucket bucket : ringBuffer) { + bucket.reset(); + bucket.setTimestamp(System.currentTimeMillis()); + } + } + + @Override + public String toString() { + MetricsSnapshot snapshot = getSnapshot(); + return "LockFreeSlidingWindowMetrics{" + "window=" + windowDurationMs + "ms, bucket=" + bucketDurationMs + + "ms, buckets=" + bucketCount + ", " + snapshot + '}'; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/metrics/MetricsSnapshot.java b/src/main/java/io/lettuce/core/failover/metrics/MetricsSnapshot.java new file mode 100644 index 0000000000..0c06ad44c8 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/metrics/MetricsSnapshot.java @@ -0,0 +1,137 @@ +package io.lettuce.core.failover.metrics; + +/** + * Immutable snapshot of metrics at a point in time. Represents the state of success and failure counts within a specific time + * window. + * + *

+ * This class is thread-safe and immutable. It captures a consistent view of metrics at the moment of creation and does not + * change afterward. + *

+ * + * @author Ali Takavci + * @since 7.1 + */ +public class MetricsSnapshot { + + /** + * Number of successful command executions in the time window. + */ + private final long successCount; + + /** + * Number of failed command executions in the time window. + */ + private final long failureCount; + + /** + * Timestamp (milliseconds) when this snapshot was created. + */ + private final long snapshotTime; + + /** + * Create a new metrics snapshot with the given counts. + * + * @param successCount the number of successful commands + * @param failureCount the number of failed commands + */ + public MetricsSnapshot(long successCount, long failureCount) { + this(successCount, failureCount, System.currentTimeMillis()); + } + + /** + * Create a new metrics snapshot with the given counts and timestamp. + * + * @param successCount the number of successful commands + * @param failureCount the number of failed commands + * @param snapshotTime the timestamp when this snapshot was created + */ + public MetricsSnapshot(long successCount, long failureCount, long snapshotTime) { + this.successCount = successCount; + this.failureCount = failureCount; + this.snapshotTime = snapshotTime; + } + + /** + * Get the number of successful command executions in the time window. + * + * @return the success count + */ + public long getSuccessCount() { + return successCount; + } + + /** + * Get the number of failed command executions in the time window. + * + * @return the failure count + */ + public long getFailureCount() { + return failureCount; + } + + /** + * Get the total number of commands (success + failure) in the time window. + * + * @return the total count + */ + public long getTotalCount() { + return successCount + failureCount; + } + + /** + * Get the failure rate as a percentage (0-100). + * + * @return the failure rate, or 0 if no commands have been executed + */ + public double getFailureRate() { + long total = getTotalCount(); + if (total == 0) { + return 0.0; + } + return (failureCount * 100.0) / total; + } + + /** + * Get the success rate as a percentage (0-100). + * + * @return the success rate, or 0 if no commands have been executed + */ + public double getSuccessRate() { + return 100.0 - getFailureRate(); + } + + /** + * Get the timestamp when this snapshot was created. + * + * @return the snapshot time in milliseconds + */ + public long getSnapshotTime() { + return snapshotTime; + } + + /** + * Check if this snapshot has any data (at least one command executed). + * + * @return true if total count > 0 + */ + public boolean hasData() { + return getTotalCount() > 0; + } + + /** + * Check if this snapshot indicates a healthy state (no failures). + * + * @return true if failure count is 0 + */ + public boolean isHealthy() { + return failureCount == 0; + } + + @Override + public String toString() { + return "MetricsSnapshot{" + "success=" + successCount + ", failure=" + failureCount + ", total=" + getTotalCount() + + ", failureRate=" + String.format("%.2f", getFailureRate()) + "%, snapshotTime=" + snapshotTime + '}'; + } + +} diff --git a/src/main/java/io/lettuce/core/failover/metrics/SlidingWindowMetrics.java b/src/main/java/io/lettuce/core/failover/metrics/SlidingWindowMetrics.java new file mode 100644 index 0000000000..d9c6554b0b --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/metrics/SlidingWindowMetrics.java @@ -0,0 +1,45 @@ +package io.lettuce.core.failover.metrics; + +/** + * Interface for time-based sliding window metrics. Provides lock-free, thread-safe tracking of success and failure counts + * within a configurable time window. + * + *

+ * Implementations must be: + *

    + *
  • Lock-free: No explicit locks, using atomic operations
  • + *
  • Thread-safe: Safe for concurrent access from multiple threads
  • + *
  • Efficient: Minimal memory overhead and fast operations
  • + *
  • Time-based: Automatic expiration of old data outside the window
  • + *
+ *

+ * + * @author Ali Takavci + * @since 7.1 + */ +public interface SlidingWindowMetrics { + + /** + * Record a successful command execution. Lock-free operation. + */ + void recordSuccess(); + + /** + * Record a failed command execution. Lock-free operation. + */ + void recordFailure(); + + /** + * Get a snapshot of the current metrics within the time window. This is a point-in-time view and does not change after + * being returned. Use the snapshot to access success count, failure count, total count, and failure rate. + * + * @return an immutable snapshot of current metrics + */ + MetricsSnapshot getSnapshot(); + + /** + * Reset all metrics to zero. This clears all buckets and counters. + */ + void reset(); + +} diff --git a/src/main/java/io/lettuce/core/failover/metrics/TimeWindowBucket.java b/src/main/java/io/lettuce/core/failover/metrics/TimeWindowBucket.java new file mode 100644 index 0000000000..8d25666e11 --- /dev/null +++ b/src/main/java/io/lettuce/core/failover/metrics/TimeWindowBucket.java @@ -0,0 +1,133 @@ +package io.lettuce.core.failover.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Immutable time window bucket for lock-free sliding window metrics. Each bucket represents a fixed time interval (e.g., 1 + * second) and holds atomic counters for success and failure counts. + * + *

+ * This class is designed for lock-free, thread-safe operation with minimal memory overhead. The timestamp is immutable, while + * the counters are atomic for thread-safe updates without locks. + *

+ * + * @author Ali Takavci + * @since 7.1 + */ +public class TimeWindowBucket { + + /** + * Timestamp (milliseconds) when this bucket was created or reset. Immutable after construction. + */ + private volatile long timestamp; + + /** + * Atomic counter for successful command executions in this bucket. Lock-free updates. + */ + private final AtomicLong successCount; + + /** + * Atomic counter for failed command executions in this bucket. Lock-free updates. + */ + private final AtomicLong failureCount; + + /** + * Create a new time window bucket with the given timestamp. + * + * @param timestamp the bucket start time in milliseconds + */ + public TimeWindowBucket(long timestamp) { + this.timestamp = timestamp; + this.successCount = new AtomicLong(0); + this.failureCount = new AtomicLong(0); + } + + /** + * Get the timestamp (milliseconds) when this bucket was created or reset. + * + * @return the bucket timestamp + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Set the timestamp for this bucket. Used during bucket reset/rotation. + * + * @param timestamp the new timestamp in milliseconds + */ + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + /** + * Get the current success count for this bucket. Lock-free read. + * + * @return the success count + */ + public long getSuccessCount() { + return successCount.get(); + } + + /** + * Atomically increment the success counter. Lock-free operation. + * + * @return the new success count + */ + public long incrementSuccessCount() { + return successCount.incrementAndGet(); + } + + /** + * Get the current failure count for this bucket. Lock-free read. + * + * @return the failure count + */ + public long getFailureCount() { + return failureCount.get(); + } + + /** + * Atomically increment the failure counter. Lock-free operation. + * + * @return the new failure count + */ + public long incrementFailureCount() { + return failureCount.incrementAndGet(); + } + + /** + * Reset both counters to zero. Used during bucket rotation. Lock-free operation. + */ + public void reset() { + successCount.set(0); + failureCount.set(0); + } + + /** + * Get the total count (success + failure) for this bucket. + * + * @return the total count + */ + public long getTotalCount() { + return successCount.get() + failureCount.get(); + } + + /** + * Check if this bucket is stale (older than the given age in milliseconds). + * + * @param currentTimeMs the current time in milliseconds + * @param maxAgeMs the maximum age in milliseconds + * @return true if the bucket is older than maxAgeMs + */ + public boolean isStale(long currentTimeMs, long maxAgeMs) { + return (currentTimeMs - timestamp) > maxAgeMs; + } + + @Override + public String toString() { + return "TimeWindowBucket{" + "timestamp=" + timestamp + ", success=" + successCount.get() + ", failure=" + + failureCount.get() + '}'; + } + +} diff --git a/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java b/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java new file mode 100644 index 0000000000..90529ac746 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java @@ -0,0 +1,160 @@ +package io.lettuce.core.failover; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import javax.inject.Inject; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.core.RedisURI; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.test.LettuceExtension; + +/** + * Integration tests for circuit breaker metrics tracking in multi-database connections. + * + * @author Ali Takavci + * @since 7.1 + */ +@ExtendWith(LettuceExtension.class) +@Tag("integration") +class CircuitBreakerMetricsIntegrationTests extends MultiDbTestSupport { + + @Inject + CircuitBreakerMetricsIntegrationTests(MultiDbClient client) { + super(client); + } + + @Test + void shouldTrackSuccessfulCommands() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisURI endpoint = connection.getCurrentEndpoint(); + + // Execute successful command + connection.sync().set("key", "value"); + + // Get metrics + CircuitBreaker cb = connection.getCircuitBreaker(endpoint); + assertNotNull(cb); + assertThat(cb.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1); + assertThat(cb.getMetrics().getSnapshot().getFailureCount()).isEqualTo(0); + + connection.close(); + } + + @Test + void shouldTrackMultipleCommands() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisURI endpoint = connection.getCurrentEndpoint(); + + // Execute multiple commands + connection.sync().set("key1", "value1"); + connection.sync().set("key2", "value2"); + connection.sync().get("key1"); + + // Get metrics + CircuitBreaker cb = connection.getCircuitBreaker(endpoint); + assertThat(cb.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(3); + assertThat(cb.getMetrics().getSnapshot().getFailureCount()).isEqualTo(0); + + connection.close(); + } + + @Test + void shouldIsolatMetricsPerEndpoint() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + List endpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + + // Execute command on first endpoint + connection.sync().set("key1", "value1"); + RedisURI firstEndpoint = connection.getCurrentEndpoint(); + + // Switch to second endpoint + RedisURI secondEndpoint = endpoints.stream().filter(uri -> !uri.equals(firstEndpoint)).findFirst() + .orElseThrow(() -> new IllegalStateException("No second endpoint found")); + connection.switchToDatabase(secondEndpoint); + + // Execute command on second endpoint + connection.sync().set("key2", "value2"); + + // Get metrics for both endpoints + CircuitBreaker cb1 = connection.getCircuitBreaker(firstEndpoint); + CircuitBreaker cb2 = connection.getCircuitBreaker(secondEndpoint); + + // Verify isolation - each endpoint has its own metrics + assertThat(cb1.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1); + assertThat(cb2.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1); + + connection.close(); + } + + @Test + void shouldThrowExceptionForUnknownEndpoint() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + RedisURI unknownEndpoint = RedisURI.create("redis://unknown:9999"); + + assertThatThrownBy(() -> connection.getCircuitBreaker(unknownEndpoint)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown endpoint"); + + connection.close(); + } + + @Test + void shouldMaintainMetricsAfterSwitch() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisURI firstEndpoint = connection.getCurrentEndpoint(); + + // Execute command on first endpoint + connection.sync().set("key1", "value1"); + CircuitBreaker cb1Before = connection.getCircuitBreaker(firstEndpoint); + long successes1Before = cb1Before.getMetrics().getSnapshot().getSuccessCount(); + + // Switch to second endpoint + List endpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + RedisURI secondEndpoint = endpoints.stream().filter(uri -> !uri.equals(firstEndpoint)).findFirst() + .orElseThrow(() -> new IllegalStateException("No second endpoint found")); + connection.switchToDatabase(secondEndpoint); + + // Execute command on second endpoint + connection.sync().set("key2", "value2"); + + // Switch back to first endpoint + connection.switchToDatabase(firstEndpoint); + + // Verify metrics for first endpoint are unchanged + CircuitBreaker cb1After = connection.getCircuitBreaker(firstEndpoint); + assertThat(cb1After.getMetrics().getSnapshot().getSuccessCount()).isEqualTo(successes1Before); + + connection.close(); + } + + @Test + void shouldExposeMetricsViaCircuitBreaker() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisURI endpoint = connection.getCurrentEndpoint(); + + // Execute commands + connection.sync().set("key", "value"); + connection.sync().get("key"); + + // Get circuit breaker and verify metrics are accessible + CircuitBreaker cb = connection.getCircuitBreaker(endpoint); + assertNotNull(cb); + assertNotNull(cb.getMetrics()); + assertThat(cb.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(2); + + connection.close(); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/CircuitBreakerStateListenerTests.java b/src/test/java/io/lettuce/core/failover/CircuitBreakerStateListenerTests.java new file mode 100644 index 0000000000..fa92878cda --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/CircuitBreakerStateListenerTests.java @@ -0,0 +1,169 @@ +package io.lettuce.core.failover; + +import static org.assertj.core.api.Assertions.*; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Tag; + +import io.lettuce.TestTags; +import io.lettuce.core.failover.api.CircuitBreakerStateListener; + +/** + * Unit tests for {@link CircuitBreakerStateListener} functionality. + * + * @author Ali Takavci + * @since 7.1 + */ +@Tag(TestTags.UNIT_TEST) +class CircuitBreakerStateListenerTests { + + private CircuitBreaker circuitBreaker; + + @BeforeEach + void setUp() { + CircuitBreaker.CircuitBreakerConfig config = new CircuitBreaker.CircuitBreakerConfig(50.0f, 5, + CircuitBreaker.CircuitBreakerConfig.DEFAULT.getTrackedExceptions()); + circuitBreaker = new CircuitBreaker(config); + } + + @Test + void shouldNotifyListenerOnStateChange() { + // Given + TestListener listener = new TestListener(); + circuitBreaker.addListener(listener); + + // When - trigger state change by recording failures + for (int i = 0; i < 10; i++) { + circuitBreaker.getMetrics().recordFailure(); + } + circuitBreaker.evaluateMetrics(); + + // Then + assertThat(listener.events).hasSize(1); + CircuitBreakerStateChangeEvent event = listener.events.get(0); + assertThat(event.getPreviousState()).isEqualTo(CircuitBreaker.State.CLOSED); + assertThat(event.getNewState()).isEqualTo(CircuitBreaker.State.OPEN); + assertThat(event.getCircuitBreaker()).isSameAs(circuitBreaker); + assertThat(event.getTimestamp()).isGreaterThan(0); + } + + @Test + void shouldNotifyMultipleListeners() { + // Given + TestListener listener1 = new TestListener(); + TestListener listener2 = new TestListener(); + circuitBreaker.addListener(listener1); + circuitBreaker.addListener(listener2); + + // When - trigger state change + for (int i = 0; i < 10; i++) { + circuitBreaker.getMetrics().recordFailure(); + } + circuitBreaker.evaluateMetrics(); + + // Then + assertThat(listener1.events).hasSize(1); + assertThat(listener2.events).hasSize(1); + } + + @Test + void shouldNotNotifyRemovedListener() { + // Given + TestListener listener = new TestListener(); + circuitBreaker.addListener(listener); + circuitBreaker.removeListener(listener); + + // When - trigger state change + for (int i = 0; i < 10; i++) { + circuitBreaker.getMetrics().recordFailure(); + } + circuitBreaker.evaluateMetrics(); + + // Then + assertThat(listener.events).isEmpty(); + } + + @Test + void shouldNotNotifyIfStateDoesNotChange() { + // Given + TestListener listener = new TestListener(); + circuitBreaker.addListener(listener); + + // When - evaluate without enough failures + circuitBreaker.getMetrics().recordSuccess(); + circuitBreaker.evaluateMetrics(); + + // Then + assertThat(listener.events).isEmpty(); + } + + @Test + void shouldHandleListenerExceptionsGracefully() { + // Given + FailingListener failingListener = new FailingListener(); + TestListener normalListener = new TestListener(); + circuitBreaker.addListener(failingListener); + circuitBreaker.addListener(normalListener); + + // When - trigger state change + for (int i = 0; i < 10; i++) { + circuitBreaker.getMetrics().recordFailure(); + } + circuitBreaker.evaluateMetrics(); + + // Then - normal listener should still receive the event + assertThat(normalListener.events).hasSize(1); + } + + @Test + void shouldIncludeTimestampInEvent() throws InterruptedException { + // Given + TestListener listener = new TestListener(); + circuitBreaker.addListener(listener); + long beforeTimestamp = System.currentTimeMillis(); + + // When + Thread.sleep(10); // Small delay to ensure timestamp difference + for (int i = 0; i < 10; i++) { + circuitBreaker.getMetrics().recordFailure(); + } + circuitBreaker.evaluateMetrics(); + long afterTimestamp = System.currentTimeMillis(); + + // Then + assertThat(listener.events).hasSize(1); + long eventTimestamp = listener.events.get(0).getTimestamp(); + assertThat(eventTimestamp).isBetween(beforeTimestamp, afterTimestamp); + } + + /** + * Test listener that collects all events. + */ + private static class TestListener implements CircuitBreakerStateListener { + + final List events = new ArrayList<>(); + + @Override + public void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) { + events.add(event); + } + + } + + /** + * Test listener that always throws an exception. + */ + private static class FailingListener implements CircuitBreakerStateListener { + + @Override + public void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) { + throw new RuntimeException("Listener failure"); + } + + } + +} diff --git a/src/test/java/io/lettuce/core/failover/LockFreeSlidingWindowMetricsUnitTests.java b/src/test/java/io/lettuce/core/failover/LockFreeSlidingWindowMetricsUnitTests.java new file mode 100644 index 0000000000..d6756d8708 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/LockFreeSlidingWindowMetricsUnitTests.java @@ -0,0 +1,203 @@ +package io.lettuce.core.failover; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.offset; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import io.lettuce.core.failover.metrics.LockFreeSlidingWindowMetrics; +import io.lettuce.core.failover.metrics.MetricsSnapshot; + +/** + * Unit tests for lock-free sliding window metrics implementation. + * + * @author Ali Takavci + * @since 7.1 + */ +@Tag("unit") +@DisplayName("Lock-Free Sliding Window Metrics") +class LockFreeSlidingWindowMetricsUnitTests { + + private LockFreeSlidingWindowMetrics metrics; + + @BeforeEach + void setUp() { + metrics = new LockFreeSlidingWindowMetrics(); + } + + @Test + @DisplayName("should initialize with default configuration") + void shouldInitializeWithDefaults() { + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getSuccessCount()).isEqualTo(0); + assertThat(snapshot.getFailureCount()).isEqualTo(0); + } + + @Test + @DisplayName("should record successful events") + void shouldRecordSuccessfulEvents() { + metrics.recordSuccess(); + metrics.recordSuccess(); + metrics.recordSuccess(); + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getSuccessCount()).isEqualTo(3); + assertThat(snapshot.getFailureCount()).isEqualTo(0); + } + + @Test + @DisplayName("should record failed events") + void shouldRecordFailedEvents() { + metrics.recordFailure(); + metrics.recordFailure(); + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getSuccessCount()).isEqualTo(0); + assertThat(snapshot.getFailureCount()).isEqualTo(2); + } + + @Test + @DisplayName("should track mixed success and failure events") + void shouldTrackMixedEvents() { + metrics.recordSuccess(); + metrics.recordSuccess(); + metrics.recordFailure(); + metrics.recordSuccess(); + metrics.recordFailure(); + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getSuccessCount()).isEqualTo(3); + assertThat(snapshot.getFailureCount()).isEqualTo(2); + assertThat(snapshot.getTotalCount()).isEqualTo(5); + } + + @Test + @DisplayName("should calculate failure rate correctly") + void shouldCalculateFailureRate() { + metrics.recordSuccess(); + metrics.recordSuccess(); + metrics.recordFailure(); + + MetricsSnapshot snapshot = metrics.getSnapshot(); + double failureRate = snapshot.getFailureRate(); + assertThat(failureRate).isCloseTo(33.33, offset(0.1)); + } + + @Test + @DisplayName("should return zero failure rate when no events") + void shouldReturnZeroFailureRateWhenNoEvents() { + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getFailureRate()).isEqualTo(0.0); + } + + @Test + @DisplayName("should return 100% failure rate when all failures") + void shouldReturn100PercentFailureRateWhenAllFailures() { + metrics.recordFailure(); + metrics.recordFailure(); + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getFailureRate()).isEqualTo(100.0); + } + + @Test + @DisplayName("should reset all metrics") + void shouldResetAllMetrics() { + metrics.recordSuccess(); + metrics.recordSuccess(); + metrics.recordFailure(); + + metrics.reset(); + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getSuccessCount()).isEqualTo(0); + assertThat(snapshot.getFailureCount()).isEqualTo(0); + assertThat(snapshot.getTotalCount()).isEqualTo(0); + } + + @Test + @DisplayName("should provide immutable snapshot") + void shouldProvideImmutableSnapshot() { + metrics.recordSuccess(); + metrics.recordSuccess(); + metrics.recordFailure(); + + MetricsSnapshot snapshot1 = metrics.getSnapshot(); + assertThat(snapshot1.getSuccessCount()).isEqualTo(2); + assertThat(snapshot1.getFailureCount()).isEqualTo(1); + + // Record more events + metrics.recordSuccess(); + + // Snapshot should not change + assertThat(snapshot1.getSuccessCount()).isEqualTo(2); + assertThat(snapshot1.getFailureCount()).isEqualTo(1); + + // New snapshot should reflect new state + MetricsSnapshot snapshot2 = metrics.getSnapshot(); + assertThat(snapshot2.getSuccessCount()).isEqualTo(3); + } + + @Test + @DisplayName("should validate configuration") + void shouldValidateConfiguration() { + assertThatThrownBy(() -> new LockFreeSlidingWindowMetrics(60_000, 500)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Bucket duration must be at least"); + + assertThatThrownBy(() -> new LockFreeSlidingWindowMetrics(1_000, 2_000)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Window duration must be >= bucket duration"); + } + + @Test + @DisplayName("should handle high throughput") + void shouldHandleHighThroughput() { + int eventCount = 100_000; + + for (int i = 0; i < eventCount; i++) { + if (i % 2 == 0) { + metrics.recordSuccess(); + } else { + metrics.recordFailure(); + } + } + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getTotalCount()).isEqualTo(eventCount); + assertThat(snapshot.getSuccessCount()).isEqualTo(eventCount / 2); + assertThat(snapshot.getFailureCount()).isEqualTo(eventCount / 2); + } + + @Test + @DisplayName("should be thread-safe") + void shouldBeThreadSafe() throws InterruptedException { + int threadCount = 10; + int eventsPerThread = 10_000; + + Thread[] threads = new Thread[threadCount]; + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + threads[i] = new Thread(() -> { + for (int j = 0; j < eventsPerThread; j++) { + if ((threadId + j) % 2 == 0) { + metrics.recordSuccess(); + } else { + metrics.recordFailure(); + } + } + }); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + MetricsSnapshot snapshot = metrics.getSnapshot(); + assertThat(snapshot.getTotalCount()).isEqualTo(threadCount * eventsPerThread); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/MultiDbClientIntegrationTests.java b/src/test/java/io/lettuce/core/failover/MultiDbClientIntegrationTests.java new file mode 100644 index 0000000000..e5093675f9 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/MultiDbClientIntegrationTests.java @@ -0,0 +1,271 @@ +package io.lettuce.core.failover; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.lettuce.core.RedisURI; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; + +/** + * Integration tests for MultiDbClient. + *

+ * These tests were migrated from the original MultiDbClient design and adapted to the current API. Most tests require a running + * Redis instance as they test actual connection operations. + *

+ * API Differences from Original Tests: + *

    + *
  • Original: {@code multiDbClient.setActive(uri)} → Current: {@code connection.switchToDatabase(uri)}
  • + *
  • Original: {@code multiDbClient.getActive()} → Current: {@code connection.getCurrentEndpoint()}
  • + *
  • Original: {@code multiDbClient.getEndpoints()} returns {@code RedisEndpoints} → Current: returns + * {@code Iterable}
  • + *
  • Original: {@code multiDbClient.addEndpoint(uri)} → Current: {@code connection.addDatabase(uri, weight)}
  • + *
  • Original: {@code multiDbClient.removeEndpoint(uri)} → Current: {@code connection.removeDatabase(uri)}
  • + *
+ * + * @author Mark Paluch (original) + * @author Ivo Gaydazhiev (original) + * @author Ali Takavci (adapted) + */ +class MultiDbClientIntegrationTests { + + private MultiDbClient client; + + private StatefulRedisMultiDbConnection connection; + + @AfterEach + void tearDown() { + if (connection != null) { + connection.close(); + } + if (client != null) { + client.shutdown(); + } + } + + @Test + void shouldCreateWithMultipleEndpoints() { + RedisURI uri1 = RedisURI.create("redis://localhost:6379"); + RedisURI uri2 = RedisURI.create("redis://localhost:6380"); + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + + assertThat(client).isNotNull(); + } + + @Test + void shouldCreateWithSet() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + + assertThat(client).isNotNull(); + } + + @Test + void shouldRejectNullDatabaseConfigs() { + assertThatThrownBy(() -> MultiDbClient.create((List) null)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldRejectEmptyRedisURIs() { + assertThatThrownBy(() -> MultiDbClient.create(Arrays.asList())).isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldSetActiveEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.setActive(uri2) + connection.switchToDatabase(uri2); + + // API CHANGE: Original used multiDbClient.getActive() + assertThat(connection.getCurrentEndpoint()).isEqualTo(uri2); + } + + @Test + void shouldRejectSettingNonExistentEndpointAsActive() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + RedisURI uri3 = RedisURI.create("redis://localhost:9999"); + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.setActive(uri3) + // Note: Current implementation throws UnsupportedOperationException for non-existent endpoints + assertThatThrownBy(() -> connection.switchToDatabase(uri3)).isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void shouldGetActiveEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.getActive() + RedisURI active = connection.getCurrentEndpoint(); + assertThat(active).isIn(uri1, uri2); + } + + @Test + void shouldAddEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.addEndpoint(uri2) + // Current API: connection.addDatabase(uri2, weight) + connection.addDatabase(uri2, 1.0f); + + // Verify it was added + assertThat(StreamSupport.stream(connection.getEndpoints().spliterator(), false).collect(Collectors.toList())) + .contains(uri2); + } + + @Test + void shouldNotAddDuplicateEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.addEndpoint(uri1) where uri1 already exists + // Current API: connection.addDatabase(uri1, weight) should throw exception + assertThatThrownBy(() -> connection.addDatabase(uri1, 1.0f)).isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldRemoveEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // Make sure we're not on uri2 before removing it + connection.switchToDatabase(uri1); + + // API CHANGE: Original used multiDbClient.removeEndpoint(uri2) + // Current API: connection.removeDatabase(uri2) + connection.removeDatabase(uri2); + + // Verify it was removed + assertThat(StreamSupport.stream(connection.getEndpoints().spliterator(), false).collect(Collectors.toList())) + .doesNotContain(uri2); + } + + @Test + void shouldNotRemoveNonExistentEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI nonExistent = RedisURI.create("redis://localhost:9999"); + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.removeEndpoint(nonExistent) + // Current API: connection.removeDatabase(nonExistent) should throw exception + assertThatThrownBy(() -> connection.removeDatabase(nonExistent)).isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldNotRemoveLastEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.removeEndpoint(uri1) where uri1 is the last endpoint + // Current API: connection.removeDatabase(uri1) should throw exception + assertThatThrownBy(() -> connection.removeDatabase(uri1)).isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void shouldSwitchActiveEndpointDynamically() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.setActive(uri1) + connection.switchToDatabase(uri1); + // API CHANGE: Original used multiDbClient.getActive() + assertThat(connection.getCurrentEndpoint()).isEqualTo(uri1); + + // API CHANGE: Original used multiDbClient.setActive(uri2) + connection.switchToDatabase(uri2); + // API CHANGE: Original used multiDbClient.getActive() + assertThat(connection.getCurrentEndpoint()).isEqualTo(uri2); + } + + @Test + void shouldAddEndpointAndSetItActive() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1)); + connection = client.connect(); + + // API CHANGE: Original used multiDbClient.addEndpoint(uri2) then multiDbClient.setActive(uri2) + // Current API: connection.addDatabase(uri2, weight) then connection.switchToDatabase(uri2) + connection.addDatabase(uri2, 1.0f); + connection.switchToDatabase(uri2); + + // Verify it's active + assertThat(connection.getCurrentEndpoint()).isEqualTo(uri2); + } + + @Test + void shouldNotRemoveActiveEndpoint() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // Make sure we're on uri1 + connection.switchToDatabase(uri1); + + // API CHANGE: Original used multiDbClient.removeEndpoint(uri1) where uri1 is active + // Current API: connection.removeDatabase(uri1) should throw exception + assertThatThrownBy(() -> connection.removeDatabase(uri1)).isInstanceOf(UnsupportedOperationException.class); + } + + @Test + void shouldGetEndpointsObject() { + RedisURI uri1 = MultiDbTestSupport.URI1; + RedisURI uri2 = MultiDbTestSupport.URI2; + + client = MultiDbClient.create(MultiDbTestSupport.getDatabaseConfigs(uri1, uri2)); + connection = client.connect(); + + // API CHANGE: Original returned RedisEndpoints object with methods like getAll(), size(), etc. + // Current API returns Iterable + Iterable endpoints = connection.getEndpoints(); + assertThat(endpoints).isNotNull(); + + // Convert to list to check contents + List endpointList = StreamSupport.stream(endpoints.spliterator(), false).collect(Collectors.toList()); + assertThat(endpointList).hasSize(2); + assertThat(endpointList).containsExactlyInAnyOrder(uri1, uri2); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java b/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java new file mode 100644 index 0000000000..dc9db2ff95 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/MultiDbTestSupport.java @@ -0,0 +1,89 @@ +package io.lettuce.core.failover; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.junit.After; +import org.junit.jupiter.api.BeforeEach; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.TestSupport; +import io.lettuce.test.settings.TestSettings; + +/** + * @author Ali Takavci + * @since 7.1 + */ +public class MultiDbTestSupport extends TestSupport { + + protected final MultiDbClient multiDbClient; + + protected final RedisClient directClient1; + + protected final RedisClient directClient2; + + protected RedisURI uri1; + + protected RedisURI uri2; + + public MultiDbTestSupport(MultiDbClient multiDbClient) { + this.multiDbClient = multiDbClient; + Iterator endpoints = multiDbClient.getRedisURIs().iterator(); + this.uri1 = endpoints.next(); + this.uri2 = endpoints.next(); + + this.directClient1 = RedisClient.create(uri1); + this.directClient2 = RedisClient.create(uri2); + } + + @BeforeEach + void setUpMultiDb() { + directClient1.connect().sync().flushall(); + directClient2.connect().sync().flushall(); + } + + @After + public void tearDownMultiDb() { + directClient1.shutdown(); + directClient2.shutdown(); + } + + public static final RedisURI URI1 = RedisURI.create(TestSettings.host(), TestSettings.port()); + + public static final RedisURI URI2 = RedisURI.create(TestSettings.host(), TestSettings.port(1)); + + public static final DatabaseConfig DB1 = new DatabaseConfig(URI1, 1.0f); + + public static final DatabaseConfig DB2 = new DatabaseConfig(URI2, 0.5f); + + public static final List DBs = getDatabaseConfigs(); + + private static List getDatabaseConfigs() { + return Arrays.asList(DB1, DB2); + } + + public static List getDatabaseConfigs(RedisURI... URIs) { + float weight = 1.0f; + List endpoints = new ArrayList<>(); + for (RedisURI uri : URIs) { + endpoints.add(new DatabaseConfig(uri, weight)); + weight /= 2; + } + return endpoints; + } + + public static List getDatabaseConfigs(CircuitBreaker.CircuitBreakerConfig circuitBreakerConfig, + RedisURI... URIs) { + float weight = 1.0f; + List endpoints = new ArrayList<>(); + for (RedisURI uri : URIs) { + endpoints.add(new DatabaseConfig(uri, weight, null, circuitBreakerConfig)); + weight /= 2; + } + return endpoints; + } + +} diff --git a/src/test/java/io/lettuce/core/failover/RedisMultiDbClientConnectIntegrationTests.java b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientConnectIntegrationTests.java new file mode 100644 index 0000000000..e61f438f79 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientConnectIntegrationTests.java @@ -0,0 +1,117 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static io.lettuce.core.codec.StringCodec.UTF8; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.ExecutionException; +import java.util.stream.StreamSupport; + +import javax.inject.Inject; + +import org.junit.After; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.test.TestFutures; +import io.lettuce.test.LettuceExtension; + +/** + * + * @author Ali Takavci + * @since 7.1 + */ +@ExtendWith(LettuceExtension.class) +@Tag(INTEGRATION_TEST) +class RedisMultiDbClientConnectIntegrationTests extends MultiDbTestSupport { + + @Inject + RedisMultiDbClientConnectIntegrationTests(MultiDbClient client) { + super(client); + } + + @BeforeEach + void setUp() { + directClient1.connect().sync().flushall(); + directClient2.connect().sync().flushall(); + } + + @After + void tearDown() { + directClient1.shutdown(); + directClient2.shutdown(); + } + + /* + * Standalone/Stateful + */ + @Test + void connectClientUri() { + + StatefulRedisConnection connection = multiDbClient.connect(); + assertThat(connection.getTimeout()).isEqualTo(RedisURI.DEFAULT_TIMEOUT_DURATION); + connection.close(); + } + + @Test + void connectCodecClientUri() { + StatefulRedisConnection connection = multiDbClient.connect(UTF8); + assertThat(connection.getTimeout()).isEqualTo(RedisURI.DEFAULT_TIMEOUT_DURATION); + connection.close(); + } + + @Test + void connectAndRunSimpleCommand() throws InterruptedException, ExecutionException { + StatefulRedisConnection connection = multiDbClient.connect(); + RedisFuture futureSet = connection.async().set("key1", "value1"); + TestFutures.awaitOrTimeout(futureSet); + RedisFuture futureGet = connection.async().get("key1"); + TestFutures.awaitOrTimeout(futureGet); + assertEquals("value1", futureGet.get()); + connection.close(); + } + + @Test + void connectAndRunAndSwitchAndRun() throws InterruptedException, ExecutionException { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisFuture futureSet = connection.async().set("key1", "value1"); + TestFutures.awaitOrTimeout(futureSet); + RedisFuture futureGet = connection.async().get("key1"); + TestFutures.awaitOrTimeout(futureGet); + assertEquals(futureGet.get(), "value1"); + RedisURI other = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(connection.getCurrentEndpoint())).findFirst().get(); + connection.switchToDatabase(other); + RedisFuture futureGet2 = connection.async().get("key1"); + TestFutures.awaitOrTimeout(futureGet2); + assertEquals(null, futureGet2.get()); + connection.close(); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/RedisMultiDbClientFactoryUnitTests.java b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientFactoryUnitTests.java new file mode 100644 index 0000000000..70ff834084 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientFactoryUnitTests.java @@ -0,0 +1,60 @@ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.UNIT_TEST; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Collections; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import io.lettuce.test.resource.FastShutdown; +import io.lettuce.test.resource.TestClientResources; + +/** + * @author Ali Takavci + * @since 7.1 + */ +@Tag(UNIT_TEST) +class RedisMultiDbClientFactoryUnitTests { + + @Test + void plain() { + FastShutdown.shutdown(MultiDbClient.create(MultiDbTestSupport.DBs)); + } + + @Test + void withStringUri() { + FastShutdown.shutdown(MultiDbClient.create(MultiDbTestSupport.DBs)); + } + + @Test + void withUri() { + FastShutdown.shutdown(MultiDbClient.create(MultiDbTestSupport.DBs)); + } + + @Test + void withUriNull() { + assertThatThrownBy(() -> MultiDbClient.create(Collections.singletonList((DatabaseConfig) null))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void clientResourcesWithUri() { + FastShutdown + .shutdown(MultiDbClient.create(TestClientResources.get(), Collections.singletonList(MultiDbTestSupport.DB1))); + } + + @Test + void clientResourcesWithUriNull() { + assertThatThrownBy( + () -> MultiDbClient.create(TestClientResources.get(), Collections.singletonList((DatabaseConfig) null))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void clientResourcesNullWithUri() { + assertThatThrownBy(() -> MultiDbClient.create(null, Collections.singletonList(MultiDbTestSupport.DB1))) + .isInstanceOf(IllegalArgumentException.class); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/RedisMultiDbClientIntegrationTests.java b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientIntegrationTests.java new file mode 100644 index 0000000000..ee135d30e1 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientIntegrationTests.java @@ -0,0 +1,281 @@ +package io.lettuce.core.failover; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisChannelHandler; +import io.lettuce.core.RedisConnectionStateListener; +import io.lettuce.core.TestSupport; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.event.command.CommandFailedEvent; +import io.lettuce.core.event.command.CommandListener; +import io.lettuce.core.event.command.CommandStartedEvent; +import io.lettuce.core.event.command.CommandSucceededEvent; +import io.lettuce.core.protocol.CommandType; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; +import io.lettuce.core.resource.DefaultEventLoopGroupProvider; +import io.lettuce.test.TestFutures; +import io.lettuce.test.Wait; +import io.lettuce.test.resource.FastShutdown; +import io.lettuce.test.resource.TestClientResources; +import io.netty.util.concurrent.EventExecutorGroup; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for {@link MultiDbClient}. + * + * @author Ali Takavci + * @since 7.1 + */ +@Tag(INTEGRATION_TEST) +class RedisMultiDbClientIntegrationTests extends TestSupport { + + private final ClientResources clientResources = TestClientResources.get(); + + @Test + void shouldNotifyConnectionListener() { + + TestConnectionListener listener = new TestConnectionListener(); + + MultiDbClient client = MultiDbClient.create(clientResources, MultiDbTestSupport.DBs); + + client.addListener(listener); + + assertThat(listener.onConnected).isNull(); + assertThat(listener.onDisconnected).isNull(); + assertThat(listener.onException).isNull(); + + StatefulRedisConnection connection = client.connect(); + + Wait.untilTrue(() -> listener.onConnected != null).waitOrTimeout(); + assertThat(listener.onConnectedSocketAddress).isNotNull(); + + // assertThat(listener.onConnected).isEqualTo(connection); + assertThat(listener.onDisconnected).isNull(); + + connection.sync().set(key, value); + connection.close(); + + Wait.untilTrue(() -> listener.onDisconnected != null).waitOrTimeout(); + + // assertThat(listener.onConnected).isEqualTo(connection); + // assertThat(listener.onDisconnected).isEqualTo(connection); + + FastShutdown.shutdown(client); + } + + @Test + void shouldNotNotifyListenerAfterRemoval() { + + final TestConnectionListener removedListener = new TestConnectionListener(); + final TestConnectionListener retainedListener = new TestConnectionListener(); + + MultiDbClient client = MultiDbClient.create(clientResources, MultiDbTestSupport.DBs); + client.addListener(removedListener); + client.addListener(retainedListener); + client.removeListener(removedListener); + + // that's the sut call + client.connect().close(); + + Wait.untilTrue(() -> retainedListener.onConnected != null).waitOrTimeout(); + + assertThat(retainedListener.onConnected).isNotNull(); + + assertThat(removedListener.onConnected).isNull(); + assertThat(removedListener.onConnectedSocketAddress).isNull(); + assertThat(removedListener.onDisconnected).isNull(); + assertThat(removedListener.onException).isNull(); + + FastShutdown.shutdown(client); + } + + @Test + void reuseClientConnections() throws Exception { + + // given + DefaultClientResources clientResources = DefaultClientResources.create(); + Map, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); + + MultiDbClient redisFailoverClient1 = newClient(clientResources); + MultiDbClient redisFailoverClient2 = newClient(clientResources); + connectAndClose(redisFailoverClient1); + connectAndClose(redisFailoverClient2); + + // when + EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); + redisFailoverClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + + // then + connectAndClose(redisFailoverClient2); + + TestFutures.awaitOrTimeout(clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS)); + + assertThat(eventLoopGroups).isEmpty(); + assertThat(executor.isShuttingDown()).isTrue(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); + } + + @Test + void shouldPropagateCommandTimeoutToCommandListener() throws InterruptedException { + + TestCommandListener commandListener = new TestCommandListener(); + + MultiDbClient client = MultiDbClient.create(clientResources, MultiDbTestSupport.DBs); + client.addListener(commandListener); + ClientOptions options = ClientOptions.builder().timeoutOptions(TimeoutOptions.enabled()).build(); + // HACK : check how to access setOptions + // client.setOptions(options); + + StatefulRedisConnection connection = client.connect(); + connection.setTimeout(Duration.ofMillis(1)); + + assertThat(connection.async().blpop(100, key).await(100, TimeUnit.SECONDS)).isTrue(); + + assertThat(commandListener.started).hasSize(1); + assertThat(commandListener.succeeded).isEmpty(); + + Wait.untilTrue(() -> commandListener.failed.size() == 1); + Wait.untilTrue(() -> commandListener.failed.stream() + .anyMatch(command -> command.getCommand().getType().equals(CommandType.BLPOP))); + + FastShutdown.shutdown(client); + } + + @Test + void reuseClientConnectionsShutdownTwoClients() throws Exception { + + // given + DefaultClientResources clientResources = DefaultClientResources.create(); + Map, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); + + MultiDbClient redisFailoverClient1 = newClient(clientResources); + MultiDbClient redisFailoverClient2 = newClient(clientResources); + connectAndClose(redisFailoverClient1); + connectAndClose(redisFailoverClient2); + + // when + EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); + + redisFailoverClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + assertThat(executor.isShutdown()).isFalse(); + connectAndClose(redisFailoverClient2); + redisFailoverClient2.shutdown(0, 0, TimeUnit.MILLISECONDS); + + // then + assertThat(eventLoopGroups).isEmpty(); + assertThat(executor.isShutdown()).isTrue(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isFalse(); + + // cleanup + TestFutures.awaitOrTimeout(clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS)); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); + } + + @Test + void managedClientResources() throws Exception { + + // given + MultiDbClient redisFailoverClient1 = MultiDbClient.create(MultiDbTestSupport.DBs); + ClientResources clientResources = redisFailoverClient1.getResources(); + Map, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); + connectAndClose(redisFailoverClient1); + + // when + EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); + + redisFailoverClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + + // then + assertThat(eventLoopGroups).isEmpty(); + assertThat(executor.isShuttingDown()).isTrue(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); + } + + private void connectAndClose(MultiDbClient client) { + client.connect().close(); + } + + private MultiDbClient newClient(DefaultClientResources clientResources) { + return MultiDbClient.create(clientResources, MultiDbTestSupport.DBs); + } + + private Map, EventExecutorGroup> getExecutors(ClientResources clientResources) + throws Exception { + Field eventLoopGroupsField = DefaultEventLoopGroupProvider.class.getDeclaredField("eventLoopGroups"); + eventLoopGroupsField.setAccessible(true); + return (Map) eventLoopGroupsField.get(clientResources.eventLoopGroupProvider()); + } + + private class TestConnectionListener implements RedisConnectionStateListener { + + volatile SocketAddress onConnectedSocketAddress; + + volatile RedisChannelHandler onConnected; + + volatile RedisChannelHandler onDisconnected; + + volatile RedisChannelHandler onException; + + @Override + public void onRedisConnected(RedisChannelHandler connection, SocketAddress socketAddress) { + onConnected = connection; + onConnectedSocketAddress = socketAddress; + } + + @Override + public void onRedisDisconnected(RedisChannelHandler connection) { + onDisconnected = connection; + } + + @Override + public void onRedisExceptionCaught(RedisChannelHandler connection, Throwable cause) { + onException = connection; + } + + } + + static class TestCommandListener implements CommandListener { + + final List started = new ArrayList<>(); + + final List succeeded = new ArrayList<>(); + + final List failed = new ArrayList<>(); + + @Override + public void commandStarted(CommandStartedEvent event) { + synchronized (started) { + started.add(event); + } + } + + @Override + public void commandSucceeded(CommandSucceededEvent event) { + synchronized (succeeded) { + succeeded.add(event); + } + } + + @Override + public void commandFailed(CommandFailedEvent event) { + synchronized (failed) { + failed.add(event); + } + } + + } + +} diff --git a/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java new file mode 100644 index 0000000000..bd54d5d596 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/RedisMultiDbClientUnitTests.java @@ -0,0 +1,101 @@ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.UNIT_TEST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.Closeable; +import java.lang.reflect.Field; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.internal.AsyncCloseable; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.test.ReflectionTestUtils; +import io.netty.util.concurrent.ImmediateEventExecutor; + +/** + * Unit tests for {@link RedisClient}. + * + * @author Ali Takavci + * @since 7.1 + */ +@SuppressWarnings("unchecked") +@ExtendWith(MockitoExtension.class) +@Tag(UNIT_TEST) +class RedisMultiDbClientUnitTests { + + @Mock + ClientResources clientResources; + + @Mock(extraInterfaces = Closeable.class) + AsyncCloseable asyncCloseable; + + @Test + void shutdownShouldDeferResourcesShutdown() throws Exception { + + when(clientResources.eventExecutorGroup()).thenReturn(ImmediateEventExecutor.INSTANCE); + + CompletableFuture completableFuture = new CompletableFuture<>(); + when(asyncCloseable.closeAsync()).thenReturn(completableFuture); + + MultiDbClient redisClient = MultiDbClient.create(clientResources, + MultiDbTestSupport.getDatabaseConfigs(RedisURI.create("redis://foo"), RedisURI.create("redis://bar"))); + + Field field = AbstractRedisClient.class.getDeclaredField("sharedResources"); + field.setAccessible(true); + field.set(redisClient, false); + + Set closeableResources = (Set) ReflectionTestUtils.getField(redisClient, "closeableResources"); + closeableResources.add(asyncCloseable); + + CompletableFuture future = redisClient.shutdownAsync(); + + verify(asyncCloseable).closeAsync(); + verify(clientResources, never()).shutdown(anyLong(), anyLong(), any()); + assertThat(future).isNotDone(); + } + + @Test + void shutdownShutsDownResourcesAfterChannels() throws Exception { + + when(clientResources.eventExecutorGroup()).thenReturn(ImmediateEventExecutor.INSTANCE); + + CompletableFuture completableFuture = new CompletableFuture<>(); + when(asyncCloseable.closeAsync()).thenReturn(completableFuture); + + MultiDbClient redisClient = MultiDbClient.create(clientResources, + MultiDbTestSupport.getDatabaseConfigs(RedisURI.create("redis://foo"), RedisURI.create("redis://bar"))); + + Field field = AbstractRedisClient.class.getDeclaredField("sharedResources"); + field.setAccessible(true); + field.set(redisClient, false); + + Set closeableResources = (Set) ReflectionTestUtils.getField(redisClient, "closeableResources"); + closeableResources.add(asyncCloseable); + + CompletableFuture future = redisClient.shutdownAsync(); + + verify(asyncCloseable).closeAsync(); + verify(clientResources, never()).shutdown(anyLong(), anyLong(), any()); + + completableFuture.complete(null); + + verify(clientResources).shutdown(anyLong(), anyLong(), any()); + assertThat(future).isDone(); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/SlidingWindowMetricsPerformanceTests.java b/src/test/java/io/lettuce/core/failover/SlidingWindowMetricsPerformanceTests.java new file mode 100644 index 0000000000..544e7ff6a0 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/SlidingWindowMetricsPerformanceTests.java @@ -0,0 +1,156 @@ +package io.lettuce.core.failover; + +import java.lang.management.ManagementFactory; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import io.lettuce.core.failover.metrics.LockFreeSlidingWindowMetrics; + +/** + * Performance tests for lock-free sliding window metrics. + * + * @author Ali Takavci + * @since 7.1 + */ +@Tag("performance") +@DisplayName("Sliding Window Metrics Performance") +class SlidingWindowMetricsPerformanceTests { + + @Test + @DisplayName("should record 1M events with minimal overhead") + void shouldRecord1MEventsWithMinimalOverhead() { + LockFreeSlidingWindowMetrics metrics = new LockFreeSlidingWindowMetrics(); + int eventCount = 1_000_000; + + long startTime = System.nanoTime(); + + for (int i = 0; i < eventCount; i++) { + if (i % 2 == 0) { + metrics.recordSuccess(); + } else { + metrics.recordFailure(); + } + } + + long endTime = System.nanoTime(); + long durationNs = endTime - startTime; + long durationMs = durationNs / 1_000_000; + double opsPerSec = (eventCount * 1_000_000_000.0) / durationNs; + + System.out.println("Recorded " + eventCount + " events in " + durationMs + "ms"); + System.out.println("Throughput: " + String.format("%.2f", opsPerSec) + " ops/sec"); + System.out.println("Average latency: " + String.format("%.2f", durationNs / (double) eventCount) + " ns/op"); + } + + @Test + @DisplayName("should query metrics with minimal overhead") + void shouldQueryMetricsWithMinimalOverhead() { + LockFreeSlidingWindowMetrics metrics = new LockFreeSlidingWindowMetrics(); + + // Record some events + for (int i = 0; i < 10_000; i++) { + metrics.recordSuccess(); + } + + int queryCount = 1_000_000; + long startTime = System.nanoTime(); + + for (int i = 0; i < queryCount; i++) { + metrics.getSnapshot(); + } + + long endTime = System.nanoTime(); + long durationNs = endTime - startTime; + long durationMs = durationNs / 1_000_000; + double opsPerSec = (queryCount * 1_000_000_000.0) / durationNs; + + System.out.println("Queried metrics " + queryCount + " times in " + durationMs + "ms"); + System.out.println("Throughput: " + String.format("%.2f", opsPerSec) + " ops/sec"); + System.out.println("Average latency: " + String.format("%.2f", durationNs / (double) queryCount) + " ns/op"); + } + + @Test + @DisplayName("should handle concurrent recording and querying") + void shouldHandleConcurrentRecordingAndQuerying() throws InterruptedException { + LockFreeSlidingWindowMetrics metrics = new LockFreeSlidingWindowMetrics(); + int threadCount = 8; + int operationsPerThread = 100_000; + + Thread[] threads = new Thread[threadCount]; + long startTime = System.nanoTime(); + + for (int i = 0; i < threadCount; i++) { + threads[i] = new Thread(() -> { + for (int j = 0; j < operationsPerThread; j++) { + if (j % 3 == 0) { + metrics.recordSuccess(); + } else if (j % 3 == 1) { + metrics.recordFailure(); + } else { + metrics.getSnapshot(); + } + } + }); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + long endTime = System.nanoTime(); + long durationNs = endTime - startTime; + long durationMs = durationNs / 1_000_000; + int totalOps = threadCount * operationsPerThread; + double opsPerSec = (totalOps * 1_000_000_000.0) / durationNs; + + System.out.println("Concurrent operations: " + totalOps + " in " + durationMs + "ms"); + System.out.println("Throughput: " + String.format("%.2f", opsPerSec) + " ops/sec"); + System.out.println("Average latency: " + String.format("%.2f", durationNs / (double) totalOps) + " ns/op"); + } + + @Test + @DisplayName("should measure memory overhead") + void shouldMeasureMemoryOverhead() { + Runtime runtime = Runtime.getRuntime(); + long beforeMemory = runtime.totalMemory() - runtime.freeMemory(); + + LockFreeSlidingWindowMetrics metrics = new LockFreeSlidingWindowMetrics(); + + long afterMemory = runtime.totalMemory() - runtime.freeMemory(); + long memoryUsed = afterMemory - beforeMemory; + + System.out.println("Memory overhead per metrics instance: " + memoryUsed + " bytes"); + System.out.println("Expected: ~2.4KB (60 buckets × 40 bytes)"); + } + + @Test + @DisplayName("should measure GC impact") + void shouldMeasureGCImpact() { + System.gc(); + long gcCountBefore = ManagementFactory.getGarbageCollectorMXBeans().stream().mapToLong(b -> b.getCollectionCount()) + .sum(); + + LockFreeSlidingWindowMetrics metrics = new LockFreeSlidingWindowMetrics(); + + // Record many events + for (int i = 0; i < 10_000_000; i++) { + if (i % 2 == 0) { + metrics.recordSuccess(); + } else { + metrics.recordFailure(); + } + } + + System.gc(); + long gcCountAfter = ManagementFactory.getGarbageCollectorMXBeans().stream().mapToLong(b -> b.getCollectionCount()) + .sum(); + + long gcCollections = gcCountAfter - gcCountBefore; + System.out.println("GC collections during 10M operations: " + gcCollections); + System.out.println("Expected: Minimal (lock-free, no allocations)"); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java new file mode 100644 index 0000000000..9c86e6ac72 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/StatefulMultiDbConnectionIntegrationTests.java @@ -0,0 +1,454 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import javax.inject.Inject; + +import org.junit.After; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; +import io.lettuce.test.TestFutures; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.settings.TestSettings; + +/** + * Integration tests for {@link StatefulRedisMultiDbConnection} with basic commands and database switching. + * + * @author Ali Takavci + * @since 7.1 + */ +@ExtendWith(LettuceExtension.class) +@Tag(INTEGRATION_TEST) +class StatefulMultiDbConnectionIntegrationTests extends MultiDbTestSupport { + + @Inject + StatefulMultiDbConnectionIntegrationTests(MultiDbClient client) { + super(client); + } + + @BeforeEach + void setUp() { + directClient1.connect().sync().flushall(); + directClient2.connect().sync().flushall(); + } + + @After + void tearDownAfter() { + directClient1.shutdown(); + directClient2.shutdown(); + } + + // ============ Basic Connection Tests ============ + + @Test + void shouldConnectToMultipleEndpoints() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + assertNotNull(connection); + assertThat(connection.getEndpoints()).isNotNull(); + connection.close(); + } + + @Test + void shouldGetCurrentEndpoint() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisURI currentEndpoint = connection.getCurrentEndpoint(); + assertNotNull(currentEndpoint); + assertThat(currentEndpoint).isIn(connection.getEndpoints()); + connection.close(); + } + + @Test + void shouldListAllEndpoints() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + Iterable endpoints = connection.getEndpoints(); + assertThat(endpoints).isNotNull(); + assertThat(StreamSupport.stream(endpoints.spliterator(), false).count()).isGreaterThanOrEqualTo(2); + connection.close(); + } + + // ============ Basic Command Tests (Sync) ============ + + @Test + void shouldSetAndGetValueSync() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.sync().set("testKey", "testValue"); + String value = connection.sync().get("testKey"); + assertEquals("testValue", value); + connection.close(); + } + + @Test + void shouldDeleteKeySync() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.sync().set("deleteKey", "value"); + Long deleted = connection.sync().del("deleteKey"); + assertEquals(1L, deleted); + String value = connection.sync().get("deleteKey"); + assertNull(value); + connection.close(); + } + + @Test + void shouldIncrementValueSync() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.sync().set("counter", "10"); + Long result = connection.sync().incr("counter"); + assertEquals(11L, result); + connection.close(); + } + + @Test + void shouldAppendToStringSync() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.sync().set("mykey", "Hello"); + Long length = connection.sync().append("mykey", " World"); + assertEquals(11L, length); + String value = connection.sync().get("mykey"); + assertEquals("Hello World", value); + connection.close(); + } + + // ============ Basic Command Tests (Async) ============ + + @Test + void shouldSetAndGetValueAsync() throws Exception { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisFuture setFuture = connection.async().set("asyncKey", "asyncValue"); + TestFutures.awaitOrTimeout(setFuture); + RedisFuture getFuture = connection.async().get("asyncKey"); + TestFutures.awaitOrTimeout(getFuture); + assertEquals("asyncValue", getFuture.get()); + connection.close(); + } + + @Test + void shouldDeleteKeyAsync() throws Exception { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.async().set("asyncDeleteKey", "value"); + RedisFuture deleteFuture = connection.async().del("asyncDeleteKey"); + TestFutures.awaitOrTimeout(deleteFuture); + assertEquals(1L, deleteFuture.get()); + connection.close(); + } + + @Test + void shouldIncrementValueAsync() throws Exception { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.async().set("asyncCounter", "5"); + RedisFuture incrFuture = connection.async().incr("asyncCounter"); + TestFutures.awaitOrTimeout(incrFuture); + assertEquals(6L, incrFuture.get()); + connection.close(); + } + + // ============ Database Switching Tests ============ + + @Test + void shouldSwitchBetweenDatabases() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Set value in first database + connection.sync().set("switchKey", "value1"); + assertEquals("value1", connection.sync().get("switchKey")); + + // Switch to second database + RedisURI other = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(connection.getCurrentEndpoint())).findFirst().get(); + connection.switchToDatabase(other); + + // Value should not exist in second database + assertNull(connection.sync().get("switchKey")); + + // Set different value in second database + connection.sync().set("switchKey", "value2"); + assertEquals("value2", connection.sync().get("switchKey")); + + connection.close(); + } + + @Test + void shouldMaintainDataAfterSwitch() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Set value in first database + connection.sync().set("persistKey", "persistValue"); + RedisURI firstDb = connection.getCurrentEndpoint(); + + // Switch to second database + RedisURI secondDb = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(firstDb)).findFirst().get(); + connection.switchToDatabase(secondDb); + + // Switch back to first database + connection.switchToDatabase(firstDb); + + // Original value should still exist + assertEquals("persistValue", connection.sync().get("persistKey")); + + connection.close(); + } + + @Test + void shouldSwitchAndExecuteCommandsAsync() throws Exception { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Set value in first database + RedisFuture setFuture1 = connection.async().set("asyncSwitchKey", "asyncValue1"); + TestFutures.awaitOrTimeout(setFuture1); + + // Switch to second database + RedisURI other = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(connection.getCurrentEndpoint())).findFirst().get(); + connection.switchToDatabase(other); + + // Set different value in second database + RedisFuture setFuture2 = connection.async().set("asyncSwitchKey", "asyncValue2"); + TestFutures.awaitOrTimeout(setFuture2); + + // Get value from second database + RedisFuture getFuture = connection.async().get("asyncSwitchKey"); + TestFutures.awaitOrTimeout(getFuture); + assertEquals("asyncValue2", getFuture.get()); + + connection.close(); + } + + @Test + void shouldHandleMultipleSwitches() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + RedisURI firstDb = connection.getCurrentEndpoint(); + RedisURI secondDb = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(firstDb)).findFirst().get(); + + // First database + connection.sync().set("key", "value1"); + assertEquals("value1", connection.sync().get("key")); + + // Switch to second + connection.switchToDatabase(secondDb); + connection.sync().set("key", "value2"); + assertEquals("value2", connection.sync().get("key")); + + // Switch back to first + connection.switchToDatabase(firstDb); + assertEquals("value1", connection.sync().get("key")); + + // Switch to second again + connection.switchToDatabase(secondDb); + assertEquals("value2", connection.sync().get("key")); + + connection.close(); + } + + // ============ List Operations Tests ============ + + @Test + void shouldPushAndPopFromList() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.sync().rpush("mylist", "a", "b", "c"); + String value = connection.sync().lpop("mylist"); + assertEquals("a", value); + connection.close(); + } + + @Test + void shouldGetListLength() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + connection.sync().rpush("listlen", "one", "two", "three"); + Long length = connection.sync().llen("listlen"); + assertEquals(3L, length); + connection.close(); + } + + // ========== Dynamic Database Management Tests ========== + + @Test + void shouldAddDatabase() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Get initial endpoint count + List initialEndpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + int initialCount = initialEndpoints.size(); + + // Add a new database (using port(2) which should be available in test environment) + RedisURI newUri = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port(5)) + .withPassword(TestSettings.password()).build(); + connection.addDatabase(newUri, 1.0f); + + // Verify it was added + List updatedEndpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + + assertThat(updatedEndpoints).hasSize(initialCount + 1); + assertThat(updatedEndpoints).contains(newUri); + + connection.close(); + } + + @Test + void shouldRejectAddingDuplicateDatabase() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Get an existing endpoint + RedisURI existingUri = connection.getCurrentEndpoint(); + + // Try to add it again - should fail + assertThatThrownBy(() -> connection.addDatabase(existingUri, 1.0f)).isInstanceOf(IllegalArgumentException.class); + + connection.close(); + } + + @Test + void shouldRemoveDatabase() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Add a new database + RedisURI newUri = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port(5)) + .withPassword(TestSettings.password()).build(); + connection.addDatabase(newUri, 1.0f); + + // Verify it was added + List endpointsAfterAdd = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + assertThat(endpointsAfterAdd).contains(newUri); + + // Remove it + connection.removeDatabase(newUri); + + // Verify it was removed + List endpointsAfterRemove = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + assertThat(endpointsAfterRemove).doesNotContain(newUri); + + connection.close(); + } + + @Test + void shouldRejectRemovingNonExistentDatabase() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Try to remove a database that doesn't exist + RedisURI nonExistentUri = RedisURI.create("redis://localhost:9999"); + + assertThatThrownBy(() -> connection.removeDatabase(nonExistentUri)).isInstanceOf(IllegalArgumentException.class); + + connection.close(); + } + + @Test + void shouldRejectRemovingLastDatabase() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Get all endpoints + List endpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + + // If we have more than one endpoint, remove all but one + if (endpoints.size() > 1) { + for (int i = 0; i < endpoints.size() - 1; i++) { + RedisURI endpoint = endpoints.get(i); + // Switch away from this endpoint before removing it + if (endpoint.equals(connection.getCurrentEndpoint())) { + connection.switchToDatabase(endpoints.get(endpoints.size() - 1)); + } + connection.removeDatabase(endpoint); + } + } + + // Now we should have exactly one endpoint left + List remainingEndpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false) + .collect(Collectors.toList()); + assertThat(remainingEndpoints).hasSize(1); + + // Try to remove the last one - should fail + RedisURI lastEndpoint = remainingEndpoints.get(0); + + assertThatThrownBy(() -> connection.removeDatabase(lastEndpoint)).isInstanceOf(UnsupportedOperationException.class); + + connection.close(); + } + + @Test + void shouldAddDatabaseAndSwitchToIt() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Add a new database + RedisURI newUri = RedisURI.Builder.redis(TestSettings.host(), TestSettings.port(5)) + .withPassword(TestSettings.password()).build(); + connection.addDatabase(newUri, 1.0f); + + // Switch to it + connection.switchToDatabase(newUri); + + // Verify it's now active + assertThat(connection.getCurrentEndpoint()).isEqualTo(newUri); + + // Verify we can execute commands on it + connection.sync().set("test-key", "test-value"); + assertThat(connection.sync().get("test-key")).isEqualTo("test-value"); + + connection.close(); + } + + @Test + void shouldRejectRemovingActiveDatabase() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Get the current active endpoint + RedisURI activeEndpoint = connection.getCurrentEndpoint(); + + // Try to remove it - should fail + assertThatThrownBy(() -> connection.removeDatabase(activeEndpoint)).isInstanceOf(UnsupportedOperationException.class); + + connection.close(); + } + + @Test + void shouldRejectSwitchingToNonExistentEndpoint() { + StatefulRedisMultiDbConnection connection = multiDbClient.connect(); + + // Create a URI that's not in the configured endpoints + RedisURI nonExistentUri = RedisURI.create("redis://localhost:9999"); + + // Note: Current implementation throws UnsupportedOperationException for non-existent endpoints + assertThatThrownBy(() -> connection.switchToDatabase(nonExistentUri)).isInstanceOf(UnsupportedOperationException.class); + + connection.close(); + } + +} diff --git a/src/test/java/io/lettuce/core/failover/StatefulMultiDbPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/failover/StatefulMultiDbPubSubConnectionIntegrationTests.java new file mode 100644 index 0000000000..015c391878 --- /dev/null +++ b/src/test/java/io/lettuce/core/failover/StatefulMultiDbPubSubConnectionIntegrationTests.java @@ -0,0 +1,410 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.failover; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.StreamSupport; + +import javax.inject.Inject; + +import org.junit.After; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.failover.api.StatefulRedisMultiDbPubSubConnection; +import io.lettuce.core.internal.LettuceFactories; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.Wait; + +/** + * Integration tests for {@link StatefulRedisMultiDbPubSubConnection} with pubsub functionality and database switching. + * + * @author Ali Takavci + * @since 7.1 + */ +@ExtendWith(LettuceExtension.class) +@Tag(INTEGRATION_TEST) +class StatefulMultiDbPubSubConnectionIntegrationTests extends MultiDbTestSupport { + + @Inject + StatefulMultiDbPubSubConnectionIntegrationTests(MultiDbClient client) { + super(client); + } + + @BeforeEach + void setUp() { + directClient1.connect().sync().flushall(); + directClient2.connect().sync().flushall(); + } + + @After + void tearDown() { + directClient1.shutdown(); + directClient2.shutdown(); + } + + // ============ Basic PubSub Connection Tests ============ + + @Test + void shouldConnectPubSubToMultipleEndpoints() { + StatefulRedisMultiDbPubSubConnection connection = multiDbClient.connectPubSub(); + assertNotNull(connection); + assertThat(connection.getEndpoints()).isNotNull(); + connection.close(); + } + + @Test + void shouldGetCurrentEndpointForPubSub() { + StatefulRedisMultiDbPubSubConnection connection = multiDbClient.connectPubSub(); + RedisURI currentEndpoint = connection.getCurrentEndpoint(); + assertNotNull(currentEndpoint); + assertThat(currentEndpoint).isIn(connection.getEndpoints()); + connection.close(); + } + + // ============ Basic PubSub Functionality Tests ============ + + @Test + void shouldSubscribeToChannel() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue channels = LettuceFactories.newBlockingQueue(); + + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void subscribed(String channel, long count) { + channels.add(channel); + } + + }); + + pubsub.sync().subscribe("testchannel"); + String channel = channels.take(); + assertEquals("testchannel", channel); + pubsub.close(); + } + + @Test + void shouldPublishAndReceiveMessage() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue messages = LettuceFactories.newBlockingQueue(); + + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void message(String channel, String message) { + messages.add(message); + } + + }); + + pubsub.sync().subscribe("msgchannel"); + + // Publish message from another connection + StatefulRedisMultiDbPubSubConnection publisher = multiDbClient.connectPubSub(); + publisher.sync().publish("msgchannel", "Hello World"); + + String message = messages.take(); + assertEquals("Hello World", message); + + pubsub.close(); + publisher.close(); + } + + // ============ PubSub with Database Switching Tests ============ + + @Test + void shouldMaintainSubscriptionAfterDatabaseSwitch() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue channels = LettuceFactories.newBlockingQueue(); + + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void subscribed(String channel, long count) { + channels.add(channel); + } + + }); + + // Subscribe on first database + pubsub.sync().subscribe("channel1"); + String channel = channels.take(); + assertEquals("channel1", channel); + + // Switch to second database + RedisURI other = StreamSupport.stream(pubsub.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(pubsub.getCurrentEndpoint())).findFirst().get(); + pubsub.switchToDatabase(other); + + // Listener should still be active + assertThat(pubsub).isNotNull(); + + pubsub.close(); + } + + @Test + void shouldReceiveMessagesAfterDatabaseSwitch() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue messages = LettuceFactories.newBlockingQueue(); + + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void message(String channel, String message) { + messages.add(message); + } + + }); + + // Subscribe on first database + pubsub.sync().subscribe("switchchannel"); + + // Switch to second database - subscriptions are automatically re-subscribed + RedisURI other = StreamSupport.stream(pubsub.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(pubsub.getCurrentEndpoint())).findFirst().get(); + pubsub.switchToDatabase(other); + + // Publish from another connection on second database + StatefulRedisMultiDbPubSubConnection publisher = multiDbClient.connectPubSub(); + publisher.switchToDatabase(other); + publisher.sync().publish("switchchannel", "Message after switch"); + + String message = messages.poll(1, TimeUnit.SECONDS); + assertEquals("Message after switch", message); + + pubsub.close(); + publisher.close(); + } + + @Test + void shouldHandleMultipleDatabaseSwitchesWithPubSub() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue messages = LettuceFactories.newBlockingQueue(); + + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void message(String channel, String message) { + messages.add(message); + } + + }); + + RedisURI firstDb = pubsub.getCurrentEndpoint(); + RedisURI secondDb = StreamSupport.stream(pubsub.getEndpoints().spliterator(), false).filter(uri -> !uri.equals(firstDb)) + .findFirst().get(); + + // Subscribe on first database + pubsub.sync().subscribe("multichannel"); + + // Switch to second database + pubsub.switchToDatabase(secondDb); + pubsub.sync().subscribe("multichannel"); + + // Switch back to first database + pubsub.switchToDatabase(firstDb); + + // Publish on first database + StatefulRedisMultiDbPubSubConnection publisher = multiDbClient.connectPubSub(); + publisher.sync().publish("multichannel", "Message from DB1"); + + String message = messages.take(); + assertEquals("Message from DB1", message); + + pubsub.close(); + publisher.close(); + } + + @Test + void shouldHandleListenerAdditionAfterSwitch() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue messages = LettuceFactories.newBlockingQueue(); + + // Subscribe on first database + pubsub.sync().subscribe("listenertest"); + + // Switch to second database + RedisURI other = StreamSupport.stream(pubsub.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(pubsub.getCurrentEndpoint())).findFirst().get(); + pubsub.switchToDatabase(other); + + // Add listener after switch + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void message(String channel, String message) { + messages.add(message); + } + + }); + + // Subscribe on second database + pubsub.sync().subscribe("listenertest"); + + // Publish on second database + StatefulRedisMultiDbPubSubConnection publisher = multiDbClient.connectPubSub(); + publisher.switchToDatabase(other); + publisher.sync().publish("listenertest", "Listener test message"); + + String message = messages.take(); + assertEquals("Listener test message", message); + + pubsub.close(); + publisher.close(); + } + + // ============ Tests for Isolation After Database Switch ============ + + @Test + void shouldNotReceiveMessagesFromOldEndpointAfterSwitch() throws Exception { + try (StatefulRedisMultiDbPubSubConnection multiDbConn = multiDbClient.connectPubSub()) { + + // Get the endpoints + RedisURI firstDb = multiDbConn.getCurrentEndpoint(); + RedisURI secondDb = StreamSupport.stream(multiDbConn.getEndpoints().spliterator(), false) + .filter(uri -> !uri.equals(firstDb)).findFirst().get(); + + BlockingQueue messages = LettuceFactories.newBlockingQueue(); + multiDbConn.addListener(new RedisPubSubAdapter() { + + @Override + public void message(String channel, String message) { + messages.add(message); + } + + }); + + try (StatefulRedisPubSubConnection conn1 = RedisClient.create(firstDb).connectPubSub()) { + try (StatefulRedisPubSubConnection conn2 = RedisClient.create(secondDb).connectPubSub()) { + + // Subscribe on first database + multiDbConn.sync().subscribe("isolationtest"); + + // Wait for subscription to be established + Wait.untilTrue(() -> { + AtomicInteger msgId = new AtomicInteger(0); + try { + String msg = "Initial message " + msgId.incrementAndGet(); + conn1.sync().publish("isolationtest", "Initial message"); + String received = messages.poll(1, TimeUnit.SECONDS); + return msg.equals(received); + } catch (InterruptedException e) { + } + return false; + }); + + // Switch to second database + multiDbConn.switchToDatabase(secondDb); + + Wait.untilTrue(() -> conn2.sync().pubsubChannels().contains("isolationtest")); + assertThat(conn2.sync().pubsubChannels()).contains("isolationtest"); + + assertThat(conn1.sync().pubsubChannels()).doesNotContain("isolationtest"); + + // Publish on the OLD endpoint (firstDb) - should NOT be received + conn1.sync().publish("isolationtest", "Message from first db"); + + conn2.sync().publish("isolationtest", "Message from second db"); + + // We should only receive the message from the second endpoint + String message = messages.poll(1, TimeUnit.SECONDS); + assertEquals("Message from second db", message); + + // Verify no additional messages are received (old endpoint message should not arrive) + String unexpectedMessage = messages.poll(500, TimeUnit.MILLISECONDS); + assertThat(unexpectedMessage).isNull(); + } + } + } + } + + @Test + void shouldUnsubscribeChannelsFromOldEndpointAfterSwitch() throws Exception { + StatefulRedisMultiDbPubSubConnection pubsub = multiDbClient.connectPubSub(); + BlockingQueue messages = LettuceFactories.newBlockingQueue(); + + pubsub.addListener(new RedisPubSubAdapter() { + + @Override + public void message(String channel, String message) { + messages.add(message); + } + + }); + + // Get the endpoints + RedisURI firstDb = pubsub.getCurrentEndpoint(); + RedisURI secondDb = StreamSupport.stream(pubsub.getEndpoints().spliterator(), false).filter(uri -> !uri.equals(firstDb)) + .findFirst().get(); + + // Subscribe on first database + pubsub.sync().subscribe("unsubtest"); + + // Wait for subscription to be established + Thread.sleep(200); + + // Switch to second database - this should trigger unsubscribe on old endpoint + pubsub.switchToDatabase(secondDb); + + // Wait a bit for unsubscribe to complete + Thread.sleep(500); + + // Verify that the old endpoint no longer receives messages + // by publishing on the old endpoint and verifying no message is received + StatefulRedisMultiDbPubSubConnection oldPublisher = multiDbClient.connectPubSub(); + oldPublisher.sync().publish("unsubtest", "Message from old endpoint after unsubscribe"); + + // Verify no message is received from old endpoint + String unexpectedMessage = messages.poll(1, TimeUnit.SECONDS); + assertThat(unexpectedMessage).isNull(); + + // Now verify that the new endpoint still receives messages + // by re-subscribing on the new endpoint + pubsub.sync().subscribe("unsubtest"); + + // Publish on the new endpoint + StatefulRedisMultiDbPubSubConnection newPublisher = multiDbClient.connectPubSub(); + newPublisher.switchToDatabase(secondDb); + newPublisher.sync().publish("unsubtest", "Message from new endpoint"); + + // Verify message is received from new endpoint + String expectedMessage = messages.poll(1, TimeUnit.SECONDS); + assertEquals("Message from new endpoint", expectedMessage); + + pubsub.close(); + oldPublisher.close(); + newPublisher.close(); + } + +} diff --git a/src/test/java/io/lettuce/test/LettuceExtension.java b/src/test/java/io/lettuce/test/LettuceExtension.java index 88bc216874..28db276df9 100644 --- a/src/test/java/io/lettuce/test/LettuceExtension.java +++ b/src/test/java/io/lettuce/test/LettuceExtension.java @@ -7,7 +7,6 @@ import java.lang.annotation.Target; import java.lang.reflect.Parameter; import java.lang.reflect.Type; -import java.time.Duration; import java.util.*; import java.util.function.Function; import java.util.function.Supplier; @@ -25,10 +24,12 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.dynamic.support.ResolvableType; +import io.lettuce.core.failover.MultiDbClient; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.resource.ClientResources; import io.lettuce.test.resource.DefaultRedisClient; import io.lettuce.test.resource.DefaultRedisClusterClient; +import io.lettuce.test.resource.DefaultRedisMultiDbClient; import io.lettuce.test.resource.TestClientResources; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -88,16 +89,17 @@ public class LettuceExtension implements ParameterResolver, AfterAllCallback, Af private final ExtensionContext.Namespace LETTUCE = ExtensionContext.Namespace.create("lettuce.parameters"); - private static final Set> SUPPORTED_INJECTABLE_TYPES = new HashSet<>( - Arrays.asList(StatefulRedisConnection.class, StatefulRedisPubSubConnection.class, RedisCommands.class, - RedisClient.class, ClientResources.class, StatefulRedisClusterConnection.class, RedisClusterClient.class)); + private static final Set> SUPPORTED_INJECTABLE_TYPES = new HashSet<>(Arrays.asList(StatefulRedisConnection.class, + StatefulRedisPubSubConnection.class, RedisCommands.class, RedisClient.class, ClientResources.class, + StatefulRedisClusterConnection.class, RedisClusterClient.class, MultiDbClient.class)); private static final Set> CLOSE_AFTER_EACH = new HashSet<>(Arrays.asList(StatefulRedisConnection.class, StatefulRedisPubSubConnection.class, StatefulRedisClusterConnection.class)); private static final List> SUPPLIERS = Arrays.asList(ClientResourcesSupplier.INSTANCE, RedisClusterClientSupplier.INSTANCE, RedisClientSupplier.INSTANCE, StatefulRedisConnectionSupplier.INSTANCE, - StatefulRedisPubSubConnectionSupplier.INSTANCE, StatefulRedisClusterConnectionSupplier.INSTANCE); + StatefulRedisPubSubConnectionSupplier.INSTANCE, StatefulRedisClusterConnectionSupplier.INSTANCE, + RedisMultiDbClientSupplier.INSTANCE); private static final List> RESOURCE_FUNCTIONS = Collections.singletonList(RedisCommandsFunction.INSTANCE); @@ -294,6 +296,17 @@ public RedisClient get() { } + enum RedisMultiDbClientSupplier implements Supplier { + + INSTANCE; + + @Override + public MultiDbClient get() { + return DefaultRedisMultiDbClient.get(); + } + + } + enum RedisClusterClientSupplier implements Supplier { INSTANCE; diff --git a/src/test/java/io/lettuce/test/resource/DefaultRedisMultiDbClient.java b/src/test/java/io/lettuce/test/resource/DefaultRedisMultiDbClient.java new file mode 100644 index 0000000000..9280b98063 --- /dev/null +++ b/src/test/java/io/lettuce/test/resource/DefaultRedisMultiDbClient.java @@ -0,0 +1,29 @@ +package io.lettuce.test.resource; + +import io.lettuce.core.failover.MultiDbClient; +import io.lettuce.core.failover.MultiDbTestSupport; + +/** + * @author Ali Takavci + */ +public class DefaultRedisMultiDbClient { + + private static final DefaultRedisMultiDbClient instance = new DefaultRedisMultiDbClient(); + + private final MultiDbClient redisClient; + + private DefaultRedisMultiDbClient() { + redisClient = MultiDbClient.create(MultiDbTestSupport.DBs); + Runtime.getRuntime().addShutdownHook(new Thread(() -> FastShutdown.shutdown(redisClient))); + } + + /** + * Do not close the client. + * + * @return the default redis client for the tests. + */ + public static MultiDbClient get() { + return instance.redisClient; + } + +} diff --git a/src/test/java/io/lettuce/test/resource/FastShutdown.java b/src/test/java/io/lettuce/test/resource/FastShutdown.java index 4f4f3434dc..e9900ad4c5 100644 --- a/src/test/java/io/lettuce/test/resource/FastShutdown.java +++ b/src/test/java/io/lettuce/test/resource/FastShutdown.java @@ -3,6 +3,7 @@ import java.util.concurrent.TimeUnit; import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.failover.MultiDbClient; import io.lettuce.core.resource.ClientResources; /** @@ -19,6 +20,15 @@ public static void shutdown(AbstractRedisClient redisClient) { redisClient.shutdown(0, 10, TimeUnit.MILLISECONDS); } + /** + * Shut down a {@link AbstractRedisClient} with a timeout of 10ms. + * + * @param redisClient + */ + public static void shutdown(MultiDbClient redisClient) { + redisClient.shutdown(0, 10, TimeUnit.MILLISECONDS); + } + /** * Shut down a {@link ClientResources} client with a timeout of 10ms. *