diff --git a/sources/Valkey.Glide/Abstract/Condition.cs b/sources/Valkey.Glide/Abstract/Condition.cs index 4d8d481..4e68bd8 100644 --- a/sources/Valkey.Glide/Abstract/Condition.cs +++ b/sources/Valkey.Glide/Abstract/Condition.cs @@ -450,7 +450,7 @@ public override string ToString() internal override List CreateCommands() => [ - Request.CustomCommand(["WATCH", key]), + Request.Watch([key]), Request.CustomCommand([cmd.ToString(), key, expectedValue]), ]; @@ -495,7 +495,7 @@ public override string ToString() internal sealed override List CreateCommands() => [ - Request.CustomCommand(["WATCH", key]), + Request.Watch([key]), Request.CustomCommand(cmd == ValkeyCommand.GET ? [cmd.ToString(), key] : [cmd.ToString(), key, memberName] @@ -540,7 +540,7 @@ public override string ToString() internal sealed override List CreateCommands() => [ - Request.CustomCommand(["WATCH", key]), + Request.Watch([key]), Request.CustomCommand([ValkeyCommand.LINDEX.ToString(), key, index.ToString()]), ]; @@ -588,7 +588,7 @@ private string GetComparisonString() internal sealed override List CreateCommands() => [ - Request.CustomCommand(["WATCH", key]), + Request.Watch([key]), Request.CustomCommand([cmd.ToString(), key]), ]; @@ -624,7 +624,7 @@ private string GetComparisonString() internal sealed override List CreateCommands() => [ - Request.CustomCommand(["WATCH", key]), + Request.Watch([key]), Request.CustomCommand(["ZCOUNT", key, min, max]), ]; @@ -658,7 +658,7 @@ public override string ToString() internal sealed override List CreateCommands() => [ - Request.CustomCommand(["WATCH", key]), + Request.Watch([key]), Request.CustomCommand(["ZCOUNT", key, sortedSetScore, sortedSetScore]), ]; diff --git a/sources/Valkey.Glide/Commands/ITransactionBaseCommands.cs b/sources/Valkey.Glide/Commands/ITransactionBaseCommands.cs new file mode 100644 index 0000000..913af14 --- /dev/null +++ b/sources/Valkey.Glide/Commands/ITransactionBaseCommands.cs @@ -0,0 +1,53 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +namespace Valkey.Glide.Commands; + +/// +/// Supports commands for the "Transaction Commands" group for standalone and cluster clients. +///
+/// See more on valkey.io. +///
+public interface ITransactionBaseCommands +{ + /// + /// Marks the given keys to be watched for conditional execution of a transaction. Transactions + /// will only execute commands if the watched keys are not modified before execution of the + /// transaction. + /// + /// The keys to watch. + /// The flags to use for this operation. Currently flags are ignored. + /// "OK" if the keys were successfully watched. + /// + /// + /// In cluster mode, if keys in map to different hash slots, the command + /// will be split across these slots and executed separately for each. This means the command + /// is atomic only at the slot level. If one or more slot-specific requests fail, the entire + /// call will return the first encountered error, even though some requests may have succeeded + /// while others did not. If this behavior impacts your application logic, consider splitting + /// the request into sub-requests per slot to ensure atomicity. + /// + /// + /// + /// bool result = await client.WatchAsync(["sampleKey"]); + /// // result is "OK" + /// + /// // Execute transaction + /// var batch = new Batch(true) + /// .StringSetAsync("sampleKey", "foobar"); + /// object[] transactionResult = await client.Exec(batch, false); + /// // transactionResult is not null if transaction executed successfully + /// + /// // Watch key again + /// await client.WatchAsync(["sampleKey"]); + /// var batch2 = new Batch(true) + /// .StringSetAsync("sampleKey", "foobar"); + /// // Modify the watched key from another client/connection + /// await client.StringSetAsync("sampleKey", "hello world"); + /// object[] transactionResult2 = await client.Exec(batch2, true); + /// // transactionResult2 is null because the watched key was modified + /// + /// + /// + /// + Task WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None); +} \ No newline at end of file diff --git a/sources/Valkey.Glide/Commands/ITransactionClusterCommands.cs b/sources/Valkey.Glide/Commands/ITransactionClusterCommands.cs new file mode 100644 index 0000000..5fe6790 --- /dev/null +++ b/sources/Valkey.Glide/Commands/ITransactionClusterCommands.cs @@ -0,0 +1,46 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +namespace Valkey.Glide.Commands; + +/// +/// Supports commands for the "Transaction Commands" group for cluster clients. +///
+/// See more on valkey.io. +///
+public interface ITransactionClusterCommands : ITransactionBaseCommands +{ + /// + /// Flushes all the previously watched keys for a transaction. Executing a transaction will + /// automatically flush all previously watched keys. + /// The command will be routed to all primary nodes. + /// + /// The flags to use for this operation. Currently flags are ignored. + /// "OK" if the keys were successfully unwatched. + /// + /// + /// await client.WatchAsync(["sampleKey"]); + /// string result = await client.UnwatchAsync(); + /// // result is "OK", "sampleKey" is no longer watched on all primary nodes + /// + /// + /// + Task UnwatchAsync(CommandFlags flags = CommandFlags.None); + + /// + /// Flushes all the previously watched keys for a transaction. Executing a transaction will + /// automatically flush all previously watched keys. + /// + /// Specifies the routing configuration for the command. The client will route the + /// command to the nodes defined by . + /// The flags to use for this operation. Currently flags are ignored. + /// "OK" if the keys were successfully unwatched. + /// + /// + /// await client.WatchAsync(["sampleKey"]); + /// string result = await client.UnwatchAsync(Route.AllPrimaries); + /// // result is "OK", "sampleKey" is no longer watched on all primary nodes + /// + /// + /// + Task UnwatchAsync(Route route, CommandFlags flags = CommandFlags.None); +} \ No newline at end of file diff --git a/sources/Valkey.Glide/Commands/ITransactionCommands.cs b/sources/Valkey.Glide/Commands/ITransactionCommands.cs new file mode 100644 index 0000000..8d31437 --- /dev/null +++ b/sources/Valkey.Glide/Commands/ITransactionCommands.cs @@ -0,0 +1,27 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +namespace Valkey.Glide.Commands; + +/// +/// Supports commands for the "Transaction Commands" group for standalone clients. +///
+/// See more on valkey.io. +///
+public interface ITransactionCommands : ITransactionBaseCommands +{ + /// + /// Flushes all the previously watched keys for a transaction. Executing a transaction will + /// automatically flush all previously watched keys. + /// + /// The flags to use for this operation. Currently flags are ignored. + /// "OK" if the keys were successfully unwatched. + /// + /// + /// await client.WatchAsync(["sampleKey"]); + /// bool result = await client.UnwatchAsync(); + /// // result is "OK", "sampleKey" is no longer watched + /// + /// + /// + Task UnwatchAsync(CommandFlags flags = CommandFlags.None); +} \ No newline at end of file diff --git a/sources/Valkey.Glide/GlideClient.cs b/sources/Valkey.Glide/GlideClient.cs index 7b25adc..e0519ca 100644 --- a/sources/Valkey.Glide/GlideClient.cs +++ b/sources/Valkey.Glide/GlideClient.cs @@ -14,7 +14,7 @@ namespace Valkey.Glide; /// /// Client used for connection to standalone servers. Use to request a client. /// -public class GlideClient : BaseClient, IGenericCommands, IServerManagementCommands, IConnectionManagementCommands +public class GlideClient : BaseClient, IGenericCommands, IServerManagementCommands, IConnectionManagementCommands, ITransactionCommands { internal GlideClient() { } @@ -202,6 +202,18 @@ public async IAsyncEnumerable KeysAsync(int database = -1, ValkeyValu public async Task<(string cursor, ValkeyKey[] keys)> ScanAsync(string cursor, ScanOptions? options = null) => await Command(Request.ScanAsync(cursor, options)); + public async Task WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None) + { + Utils.Requires(flags == CommandFlags.None, "Command flags are not supported by GLIDE"); + return await Command(Request.Watch(keys)); + } + + public async Task UnwatchAsync(CommandFlags flags = CommandFlags.None) + { + Utils.Requires(flags == CommandFlags.None, "Command flags are not supported by GLIDE"); + return await Command(Request.Unwatch()); + } + protected override async Task GetServerVersionAsync() { if (_serverVersion == null) diff --git a/sources/Valkey.Glide/GlideClusterClient.cs b/sources/Valkey.Glide/GlideClusterClient.cs index e866a58..3bd4954 100644 --- a/sources/Valkey.Glide/GlideClusterClient.cs +++ b/sources/Valkey.Glide/GlideClusterClient.cs @@ -20,7 +20,7 @@ namespace Valkey.Glide; /// /// Client used for connection to cluster servers. Use to request a client. /// -public sealed class GlideClusterClient : BaseClient, IGenericClusterCommands, IServerManagementClusterCommands, IConnectionManagementClusterCommands +public sealed class GlideClusterClient : BaseClient, IGenericClusterCommands, IServerManagementClusterCommands, IConnectionManagementClusterCommands, ITransactionClusterCommands { private GlideClusterClient() { } @@ -305,6 +305,24 @@ public async Task SelectAsync(long index, CommandFlags flags = CommandFl return await Command(Request.Select(index), Route.Random); } + public async Task WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None) + { + Utils.Requires(flags == CommandFlags.None, "Command flags are not supported by GLIDE"); + return await Command(Request.Watch(keys)); + } + + public async Task UnwatchAsync(CommandFlags flags = CommandFlags.None) + { + Utils.Requires(flags == CommandFlags.None, "Command flags are not supported by GLIDE"); + return await Command(Request.Unwatch(), AllPrimaries); + } + + public async Task UnwatchAsync(Route route, CommandFlags flags = CommandFlags.None) + { + Utils.Requires(flags == CommandFlags.None, "Command flags are not supported by GLIDE"); + return await Command(Request.Unwatch(), route); + } + protected override async Task GetServerVersionAsync() { if (_serverVersion == null) diff --git a/sources/Valkey.Glide/Internals/Request.TransactionCommands.cs b/sources/Valkey.Glide/Internals/Request.TransactionCommands.cs new file mode 100644 index 0000000..15dca6a --- /dev/null +++ b/sources/Valkey.Glide/Internals/Request.TransactionCommands.cs @@ -0,0 +1,26 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +using static Valkey.Glide.Internals.FFI; + +namespace Valkey.Glide.Internals; + +internal partial class Request +{ + /// + /// Creates a command to watch keys for conditional execution of a transaction. + /// + /// The keys to watch. + /// A command that watches the specified keys. + public static Cmd Watch(ValkeyKey[] keys) + { + GlideString[] args = [.. keys.Select(k => (GlideString)k)]; + return new(RequestType.Watch, args, false, response => response); + } + + /// + /// Creates a command to flush all previously watched keys for a transaction. + /// + /// A command that unwatches all previously watched keys. + public static Cmd Unwatch() + => new(RequestType.UnWatch, [], false, response => response); +} \ No newline at end of file diff --git a/tests/Valkey.Glide.IntegrationTests/SharedBatchTests.cs b/tests/Valkey.Glide.IntegrationTests/SharedBatchTests.cs index 2f39288..faa5445 100644 --- a/tests/Valkey.Glide.IntegrationTests/SharedBatchTests.cs +++ b/tests/Valkey.Glide.IntegrationTests/SharedBatchTests.cs @@ -86,4 +86,109 @@ public async Task BatchDumpAndRestore(BaseClient client, bool isAtomic) ); } + + [Theory(DisableDiscoveryEnumeration = true)] + [MemberData(nameof(GetTestClientWithAtomic))] + public async Task WatchTransactionTest(BaseClient client, bool isAtomic) + { + string key1 = "{key}-1" + Guid.NewGuid(); + string key2 = "{key}-2" + Guid.NewGuid(); + string key3 = "{key}-3" + Guid.NewGuid(); + string foobarString = "foobar"; + string helloString = "hello"; + ValkeyKey[] keys = [key1, key2, key3]; + + bool isCluster = client is GlideClusterClient; + + // Returns null when a watched key is modified before transaction execution + string watchResult = isCluster + ? await ((GlideClusterClient)client).WatchAsync(keys) + : await ((GlideClient)client).WatchAsync(keys); + Assert.Equal("OK", watchResult); + + await client.StringSetAsync(key2, helloString); + + object?[]? execResult; + if (isCluster) + { + var clusterBatch = new ClusterBatch(true); + _ = clusterBatch.StringSetAsync(key1, foobarString) + .StringSetAsync(key2, foobarString) + .StringSetAsync(key3, foobarString); + execResult = await ((GlideClusterClient)client).Exec(clusterBatch, true); + } + else + { + var batch = new Batch(true); + _ = batch.StringSetAsync(key1, foobarString) + .StringSetAsync(key2, foobarString) + .StringSetAsync(key3, foobarString); + execResult = await ((GlideClient)client).Exec(batch, true); + } + + // The transaction should fail (return null) because key2 was modified after being watched + Assert.Null(execResult); + + // Verify the key values: transaction was aborted, so only key2 (set before transaction) should have a value + var key1Value = await client.StringGetAsync(key1); + var key2Value = await client.StringGetAsync(key2); + var key3Value = await client.StringGetAsync(key3); + + Assert.True(key1Value.IsNull); // key1 should not be set + Assert.Equal(helloString, key2Value); // key2 should have the value set before transaction + Assert.True(key3Value.IsNull); // key3 should not be set + } + + [Theory(DisableDiscoveryEnumeration = true)] + [MemberData(nameof(GetTestClientWithAtomic))] + public async Task UnwatchTest(BaseClient client, bool isAtomic) + { + string key1 = "{key}-1" + Guid.NewGuid(); + string key2 = "{key}-2" + Guid.NewGuid(); + string foobarString = "foobar"; + string helloString = "hello"; + ValkeyKey[] keys = [key1, key2]; + + bool isCluster = client is GlideClusterClient; + + // UNWATCH returns OK when there are no watched keys + string unwatchResult = isCluster + ? await ((GlideClusterClient)client).UnwatchAsync() + : await ((GlideClient)client).UnwatchAsync(); + Assert.Equal("OK", unwatchResult); + + // Transaction executes successfully after modifying a watched key then calling UNWATCH + if (isCluster) + { + await ((GlideClusterClient)client).WatchAsync(keys); + } + else + { + await ((GlideClient)client).WatchAsync(keys); + } + await client.StringSetAsync(key2, helloString); + unwatchResult = isCluster + ? await ((GlideClusterClient)client).UnwatchAsync() + : await ((GlideClient)client).UnwatchAsync(); + Assert.Equal("OK", unwatchResult); + + object?[]? execResult; + if (isCluster) + { + var clusterBatch = new ClusterBatch(true); + _ = clusterBatch.StringSetAsync(key1, foobarString).StringSetAsync(key2, foobarString); + execResult = await ((GlideClusterClient)client).Exec(clusterBatch, true); + } + else + { + var batch = new Batch(true); + _ = batch.StringSetAsync(key1, foobarString).StringSetAsync(key2, foobarString); + execResult = await ((GlideClient)client).Exec(batch, true); + } + + Assert.NotNull(execResult); // Transaction should succeed after unwatch + Assert.Equal(2, execResult.Length); + Assert.Equal(foobarString, await client.StringGetAsync(key1)); + Assert.Equal(foobarString, await client.StringGetAsync(key2)); + } } diff --git a/tests/Valkey.Glide.UnitTests/CommandTests.cs b/tests/Valkey.Glide.UnitTests/CommandTests.cs index 30fee0e..d174000 100644 --- a/tests/Valkey.Glide.UnitTests/CommandTests.cs +++ b/tests/Valkey.Glide.UnitTests/CommandTests.cs @@ -623,7 +623,16 @@ public void ValidateCommandConverters() () => Assert.Equal(0L, Request.HyperLogLogLengthAsync("key").Converter(0L)), () => Assert.Equal(100L, Request.HyperLogLogLengthAsync(["key1", "key2"]).Converter(100L)), () => Assert.Equal("OK", Request.HyperLogLogMergeAsync("dest", "src1", "src2").Converter("OK")), - () => Assert.Equal("OK", Request.HyperLogLogMergeAsync("dest", ["src1", "src2"]).Converter("OK")) + () => Assert.Equal("OK", Request.HyperLogLogMergeAsync("dest", ["src1", "src2"]).Converter("OK")), + + // Transaction Commands + () => Assert.Equal(["WATCH", "key1"], Request.Watch(["key1"]).GetArgs()), + () => Assert.Equal(["WATCH", "key1", "key2", "key3"], Request.Watch(["key1", "key2", "key3"]).GetArgs()), + () => Assert.Equal(["UNWATCH"], Request.Unwatch().GetArgs()), + () => Assert.Equal("OK", Request.Watch(["key1"]).Converter("OK")), + () => Assert.Equal("ERROR", Request.Watch(["key1"]).Converter("ERROR")), + () => Assert.Equal("OK", Request.Unwatch().Converter("OK")), + () => Assert.Equal("ERROR", Request.Unwatch().Converter("ERROR")) ); }