Skip to content

chore: remove usage of deprecated connection methods in command APIs in integration tests (#3328) #3343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1112,11 +1112,6 @@ public RedisFuture<List<Map<String, Object>>> functionList(String libraryName) {
return dispatch(commandBuilder.functionList(libraryName));
}

@Override
public void flushCommands() {
connection.flushCommands();
}

@Override
public RedisFuture<String> flushall() {
return dispatch(commandBuilder.flushall());
Expand Down Expand Up @@ -1552,11 +1547,6 @@ public RedisFuture<String> info(String section) {
return dispatch(commandBuilder.info(section));
}

@Override
public boolean isOpen() {
return connection.isOpen();
}

@Override
public RedisFuture<String> ftCreate(K index, CreateArgs<K, V> options, List<FieldArgs<K>> fieldArgs) {
return dispatch(searchCommandBuilder.ftCreate(index, options, fieldArgs));
Expand Down Expand Up @@ -2557,11 +2547,6 @@ public RedisFuture<V> setGet(K key, V value, SetArgs setArgs) {
return dispatch(commandBuilder.setGet(key, value, setArgs));
}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
connection.setAutoFlushCommands(autoFlush);
}

@Override
public void setTimeout(Duration timeout) {
connection.setTimeout(timeout);
Expand Down
19 changes: 0 additions & 19 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,6 @@ public Mono<Long> clientUnblock(long id, UnblockType type) {
return createMono(() -> commandBuilder.clientUnblock(id, type));
}

public void close() {
connection.close();
}

@Override
public Mono<String> clusterAddSlots(int... slots) {
return createMono(() -> commandBuilder.clusterAddslots(slots));
Expand Down Expand Up @@ -1176,11 +1172,6 @@ public Flux<Map<String, Object>> functionList(String libraryName) {
return createDissolvingFlux(() -> commandBuilder.functionList(libraryName));
}

@Override
public void flushCommands() {
connection.flushCommands();
}

@Override
public Mono<String> flushall() {
return createMono(commandBuilder::flushall);
Expand Down Expand Up @@ -1617,11 +1608,6 @@ public Mono<String> info(String section) {
return createMono(() -> commandBuilder.info(section));
}

@Override
public boolean isOpen() {
return connection.isOpen();
}

@Override
public Mono<String> ftCreate(K index, CreateArgs<K, V> options, List<FieldArgs<K>> fieldArgs) {
return createMono(() -> searchCommandBuilder.ftCreate(index, options, fieldArgs));
Expand Down Expand Up @@ -2641,11 +2627,6 @@ public Mono<V> setGet(K key, V value, SetArgs setArgs) {
return createMono(() -> commandBuilder.setGet(key, value, setArgs));
}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
connection.setAutoFlushCommands(autoFlush);
}

@Override
public void setTimeout(Duration timeout) {
connection.setTimeout(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,6 @@ public interface BaseRedisAsyncCommands<K, V> {
*/
<T> RedisFuture<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, T> output, CommandArgs<K, V> args);

/**
* @return {@code true} if the connection is open (connected and not closed).
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the
* connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
boolean isOpen();

/**
* Reset the command state. Queued commands will be canceled and the internal state will be reset. This is useful when the
* internal state machine gets out of sync with the connection.
Expand All @@ -206,26 +198,4 @@ public interface BaseRedisAsyncCommands<K, V> {
@Deprecated
void reset();

/**
* Disable or enable auto-flush behavior. Default is {@code true}. If autoFlushCommands is disabled, multiple commands can
* be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is
* issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)}
* method on the connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
void setAutoFlushCommands(boolean autoFlush);

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
*
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#flushCommands()} method on the
* connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
void flushCommands();

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,6 @@ public interface BaseRedisReactiveCommands<K, V> {
*/
<T> Flux<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, ?> output, CommandArgs<K, V> args);

/**
* @return {@code true} if the connection is open (connected and not closed).
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the
* connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
boolean isOpen();

/**
* Reset the command state. Queued commands will be canceled and the internal state will be reset. This is useful when the
* internal state machine gets out of sync with the connection.
Expand All @@ -207,28 +199,6 @@ public interface BaseRedisReactiveCommands<K, V> {
@Deprecated
void reset();

/**
* Disable or enable auto-flush behavior. Default is {@code true}. If autoFlushCommands is disabled, multiple commands can
* be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is
* issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)}
* method on the connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
void setAutoFlushCommands(boolean autoFlush);

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
*
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#flushCommands()} method on the
* connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
void flushCommands();

/**
* @return the currently configured instance of the {@link JsonParser}
* @since 6.5
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,6 @@ public interface BaseRedisCommands<K, V> {
*/
<T> T dispatch(ProtocolKeyword type, CommandOutput<K, V, T> output, CommandArgs<K, V> args);

/**
* @return {@code true} if the connection is open (connected and not closed).
* @deprecated since 6.2. Use the corresponding {@link io.lettuce.core.api.StatefulConnection#isOpen()} method on the
* connection interface. To be removed with Lettuce 7.0.
*/
@Deprecated
boolean isOpen();

/**
* Reset the command state. Queued commands will be canceled and the internal state will be reset. This is useful when the
* internal state machine gets out of sync with the connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,22 @@ public RedisFuture<String> clientSetname(K name) {

RedisURI uri = redisClusterNode.getUri();

CompletableFuture<RedisClusterAsyncCommands<K, V>> byNodeId = getConnectionAsync(redisClusterNode.getNodeId());
CompletableFuture<StatefulRedisConnection<K, V>> byNodeId = getStatefulConnection(redisClusterNode.getNodeId());

executions.put("NodeId: " + redisClusterNode.getNodeId(), byNodeId.thenCompose(c -> {

if (c.isOpen()) {
return c.clientSetname(name);
return c.async().clientSetname(name);
}
return ok;
}));

CompletableFuture<RedisClusterAsyncCommands<K, V>> byHost = getConnectionAsync(uri.getHost(), uri.getPort());
CompletableFuture<StatefulRedisConnection<K, V>> byHost = getStatefulConnection(uri.getHost(), uri.getPort());

executions.put("HostAndPort: " + redisClusterNode.getNodeId(), byHost.thenCompose(c -> {

if (c.isOpen()) {
return c.clientSetname(name);
return c.async().clientSetname(name);
}
return ok;
}));
Expand Down Expand Up @@ -596,9 +596,12 @@ public RedisClusterAsyncCommands<K, V> getConnection(String host, int port) {
return getStatefulConnection().getConnection(host, port).async();
}

private CompletableFuture<RedisClusterAsyncCommands<K, V>> getConnectionAsync(String nodeId) {
return getConnectionProvider().<K, V> getConnectionAsync(ConnectionIntent.WRITE, nodeId)
.thenApply(StatefulRedisConnection::async);
private CompletableFuture<StatefulRedisConnection<K, V>> getStatefulConnection(String nodeId) {
return getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId);
}

private CompletableFuture<StatefulRedisConnection<K, V>> getStatefulConnection(String host, int port) {
return getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, host, port);
}

private CompletableFuture<RedisClusterAsyncCommands<K, V>> getConnectionAsync(String host, int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,23 +136,23 @@ public Mono<String> clientSetname(K name) {

for (RedisClusterNode redisClusterNode : getStatefulConnection().getPartitions()) {

Mono<RedisClusterReactiveCommands<K, V>> byNodeId = getConnectionReactive(redisClusterNode.getNodeId());
Mono<StatefulRedisConnection<K, V>> byNodeId = getStatefulConnection(redisClusterNode.getNodeId());

publishers.add(byNodeId.flatMap(conn -> {

if (conn.isOpen()) {
return conn.clientSetname(name);
return conn.reactive().clientSetname(name);
}
return Mono.empty();
}));

Mono<RedisClusterReactiveCommands<K, V>> byHost = getConnectionReactive(redisClusterNode.getUri().getHost(),
Mono<StatefulRedisConnection<K, V>> byHost = getStatefulConnection(redisClusterNode.getUri().getHost(),
redisClusterNode.getUri().getPort());

publishers.add(byHost.flatMap(conn -> {

if (conn.isOpen()) {
return conn.clientSetname(name);
return conn.reactive().clientSetname(name);
}
return Mono.empty();
}));
Expand Down Expand Up @@ -441,6 +441,10 @@ public RedisClusterReactiveCommands<K, V> getConnection(String nodeId) {
return getStatefulConnection().getConnection(nodeId).reactive();
}

private Mono<StatefulRedisConnection<K, V>> getStatefulConnection(String nodeId) {
return getMono(getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId));
}

private Mono<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String nodeId) {
return getMono(getConnectionProvider().<K, V> getConnectionAsync(ConnectionIntent.WRITE, nodeId))
.map(StatefulRedisConnection::reactive);
Expand All @@ -456,6 +460,10 @@ private Mono<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String ho
.map(StatefulRedisConnection::reactive);
}

private Mono<StatefulRedisConnection<K, V>> getStatefulConnection(String host, int port) {
return getMono(getConnectionProvider().<K, V> getConnectionAsync(ConnectionIntent.WRITE, host, port));
}

@Override
public StatefulRedisClusterConnection<K, V> getStatefulConnection() {
return (StatefulRedisClusterConnection<K, V>) super.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,6 @@ public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
return asyncCommand;
}

public void close() {
connection.close();
}

@Override
public boolean isOpen() {
return connection.isOpen();
}

@Override
public StatefulRedisSentinelConnection<K, V> getStatefulConnection() {
return (StatefulRedisSentinelConnection<K, V>) connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,6 @@ public <T> Flux<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, ?> output,
return (Flux) createFlux(() -> new Command<>(type, output, args));
}

@Override
public void close() {
getStatefulConnection().close();
}

@Override
public boolean isOpen() {
return getStatefulConnection().isOpen();
}

@Override
public StatefulRedisSentinelConnection<K, V> getStatefulConnection() {
return (StatefulRedisSentinelConnection<K, V>) super.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,6 @@ public interface RedisSentinelAsyncCommands<K, V> {
*/
<T> RedisFuture<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, T> output, CommandArgs<K, V> args);

/**
* @return {@code true} if the connection is open (connected and not closed).
*/
boolean isOpen();

/**
* @return the underlying connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,6 @@ public interface RedisSentinelReactiveCommands<K, V> {
*/
<T> Flux<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, ?> output, CommandArgs<K, V> args);

/**
* @return {@code true} if the connection is open (connected and not closed).
*/
boolean isOpen();

/**
* @return the underlying connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,6 @@ public interface RedisSentinelCommands<K, V> {
*/
<T> T dispatch(ProtocolKeyword type, CommandOutput<K, V, T> output, CommandArgs<K, V> args);

/**
* @return {@code true} if the connection is open (connected and not closed).
*/
boolean isOpen();

/**
* @return the underlying connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,34 +188,5 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
*/
fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): Flow<T>

/**
* @return {@code true} if the connection is open (connected and not closed).
* @deprecated since 6.2. Use the corresponding [io.lettuce.core.api.StatefulConnection#isOpen()] method on the connection
* interface. To be removed with Lettuce 7.0.
*/
@Deprecated("since 6.2, to be removed with Lettuce 7")
fun isOpen(): Boolean

/**
* Disable or enable auto-flush behavior. Default is `true`. If autoFlushCommands is disabled, multiple commands can
* be issued without writing them actually to the transport. Commands are buffered until a [flushCommands] is
* issued. After calling [flushCommands] commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
* @deprecated since 6.2. Use the corresponding [io.lettuce.core.api.StatefulConnection#setAutoFlushCommands(boolean)] method on the connection
* interface. To be removed with Lettuce 7.0.
*/
@Deprecated("since 6.2, to be removed with Lettuce 7")
fun setAutoFlushCommands(autoFlush: Boolean)

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
* @deprecated since 6.2. Use the corresponding [io.lettuce.core.api.StatefulConnection#flushCommands()] method on the connection
* interface. To be removed with Lettuce 7.0.
*/
@Deprecated("since 6.2, to be removed with Lettuce 7")
fun flushCommands()

}

Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,5 @@ internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(internal val op

override fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): Flow<T> = ops.dispatch<T>(type, output, args).asFlow()

override fun isOpen(): Boolean = ops.isOpen

override fun setAutoFlushCommands(autoFlush: Boolean) = ops.setAutoFlushCommands(autoFlush)

override fun flushCommands() = ops.flushCommands()

}

Loading
Loading