diff --git a/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.LogEnumeration.cs b/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.LogEnumeration.cs new file mode 100644 index 00000000000..76a8b604b03 --- /dev/null +++ b/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.LogEnumeration.cs @@ -0,0 +1,189 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Extensions.Logging.Testing; + +public partial class FakeLogCollector +{ + private int _recordCollectionVersion; + + private TaskCompletionSource _logEnumerationSharedWaiter = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + private int _waitingEnumeratorCount; + + public IAsyncEnumerable GetLogsAsync(CancellationToken cancellationToken = default) + => new LogAsyncEnumerable(this, cancellationToken); + + private class LogAsyncEnumerable : IAsyncEnumerable + { + private readonly FakeLogCollector _collector; + private readonly CancellationToken _enumerableCancellationToken; + + internal LogAsyncEnumerable( + FakeLogCollector collector, + CancellationToken enumerableCancellationToken) + { + _collector = collector; + _enumerableCancellationToken = enumerableCancellationToken; + } + + public IAsyncEnumerator GetAsyncEnumerator( + CancellationToken enumeratorCancellationToken = default) + => new StreamEnumerator(_collector, _enumerableCancellationToken, enumeratorCancellationToken); + } + + private sealed class StreamEnumerator : IAsyncEnumerator + { + private readonly FakeLogCollector _collector; + private readonly CancellationTokenSource _mainCts; + + private FakeLogRecord? _current; + private int _index; + private bool _disposed; + private int _observedRecordCollectionVersion; + + // Concurrent MoveNextAsync guard + private int _moveNextActive; // 0 = inactive, 1 = active (int type used for net462 compatibility) + + public StreamEnumerator( + FakeLogCollector collector, + CancellationToken enumerableCancellationToken, + CancellationToken enumeratorCancellationToken) + { + _collector = collector; + _mainCts = enumerableCancellationToken.CanBeCanceled || enumeratorCancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource([enumerableCancellationToken, enumeratorCancellationToken]) + : new CancellationTokenSource(); + _observedRecordCollectionVersion = collector._recordCollectionVersion; + } + + public FakeLogRecord Current => _current ?? throw new InvalidOperationException("Enumeration not started."); + + public async ValueTask MoveNextAsync() + { + if (Interlocked.CompareExchange(ref _moveNextActive, 1, 0) == 1) + { + throw new InvalidOperationException("MoveNextAsync is already in progress. Concurrent calls are not allowed."); + } + + try + { + ThrowIfDisposed(); + + var masterCancellationToken = _mainCts.Token; + + masterCancellationToken.ThrowIfCancellationRequested(); + + while (true) + { + TaskCompletionSource? waiter = null; + + try + { + masterCancellationToken.ThrowIfCancellationRequested(); + + lock (_collector._records) + { + if (_observedRecordCollectionVersion != _collector._recordCollectionVersion) + { + _index = 0; // based on assumption that version changed on full collection clear + _observedRecordCollectionVersion = _collector._recordCollectionVersion; + } + + if (_index < _collector._records.Count) + { + _current = _collector._records[_index++]; + return true; + } + + // waiter needs to be subscribed within records lock + // if not: more records could be added in the meantime and the waiter could be stuck waiting even though the index is behind the actual count + waiter = _collector._logEnumerationSharedWaiter; + _collector._waitingEnumeratorCount++; + } + + // Compatibility path for net462: emulate Task.WaitAsync(cancellationToken). + // After the wait is complete in normal flow, no need to decrement because the shared waiter will be swapped and counter reset. + await AwaitWithCancellationAsync(waiter.Task, masterCancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + if (waiter is not null) + { + lock (_collector._records) + { + if ( + _collector._waitingEnumeratorCount > 0 // counter can be zero during the cancellation path + && waiter == _collector._logEnumerationSharedWaiter // makes sure we adjust the counter for the same shared waiting session + ) + { + _collector._waitingEnumeratorCount--; + } + } + } + + throw; + } + } + + } + finally + { + Volatile.Write(ref _moveNextActive, 0); + } + } + + public ValueTask DisposeAsync() + { + if (_disposed) + { + return default; + } + + _disposed = true; + + _mainCts.Cancel(); + _mainCts.Dispose(); + + return default; + } + + private static async Task AwaitWithCancellationAsync(Task task, CancellationToken cancellationToken) + { + if (!cancellationToken.CanBeCanceled || task.IsCompleted) + { + await task.ConfigureAwait(false); + return; + } + + var cancelTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + CancellationTokenRegistration ctr = default; + try + { + ctr = cancellationToken.Register(static s => + ((TaskCompletionSource)s!).TrySetCanceled(), cancelTcs); + + var completed = await Task.WhenAny(task, cancelTcs.Task).ConfigureAwait(false); + await completed.ConfigureAwait(false); + } + finally + { + ctr.Dispose(); + } + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(StreamEnumerator)); + } + } + } +} diff --git a/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.cs b/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.cs index 24b9f933b9c..75366ee940f 100644 --- a/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.cs +++ b/src/Libraries/Microsoft.Extensions.Diagnostics.Testing/Logging/FakeLogCollector.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Threading.Tasks; using Microsoft.Extensions.Options; using Microsoft.Shared.Diagnostics; @@ -14,7 +15,7 @@ namespace Microsoft.Extensions.Logging.Testing; /// [DebuggerDisplay("Count = {Count}, LatestRecord = {LatestRecord}")] [DebuggerTypeProxy(typeof(FakeLogCollectorDebugView))] -public class FakeLogCollector +public partial class FakeLogCollector { private readonly List _records = []; private readonly FakeLogCollectorOptions _options; @@ -54,6 +55,7 @@ public void Clear() lock (_records) { _records.Clear(); + _recordCollectionVersion++; } } @@ -136,11 +138,23 @@ internal void AddRecord(FakeLogRecord record) return; } + TaskCompletionSource? logEnumerationSharedWaiterToWake = null; + lock (_records) { _records.Add(record); + + if (_waitingEnumeratorCount > 0) + { + logEnumerationSharedWaiterToWake = _logEnumerationSharedWaiter; + _logEnumerationSharedWaiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _waitingEnumeratorCount = 0; + } } + // it is possible the task was already completed, but it does not matter and we can avoid locking + _ = logEnumerationSharedWaiterToWake?.TrySetResult(null); + _options.OutputSink?.Invoke(_options.OutputFormatter(record)); } diff --git a/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.Waiting.cs b/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.Waiting.cs new file mode 100644 index 00000000000..f7e09783220 --- /dev/null +++ b/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.Waiting.cs @@ -0,0 +1,175 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.Extensions.Logging.Testing.Test.Logging; + +public partial class FakeLogCollectorTests +{ + private readonly ITestOutputHelper _outputHelper; + + public FakeLogCollectorTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + } + + [Theory] + [InlineData(true, false)] + [InlineData(false, true)] + public async Task LogAwaitingDemo(bool arrivesInAwaitedOrder, bool expectedToCancel) + { + var collector = FakeLogCollector.Create(new FakeLogCollectorOptions()); + var eventTracker = new ConcurrentQueue(); + + var waitingTimeout = TimeSpan.FromMilliseconds(1_000); + + string[] logsToEmit = arrivesInAwaitedOrder + ? ["Sync", "Log A", "LogC", "Sync", "Sync", "Log B", "Sync", "Sync", "Log C", "Sync", "Sync", "Sync"] + : ["Sync", "Log A", "LogC", "Sync", "Sync", "Log B", "Sync", "Sync", "Log C", "Log D"] // Log C after A, B, C progression is not followed by Sync + ; + + var logEmittingTask = EmitLogs(collector, logsToEmit, eventTracker); + + var res = await AwaitSequence( + new Queue(["Log A", "Log B", "Sync"]), // Wait for event A and B followed by Sync + fromIndex: 0, + collector, + eventTracker, + timeout: waitingTimeout); + + Assert.False(res.wasCancelled); + Assert.Equal(6, res.index); + + // This gap simulates an action on the tested running code expected to trigger event C followed by Sync + await Task.Delay(2_000); + + res = await AwaitSequence( + new Queue(["Log C", "Sync"]), // Wait for Log C followed by Sync + fromIndex: res.index + 1, + collector, + eventTracker, + timeout: waitingTimeout); + + Assert.Equal(expectedToCancel, res.wasCancelled); + Assert.Equal(expectedToCancel ? -1 : 9, res.index); + + await logEmittingTask; + + if (!expectedToCancel) + { + // The user may want to await partial states to perform actions, but then perform a sanity check on the whole history + var snapshot = collector.GetSnapshot(); + + var expectedProgression = new Queue(["Log A", "Log B", "Sync", "Log C", "Sync"]); + foreach (var item in snapshot.Select(x => x.Message)) + { + if (expectedProgression.Count == 0) + { + break; + } + + if (item == expectedProgression.Peek()) + { + expectedProgression.Dequeue(); + } + } + + Assert.Empty(expectedProgression); + } + + OutputEventTracker(_outputHelper, eventTracker); + } + + private static async Task<(bool wasCancelled, int index)> AwaitSequence( + Queue sequence, + int fromIndex, + FakeLogCollector collector, + ConcurrentQueue eventTracker, + TimeSpan? timeout = null) + { + using var cts = timeout.HasValue ? new CancellationTokenSource(timeout.Value) : new CancellationTokenSource(); + try + { + int index = -1; + var enumeration = collector.GetLogsAsync(cancellationToken: cts.Token); + await foreach (var log in enumeration) + { + index++; + + if (index < fromIndex) + { + continue; + } + + var msg = log.Message; + var currentExpectation = sequence.Peek(); + + eventTracker.Enqueue($"Checking log: \"{msg}\"."); + + if (msg == currentExpectation) + { + sequence.Dequeue(); + + if (sequence.Count != 0) + { + continue; + } + + eventTracker.Enqueue($"Sequence satisfied at {DateTime.Now}"); + + return (false, index); + } + } + } + catch (OperationCanceledException) + { + eventTracker.Enqueue($"Operation cancelled at {DateTime.Now}"); + return (true, -1); + } + + throw new Exception("Enumeration was supposed to be unbound."); + } + + private static void OutputEventTracker(ITestOutputHelper testOutputHelper, ConcurrentQueue eventTracker) + { + while (eventTracker.TryDequeue(out var item)) + { + testOutputHelper.WriteLine(item); + } + } + + private async Task EmitLogs( + FakeLogCollector fakeLogCollector, + IEnumerable logsToEmit, + ConcurrentQueue eventTracker, + TimeSpan? delayBetweenEmissions = null) + { + var logger = new FakeLogger(fakeLogCollector); + + await Task.Run(async () => + { + eventTracker.Enqueue($"Started emitting logs at {DateTime.Now}"); + + foreach(var log in logsToEmit) + { + eventTracker.Enqueue($"Emitting item: \"{log}\" at {DateTime.Now}, currently items: {logger.Collector.Count}"); + logger.Log(LogLevel.Debug, log); + + if (delayBetweenEmissions.HasValue) + { + await Task.Delay(delayBetweenEmissions.Value, CancellationToken.None); + } + } + }); + + eventTracker.Enqueue($"Finished emitting logs at {DateTime.Now}"); + } +} diff --git a/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.cs b/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.cs index 69fd33600d6..63ce2a7e718 100644 --- a/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.cs +++ b/test/Libraries/Microsoft.Extensions.Diagnostics.Testing.Tests/Logging/FakeLogCollectorTests.cs @@ -9,7 +9,7 @@ namespace Microsoft.Extensions.Logging.Testing.Test.Logging; -public class FakeLogCollectorTests +public partial class FakeLogCollectorTests { private class Output : ITestOutputHelper {