Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions sources/Valkey.Glide/Abstract/Condition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public override string ToString()

internal override List<ICmd> CreateCommands()
=> [
Request.CustomCommand(["WATCH", key]),
Request.Watch([key]),
Request.CustomCommand([cmd.ToString(), key, expectedValue]),
];

Expand Down Expand Up @@ -495,7 +495,7 @@ public override string ToString()

internal sealed override List<ICmd> CreateCommands()
=> [
Request.CustomCommand(["WATCH", key]),
Request.Watch([key]),
Request.CustomCommand(cmd == ValkeyCommand.GET
? [cmd.ToString(), key]
: [cmd.ToString(), key, memberName]
Expand Down Expand Up @@ -540,7 +540,7 @@ public override string ToString()

internal sealed override List<ICmd> CreateCommands()
=> [
Request.CustomCommand(["WATCH", key]),
Request.Watch([key]),
Request.CustomCommand([ValkeyCommand.LINDEX.ToString(), key, index.ToString()]),
];

Expand Down Expand Up @@ -588,7 +588,7 @@ private string GetComparisonString()

internal sealed override List<ICmd> CreateCommands()
=> [
Request.CustomCommand(["WATCH", key]),
Request.Watch([key]),
Request.CustomCommand([cmd.ToString(), key]),
];

Expand Down Expand Up @@ -624,7 +624,7 @@ private string GetComparisonString()

internal sealed override List<ICmd> CreateCommands()
=> [
Request.CustomCommand(["WATCH", key]),
Request.Watch([key]),
Request.CustomCommand(["ZCOUNT", key, min, max]),
];

Expand Down Expand Up @@ -658,7 +658,7 @@ public override string ToString()

internal sealed override List<ICmd> CreateCommands()
=> [
Request.CustomCommand(["WATCH", key]),
Request.Watch([key]),
Request.CustomCommand(["ZCOUNT", key, sortedSetScore, sortedSetScore]),
];

Expand Down
53 changes: 53 additions & 0 deletions sources/Valkey.Glide/Commands/ITransactionBaseCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

namespace Valkey.Glide.Commands;

/// <summary>
/// Supports commands for the "Transaction Commands" group for standalone and cluster clients.
/// <br />
/// See more on <see href="https://valkey.io/commands/?group=transactions">valkey.io</see>.
/// </summary>
public interface ITransactionBaseCommands
{
/// <summary>
/// Marks the given keys to be watched for conditional execution of a transaction. Transactions
/// will only execute commands if the watched keys are not modified before execution of the
/// transaction.
/// </summary>
/// <param name="keys">The keys to watch.</param>
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
/// <returns>"OK" if the keys were successfully watched.</returns>
/// <remarks>
/// <para>
/// In cluster mode, if keys in <paramref name="keys"/> map to different hash slots, the command
/// will be split across these slots and executed separately for each. This means the command
/// is atomic only at the slot level. If one or more slot-specific requests fail, the entire
/// call will return the first encountered error, even though some requests may have succeeded
/// while others did not. If this behavior impacts your application logic, consider splitting
/// the request into sub-requests per slot to ensure atomicity.
/// </para>
/// <example>
/// <code>
/// bool result = await client.WatchAsync(["sampleKey"]);
/// // result is "OK"
///
/// // Execute transaction
/// var batch = new Batch(true)
/// .StringSetAsync("sampleKey", "foobar");
/// object[] transactionResult = await client.Exec(batch, false);
/// // transactionResult is not null if transaction executed successfully
///
/// // Watch key again
/// await client.WatchAsync(["sampleKey"]);
/// var batch2 = new Batch(true)
/// .StringSetAsync("sampleKey", "foobar");
/// // Modify the watched key from another client/connection
/// await client.StringSetAsync("sampleKey", "hello world");
/// object[] transactionResult2 = await client.Exec(batch2, true);
/// // transactionResult2 is null because the watched key was modified
/// </code>
/// </example>
/// </remarks>
/// <seealso href="https://valkey.io/commands/watch/"/>
Task<string> WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None);
}
46 changes: 46 additions & 0 deletions sources/Valkey.Glide/Commands/ITransactionClusterCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

namespace Valkey.Glide.Commands;

/// <summary>
/// Supports commands for the "Transaction Commands" group for cluster clients.
/// <br />
/// See more on <see href="https://valkey.io/commands/?group=transactions">valkey.io</see>.
/// </summary>
public interface ITransactionClusterCommands : ITransactionBaseCommands
{
/// <summary>
/// Flushes all the previously watched keys for a transaction. Executing a transaction will
/// automatically flush all previously watched keys.
/// The command will be routed to all primary nodes.
/// </summary>
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
/// <returns>"OK" if the keys were successfully unwatched.</returns>
/// <example>
/// <code>
/// await client.WatchAsync(["sampleKey"]);
/// string result = await client.UnwatchAsync();
/// // result is "OK", "sampleKey" is no longer watched on all primary nodes
/// </code>
/// </example>
/// <seealso href="https://valkey.io/commands/unwatch/"/>
Task<string> UnwatchAsync(CommandFlags flags = CommandFlags.None);

/// <summary>
/// Flushes all the previously watched keys for a transaction. Executing a transaction will
/// automatically flush all previously watched keys.
/// </summary>
/// <param name="route">Specifies the routing configuration for the command. The client will route the
/// command to the nodes defined by <paramref name="route"/>.</param>
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
/// <returns>"OK" if the keys were successfully unwatched.</returns>
/// <example>
/// <code>
/// await client.WatchAsync(["sampleKey"]);
/// string result = await client.UnwatchAsync(Route.AllPrimaries);
/// // result is "OK", "sampleKey" is no longer watched on all primary nodes
/// </code>
/// </example>
/// <seealso href="https://valkey.io/commands/unwatch/"/>
Task<string> UnwatchAsync(Route route, CommandFlags flags = CommandFlags.None);
}
27 changes: 27 additions & 0 deletions sources/Valkey.Glide/Commands/ITransactionCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

namespace Valkey.Glide.Commands;

/// <summary>
/// Supports commands for the "Transaction Commands" group for standalone clients.
/// <br />
/// See more on <see href="https://valkey.io/commands/?group=transactions">valkey.io</see>.
/// </summary>
public interface ITransactionCommands : ITransactionBaseCommands
{
/// <summary>
/// Flushes all the previously watched keys for a transaction. Executing a transaction will
/// automatically flush all previously watched keys.
/// </summary>
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
/// <returns>"OK" if the keys were successfully unwatched.</returns>
/// <example>
/// <code>
/// await client.WatchAsync(["sampleKey"]);
/// bool result = await client.UnwatchAsync();
/// // result is "OK", "sampleKey" is no longer watched
/// </code>
/// </example>
/// <seealso href="https://valkey.io/commands/unwatch/"/>
Task<string> UnwatchAsync(CommandFlags flags = CommandFlags.None);
}
14 changes: 13 additions & 1 deletion sources/Valkey.Glide/GlideClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Valkey.Glide;
/// <summary>
/// Client used for connection to standalone servers. Use <see cref="CreateClient"/> to request a client.
/// </summary>
public class GlideClient : BaseClient, IGenericCommands, IServerManagementCommands, IConnectionManagementCommands
public class GlideClient : BaseClient, IGenericCommands, IServerManagementCommands, IConnectionManagementCommands, ITransactionCommands
{
internal GlideClient() { }

Expand Down Expand Up @@ -202,6 +202,18 @@ public async IAsyncEnumerable<ValkeyKey> KeysAsync(int database = -1, ValkeyValu
public async Task<(string cursor, ValkeyKey[] keys)> ScanAsync(string cursor, ScanOptions? options = null)
=> await Command(Request.ScanAsync(cursor, options));

public async Task<string> WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None)
{
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
return await Command(Request.Watch(keys));
}

public async Task<string> UnwatchAsync(CommandFlags flags = CommandFlags.None)
{
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
return await Command(Request.Unwatch());
}

protected override async Task<Version> GetServerVersionAsync()
{
if (_serverVersion == null)
Expand Down
20 changes: 19 additions & 1 deletion sources/Valkey.Glide/GlideClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Valkey.Glide;
/// <summary>
/// Client used for connection to cluster servers. Use <see cref="CreateClient"/> to request a client.
/// </summary>
public sealed class GlideClusterClient : BaseClient, IGenericClusterCommands, IServerManagementClusterCommands, IConnectionManagementClusterCommands
public sealed class GlideClusterClient : BaseClient, IGenericClusterCommands, IServerManagementClusterCommands, IConnectionManagementClusterCommands, ITransactionClusterCommands
{
private GlideClusterClient() { }

Expand Down Expand Up @@ -305,6 +305,24 @@ public async Task<string> SelectAsync(long index, CommandFlags flags = CommandFl
return await Command(Request.Select(index), Route.Random);
}

public async Task<string> WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None)
{
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
return await Command(Request.Watch(keys));
}

public async Task<string> UnwatchAsync(CommandFlags flags = CommandFlags.None)
{
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
return await Command(Request.Unwatch(), AllPrimaries);
}

public async Task<string> UnwatchAsync(Route route, CommandFlags flags = CommandFlags.None)
{
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
return await Command(Request.Unwatch(), route);
}

protected override async Task<Version> GetServerVersionAsync()
{
if (_serverVersion == null)
Expand Down
26 changes: 26 additions & 0 deletions sources/Valkey.Glide/Internals/Request.TransactionCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

using static Valkey.Glide.Internals.FFI;

namespace Valkey.Glide.Internals;

internal partial class Request
{
/// <summary>
/// Creates a command to watch keys for conditional execution of a transaction.
/// </summary>
/// <param name="keys">The keys to watch.</param>
/// <returns>A command that watches the specified keys.</returns>
public static Cmd<string, string> Watch(ValkeyKey[] keys)
{
GlideString[] args = [.. keys.Select(k => (GlideString)k)];
return new(RequestType.Watch, args, false, response => response);
}

/// <summary>
/// Creates a command to flush all previously watched keys for a transaction.
/// </summary>
/// <returns>A command that unwatches all previously watched keys.</returns>
public static Cmd<string, string> Unwatch()
=> new(RequestType.UnWatch, [], false, response => response);
}
105 changes: 105 additions & 0 deletions tests/Valkey.Glide.IntegrationTests/SharedBatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,109 @@ public async Task BatchDumpAndRestore(BaseClient client, bool isAtomic)
);

}

[Theory(DisableDiscoveryEnumeration = true)]
[MemberData(nameof(GetTestClientWithAtomic))]
public async Task WatchTransactionTest(BaseClient client, bool isAtomic)
{
string key1 = "{key}-1" + Guid.NewGuid();
string key2 = "{key}-2" + Guid.NewGuid();
string key3 = "{key}-3" + Guid.NewGuid();
string foobarString = "foobar";
string helloString = "hello";
ValkeyKey[] keys = [key1, key2, key3];

bool isCluster = client is GlideClusterClient;

// Returns null when a watched key is modified before transaction execution
string watchResult = isCluster
? await ((GlideClusterClient)client).WatchAsync(keys)
: await ((GlideClient)client).WatchAsync(keys);
Assert.Equal("OK", watchResult);

await client.StringSetAsync(key2, helloString);

object?[]? execResult;
if (isCluster)
{
var clusterBatch = new ClusterBatch(true);
_ = clusterBatch.StringSetAsync(key1, foobarString)
.StringSetAsync(key2, foobarString)
.StringSetAsync(key3, foobarString);
execResult = await ((GlideClusterClient)client).Exec(clusterBatch, true);
}
else
{
var batch = new Batch(true);
_ = batch.StringSetAsync(key1, foobarString)
.StringSetAsync(key2, foobarString)
.StringSetAsync(key3, foobarString);
execResult = await ((GlideClient)client).Exec(batch, true);
}

// The transaction should fail (return null) because key2 was modified after being watched
Assert.Null(execResult);

// Verify the key values: transaction was aborted, so only key2 (set before transaction) should have a value
var key1Value = await client.StringGetAsync(key1);
var key2Value = await client.StringGetAsync(key2);
var key3Value = await client.StringGetAsync(key3);

Assert.True(key1Value.IsNull); // key1 should not be set
Assert.Equal(helloString, key2Value); // key2 should have the value set before transaction
Assert.True(key3Value.IsNull); // key3 should not be set
}

[Theory(DisableDiscoveryEnumeration = true)]
[MemberData(nameof(GetTestClientWithAtomic))]
public async Task UnwatchTest(BaseClient client, bool isAtomic)
{
string key1 = "{key}-1" + Guid.NewGuid();
string key2 = "{key}-2" + Guid.NewGuid();
string foobarString = "foobar";
string helloString = "hello";
ValkeyKey[] keys = [key1, key2];

bool isCluster = client is GlideClusterClient;

// UNWATCH returns OK when there are no watched keys
string unwatchResult = isCluster
? await ((GlideClusterClient)client).UnwatchAsync()
: await ((GlideClient)client).UnwatchAsync();
Assert.Equal("OK", unwatchResult);

// Transaction executes successfully after modifying a watched key then calling UNWATCH
if (isCluster)
{
await ((GlideClusterClient)client).WatchAsync(keys);
}
else
{
await ((GlideClient)client).WatchAsync(keys);
}
await client.StringSetAsync(key2, helloString);
unwatchResult = isCluster
? await ((GlideClusterClient)client).UnwatchAsync()
: await ((GlideClient)client).UnwatchAsync();
Assert.Equal("OK", unwatchResult);

object?[]? execResult;
if (isCluster)
{
var clusterBatch = new ClusterBatch(true);
_ = clusterBatch.StringSetAsync(key1, foobarString).StringSetAsync(key2, foobarString);
execResult = await ((GlideClusterClient)client).Exec(clusterBatch, true);
}
else
{
var batch = new Batch(true);
_ = batch.StringSetAsync(key1, foobarString).StringSetAsync(key2, foobarString);
execResult = await ((GlideClient)client).Exec(batch, true);
}

Assert.NotNull(execResult); // Transaction should succeed after unwatch
Assert.Equal(2, execResult.Length);
Assert.Equal(foobarString, await client.StringGetAsync(key1));
Assert.Equal(foobarString, await client.StringGetAsync(key2));
}
}
Loading
Loading