From 1ba11628a9543b39274da933289eacdd7eb09625 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 08:24:45 +0200 Subject: [PATCH 01/26] Use continuations to report tasks to support slow sources --- MoreLinq/Experimental/Await.cs | 165 +++++++++++---------------------- 1 file changed, 54 insertions(+), 111 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index c431f2ed0..1d02ba17b 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -429,7 +429,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered var enumerator = source.Index() - .Select(e => (e.Key, Item: e.Value, Task: evaluator(e.Value, cancellationToken))) + .Select(e => (e.Key, Item: e.Value, Task: Lazy(() => evaluator(e.Value, cancellationToken)))) .GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings @@ -442,7 +442,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered enumerator, e => e.Task, notices, - (e, r) => (Notice.Result, (e.Key, e.Item, e.Task), default), + (e, r) => (Notice.Result, (e.Key, e.Item, e.Task.Value), default), ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), (Notice.End, default, default), maxConcurrency, cancellationTokenSource), @@ -531,11 +531,13 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered } } + static Lazy Lazy(Func valueFactory) => new Lazy(valueFactory, LazyThreadSafetyMode.None); + enum Notice { Result, Error, End } static async Task CollectToAsync( this IEnumerator e, - Func> taskSelector, + Func>> taskSelector, BlockingCollection collection, Func, TNotice> completionNoticeSelector, Func errorNoticeSelector, @@ -543,94 +545,69 @@ static async Task CollectToAsync( int maxConcurrency, CancellationTokenSource cancellationTokenSource) { - Reader reader = null; - try { - reader = new Reader(e); - var cancellationToken = cancellationTokenSource.Token; - var cancellationTaskSource = new TaskCompletionSource(); - cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); - - var tasks = new List<(T Item, Task Task)>(); - - for (var i = 0; i < maxConcurrency; i++) - { - if (!reader.TryRead(out var item)) - break; - tasks.Add((item, taskSelector(item))); - } + var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency); + var pendingCount = 1; // terminator - while (tasks.Count > 0) + while (e.MoveNext()) { - // Task.WaitAny is synchronous and blocking but allows the - // waiting to be cancelled via a CancellationToken. - // Task.WhenAny can be awaited so it is better since the - // thread won't be blocked and can return to the pool. - // However, it doesn't support cancellation so instead a - // task is built on top of the CancellationToken that - // completes when the CancellationToken trips. - // - // Also, Task.WhenAny returns the task (Task) object that - // completed but task objects may not be unique due to - // caching, e.g.: - // - // async Task Foo() => true; - // async Task Bar() => true; - // var foo = Foo(); - // var bar = Bar(); - // var same = foo.Equals(bar); // == true - // - // In this case, the task returned by Task.WhenAny will - // match `foo` and `bar`: - // - // var done = Task.WhenAny(foo, bar); - // - // Logically speaking, the uniqueness of a task does not - // matter but here it does, especially when Await (the main - // user of CollectAsync) needs to return results ordered. - // Fortunately, we compose our own task on top of the - // original that links each item with the task result and as - // a consequence generate new and unique task objects. - - var completedTask = await - Task.WhenAny(tasks.Select(it => (Task) it.Task).Concat(cancellationTaskSource.Task)) - .ConfigureAwait(continueOnCapturedContext: false); - - if (completedTask == cancellationTaskSource.Task) + try { - // Cancellation during the wait means the enumeration - // has been stopped by the user so the results of the - // remaining tasks are no longer needed. Those tasks - // should cancel as a result of sharing the same - // cancellation token and provided that they passed it - // on to any downstream asynchronous operations. Either - // way, this loop is done so exit hard here. - - return; + await semaphore.WaitAsync(cancellationToken); + } + catch (OperationCanceledException) + { + break; } - var i = tasks.FindIndex(it => it.Task.Equals(completedTask)); + if (cancellationToken.IsCancellationRequested) + break; - { - var (item, task) = tasks[i]; - tasks.RemoveAt(i); + Interlocked.Increment(ref pendingCount); - // Await the task rather than using its result directly - // to avoid having the task's exception bubble up as - // AggregateException if the task failed. + var item = e.Current; + var task = taskSelector(item).Value; - collection.Add(completionNoticeSelector(item, task)); - } + // Add a continutation that notifies completion of the task, + // along with the necessary housekeeping, in case it + // completes before maximum concurrency is reached. - { - if (reader.TryRead(out var item)) - tasks.Add((item, taskSelector(item))); - } + #pragma warning disable 4014 // https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/compiler-messages/cs4014 + + task.ContinueWith(cancellationToken: cancellationToken, + continuationOptions: TaskContinuationOptions.ExecuteSynchronously, + scheduler: TaskScheduler.Current, + continuationAction: t => + { + semaphore.Release(); + + // + // try + // { + // collection.Add(completionNoticeSelector(item, t)); + // } + // catch (Exception exception) + // { + // collection.Add(errorNoticeSelector(exception)); + // } + // + + if (cancellationToken.IsCancellationRequested) + return; + + collection.Add(completionNoticeSelector(item, t)); + + if (Interlocked.Decrement(ref pendingCount) == 0) + collection.Add(endNotice); + }); + + #pragma warning restore 4014 } - collection.Add(endNotice); + if (Interlocked.Decrement(ref pendingCount) == 0) + collection.Add(endNotice); } catch (Exception ex) { @@ -639,40 +616,6 @@ static async Task CollectToAsync( } finally { - reader?.Dispose(); - } - - collection.CompleteAdding(); - } - - sealed class Reader : IDisposable - { - IEnumerator _enumerator; - - public Reader(IEnumerator enumerator) => - _enumerator = enumerator; - - public bool TryRead(out T item) - { - var ended = false; - if (_enumerator == null || (ended = !_enumerator.MoveNext())) - { - if (ended) - Dispose(); - item = default; - return false; - } - - item = _enumerator.Current; - return true; - } - - public void Dispose() - { - var e = _enumerator; - if (e == null) - return; - _enumerator = null; e.Dispose(); } } From a30df54384e280dea58a411c0d5f3f65f9388af7 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 15:03:53 +0200 Subject: [PATCH 02/26] End notice unnecessary --- MoreLinq/Experimental/Await.cs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 1d02ba17b..3f48a7dd9 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -444,7 +444,6 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered notices, (e, r) => (Notice.Result, (e.Key, e.Item, e.Task.Value), default), ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), - (Notice.End, default, default), maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, @@ -458,9 +457,6 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered if (kind == Notice.Error) error.Throw(); - if (kind == Notice.End) - break; - Debug.Assert(kind == Notice.Result); var (key, inp, value) = result; @@ -533,7 +529,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered static Lazy Lazy(Func valueFactory) => new Lazy(valueFactory, LazyThreadSafetyMode.None); - enum Notice { Result, Error, End } + enum Notice { Result, Error } static async Task CollectToAsync( this IEnumerator e, @@ -541,7 +537,6 @@ static async Task CollectToAsync( BlockingCollection collection, Func, TNotice> completionNoticeSelector, Func errorNoticeSelector, - TNotice endNotice, int maxConcurrency, CancellationTokenSource cancellationTokenSource) { @@ -600,14 +595,14 @@ static async Task CollectToAsync( collection.Add(completionNoticeSelector(item, t)); if (Interlocked.Decrement(ref pendingCount) == 0) - collection.Add(endNotice); + collection.CompleteAdding(); }); #pragma warning restore 4014 } if (Interlocked.Decrement(ref pendingCount) == 0) - collection.Add(endNotice); + collection.CompleteAdding(); } catch (Exception ex) { From d973670617ca0393b576204457945ef139541283 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 15:38:21 +0200 Subject: [PATCH 03/26] Refactor to observable --- MoreLinq/Experimental/Await.cs | 43 ++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 3f48a7dd9..51ef06e59 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -438,12 +438,12 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered { Task.Factory.StartNew( () => - CollectToAsync( - enumerator, - e => e.Task, - notices, - (e, r) => (Notice.Result, (e.Key, e.Item, e.Task.Value), default), - ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), + enumerator.StartAsync(e => e.Task, + (e, task) => (Notice.Result, (e.Key, e.Item, task), default), + new Observer<(Notice, (int, T, Task), ExceptionDispatchInfo)>( + notices.Add, + e => notices.Add((Notice.Error, default, ExceptionDispatchInfo.Capture(e))), + notices.CompleteAdding), maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, @@ -527,16 +527,33 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered } } + sealed class Observer : IObserver + { + readonly Action _onNext; + readonly Action _onError; + readonly Action _onCompleted; + + public Observer(Action onNext, Action onError, Action onCompleted) + { + _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); + _onError = onError ?? throw new ArgumentNullException(nameof(onError)); + _onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); + } + + public void OnNext(T value) => _onNext(value); + public void OnError(Exception error) => _onError(error); + public void OnCompleted() => _onCompleted(); + } + static Lazy Lazy(Func valueFactory) => new Lazy(valueFactory, LazyThreadSafetyMode.None); enum Notice { Result, Error } - static async Task CollectToAsync( + static async Task StartAsync( this IEnumerator e, Func>> taskSelector, - BlockingCollection collection, Func, TNotice> completionNoticeSelector, - Func errorNoticeSelector, + IObserver observer, int maxConcurrency, CancellationTokenSource cancellationTokenSource) { @@ -592,22 +609,22 @@ static async Task CollectToAsync( if (cancellationToken.IsCancellationRequested) return; - collection.Add(completionNoticeSelector(item, t)); + observer.OnNext(completionNoticeSelector(item, t)); if (Interlocked.Decrement(ref pendingCount) == 0) - collection.CompleteAdding(); + observer.OnCompleted(); }); #pragma warning restore 4014 } if (Interlocked.Decrement(ref pendingCount) == 0) - collection.CompleteAdding(); + observer.OnCompleted(); } catch (Exception ex) { cancellationTokenSource.Cancel(); - collection.Add(errorNoticeSelector(ex)); + observer.OnError(ex); } finally { From 86c94f9d78169c09fbc3c4c5a18e379dad7d5cf8 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 15:56:27 +0200 Subject: [PATCH 04/26] Move general helpers down --- MoreLinq/Experimental/Await.cs | 40 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 51ef06e59..c84d4f44f 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -527,26 +527,6 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered } } - sealed class Observer : IObserver - { - readonly Action _onNext; - readonly Action _onError; - readonly Action _onCompleted; - - public Observer(Action onNext, Action onError, Action onCompleted) - { - _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); - _onError = onError ?? throw new ArgumentNullException(nameof(onError)); - _onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); - } - - public void OnNext(T value) => _onNext(value); - public void OnError(Exception error) => _onError(error); - public void OnCompleted() => _onCompleted(); - } - - static Lazy Lazy(Func valueFactory) => new Lazy(valueFactory, LazyThreadSafetyMode.None); - enum Notice { Result, Error } static async Task StartAsync( @@ -664,6 +644,26 @@ public IAwaitQuery WithOptions(AwaitQueryOptions options) IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } + sealed class Observer : IObserver + { + readonly Action _onNext; + readonly Action _onError; + readonly Action _onCompleted; + + public Observer(Action onNext, Action onError, Action onCompleted) + { + _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); + _onError = onError ?? throw new ArgumentNullException(nameof(onError)); + _onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); + } + + public void OnNext(T value) => _onNext(value); + public void OnError(Exception error) => _onError(error); + public void OnCompleted() => _onCompleted(); + } + + static Lazy Lazy(Func valueFactory) => new Lazy(valueFactory, LazyThreadSafetyMode.None); + static class TupleComparer { public static readonly IComparer<(T1, T2, T3)> Item1 = From 4a4e08dd11dbdf6d4bd7009b9beee073850b5132 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 16:00:13 +0200 Subject: [PATCH 05/26] To-do notes about exceptional cases Especially if we fail to notify under low memory conditions! --- MoreLinq/Experimental/Await.cs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index c84d4f44f..831dcd94a 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -575,35 +575,32 @@ static async Task StartAsync( { semaphore.Release(); - // - // try - // { - // collection.Add(completionNoticeSelector(item, t)); - // } - // catch (Exception exception) - // { - // collection.Add(errorNoticeSelector(exception)); - // } - // - if (cancellationToken.IsCancellationRequested) return; + // TODO Consider what happens if following fails observer.OnNext(completionNoticeSelector(item, t)); if (Interlocked.Decrement(ref pendingCount) == 0) + { + // TODO Consider what happens if following fails observer.OnCompleted(); + } }); #pragma warning restore 4014 } if (Interlocked.Decrement(ref pendingCount) == 0) + { + // TODO Consider what happens if following fails observer.OnCompleted(); + } } catch (Exception ex) { cancellationTokenSource.Cancel(); + // TODO Consider what happens if following fails observer.OnError(ex); } finally From 3111a902fc23440cae062daab4175376cf73e767 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 16:11:00 +0200 Subject: [PATCH 06/26] Add arg validation to StartAsync --- MoreLinq/Experimental/Await.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 831dcd94a..279d2d981 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -537,6 +537,12 @@ static async Task StartAsync( int maxConcurrency, CancellationTokenSource cancellationTokenSource) { + if (e == null) throw new ArgumentNullException(nameof(e)); + if (taskSelector == null) throw new ArgumentNullException(nameof(taskSelector)); + if (completionNoticeSelector == null) throw new ArgumentNullException(nameof(completionNoticeSelector)); + if (observer == null) throw new ArgumentNullException(nameof(observer)); + if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); + try { var cancellationToken = cancellationTokenSource.Token; From 688c2ee9130b11ca04f389f8c33a4a3e83cab823 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 16:14:53 +0200 Subject: [PATCH 07/26] Don't allocate semaphore if unbounded concurrency --- MoreLinq/Experimental/Await.cs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 279d2d981..d376701aa 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -416,11 +416,11 @@ public static IAwaitQuery AwaitCompletion( return AwaitQuery.Create( - options => _(options.MaxConcurrency ?? int.MaxValue, + options => _(options.MaxConcurrency, options.Scheduler ?? TaskScheduler.Default, options.PreserveOrder)); - IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered) + IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordered) { var notices = new BlockingCollection<(Notice, (int, T, Task), ExceptionDispatchInfo)>(); var cancellationTokenSource = new CancellationTokenSource(); @@ -534,7 +534,7 @@ static async Task StartAsync( Func>> taskSelector, Func, TNotice> completionNoticeSelector, IObserver observer, - int maxConcurrency, + int? maxConcurrency, CancellationTokenSource cancellationTokenSource) { if (e == null) throw new ArgumentNullException(nameof(e)); @@ -546,18 +546,23 @@ static async Task StartAsync( try { var cancellationToken = cancellationTokenSource.Token; - var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency); + var semaphore = maxConcurrency is int count + ? new SemaphoreSlim(count, count) + : null; var pendingCount = 1; // terminator while (e.MoveNext()) { - try + if (semaphore != null) { - await semaphore.WaitAsync(cancellationToken); - } - catch (OperationCanceledException) - { - break; + try + { + await semaphore.WaitAsync(cancellationToken); + } + catch (OperationCanceledException) + { + break; + } } if (cancellationToken.IsCancellationRequested) @@ -579,7 +584,7 @@ static async Task StartAsync( scheduler: TaskScheduler.Current, continuationAction: t => { - semaphore.Release(); + semaphore?.Release(); if (cancellationToken.IsCancellationRequested) return; From 616241b846f4cac3b1be3827ddd4e3d7f7d16fc0 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 17:12:28 +0200 Subject: [PATCH 08/26] Fold lazy start semantics into StartAsync This removes the need for Lazy allocations --- MoreLinq/Experimental/Await.cs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index d376701aa..79b38d60e 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -429,7 +429,6 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere var enumerator = source.Index() - .Select(e => (e.Key, Item: e.Value, Task: Lazy(() => evaluator(e.Value, cancellationToken)))) .GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings @@ -438,8 +437,9 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere { Task.Factory.StartNew( () => - enumerator.StartAsync(e => e.Task, - (e, task) => (Notice.Result, (e.Key, e.Item, task), default), + enumerator.StartAsync( + e => evaluator(e.Value, cancellationToken), + (e, task) => (Notice.Result, (e.Key, e.Value, task), default), new Observer<(Notice, (int, T, Task), ExceptionDispatchInfo)>( notices.Add, e => notices.Add((Notice.Error, default, ExceptionDispatchInfo.Capture(e))), @@ -531,14 +531,14 @@ enum Notice { Result, Error } static async Task StartAsync( this IEnumerator e, - Func>> taskSelector, + Func> taskStarter, Func, TNotice> completionNoticeSelector, IObserver observer, int? maxConcurrency, CancellationTokenSource cancellationTokenSource) { if (e == null) throw new ArgumentNullException(nameof(e)); - if (taskSelector == null) throw new ArgumentNullException(nameof(taskSelector)); + if (taskStarter == null) throw new ArgumentNullException(nameof(taskStarter)); if (completionNoticeSelector == null) throw new ArgumentNullException(nameof(completionNoticeSelector)); if (observer == null) throw new ArgumentNullException(nameof(observer)); if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); @@ -571,7 +571,7 @@ static async Task StartAsync( Interlocked.Increment(ref pendingCount); var item = e.Current; - var task = taskSelector(item).Value; + var task = taskStarter(item); // Add a continutation that notifies completion of the task, // along with the necessary housekeeping, in case it @@ -670,8 +670,6 @@ public Observer(Action onNext, Action onError, Action onCompleted) public void OnCompleted() => _onCompleted(); } - static Lazy Lazy(Func valueFactory) => new Lazy(valueFactory, LazyThreadSafetyMode.None); - static class TupleComparer { public static readonly IComparer<(T1, T2, T3)> Item1 = From 4514060ca4834eee105e2f8fffade78d2187f808 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 21:56:43 +0200 Subject: [PATCH 09/26] Consolidate pending completion actions --- MoreLinq/Experimental/Await.cs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 79b38d60e..b82beaa38 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -551,6 +551,15 @@ static async Task StartAsync( : null; var pendingCount = 1; // terminator + void OnPendingCompleted() + { + if (Interlocked.Decrement(ref pendingCount) == 0) + { + // TODO Consider what happens if following fails + observer.OnCompleted(); + } + } + while (e.MoveNext()) { if (semaphore != null) @@ -592,21 +601,13 @@ static async Task StartAsync( // TODO Consider what happens if following fails observer.OnNext(completionNoticeSelector(item, t)); - if (Interlocked.Decrement(ref pendingCount) == 0) - { - // TODO Consider what happens if following fails - observer.OnCompleted(); - } + OnPendingCompleted(); }); #pragma warning restore 4014 } - if (Interlocked.Decrement(ref pendingCount) == 0) - { - // TODO Consider what happens if following fails - observer.OnCompleted(); - } + OnPendingCompleted(); } catch (Exception ex) { From da262bc8c1e58295a741f33f444862f23b50f1e8 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 22:44:05 +0200 Subject: [PATCH 10/26] Revert "Refactor to observable" This reverts commit d973670617ca0393b576204457945ef139541283. --- MoreLinq/Experimental/Await.cs | 45 +++++++++++----------------------- 1 file changed, 14 insertions(+), 31 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index b82beaa38..f6019f24d 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -437,13 +437,12 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere { Task.Factory.StartNew( () => - enumerator.StartAsync( + enumerator.CollectToAsync( e => evaluator(e.Value, cancellationToken), - (e, task) => (Notice.Result, (e.Key, e.Value, task), default), - new Observer<(Notice, (int, T, Task), ExceptionDispatchInfo)>( - notices.Add, - e => notices.Add((Notice.Error, default, ExceptionDispatchInfo.Capture(e))), - notices.CompleteAdding), + notices, + (e, r) => (Notice.Result, (e.Key, e.Value, r), default), + ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), + (Notice.End, default, default), maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, @@ -527,20 +526,22 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere } } - enum Notice { Result, Error } + enum Notice { End, Result, Error } - static async Task StartAsync( + static async Task CollectToAsync( this IEnumerator e, Func> taskStarter, + BlockingCollection collection, Func, TNotice> completionNoticeSelector, - IObserver observer, + Func errorNoticeSelector, + TNotice endNotice, int? maxConcurrency, CancellationTokenSource cancellationTokenSource) { if (e == null) throw new ArgumentNullException(nameof(e)); if (taskStarter == null) throw new ArgumentNullException(nameof(taskStarter)); if (completionNoticeSelector == null) throw new ArgumentNullException(nameof(completionNoticeSelector)); - if (observer == null) throw new ArgumentNullException(nameof(observer)); + if (errorNoticeSelector == null) throw new ArgumentNullException(nameof(errorNoticeSelector)); if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); try @@ -556,7 +557,7 @@ void OnPendingCompleted() if (Interlocked.Decrement(ref pendingCount) == 0) { // TODO Consider what happens if following fails - observer.OnCompleted(); + collection.Add(endNotice); } } @@ -599,7 +600,7 @@ void OnPendingCompleted() return; // TODO Consider what happens if following fails - observer.OnNext(completionNoticeSelector(item, t)); + collection.Add(completionNoticeSelector(item, t)); OnPendingCompleted(); }); @@ -613,7 +614,7 @@ void OnPendingCompleted() { cancellationTokenSource.Cancel(); // TODO Consider what happens if following fails - observer.OnError(ex); + collection.Add(errorNoticeSelector(ex)); } finally { @@ -653,24 +654,6 @@ public IAwaitQuery WithOptions(AwaitQueryOptions options) IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } - sealed class Observer : IObserver - { - readonly Action _onNext; - readonly Action _onError; - readonly Action _onCompleted; - - public Observer(Action onNext, Action onError, Action onCompleted) - { - _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); - _onError = onError ?? throw new ArgumentNullException(nameof(onError)); - _onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); - } - - public void OnNext(T value) => _onNext(value); - public void OnError(Exception error) => _onError(error); - public void OnCompleted() => _onCompleted(); - } - static class TupleComparer { public static readonly IComparer<(T1, T2, T3)> Item1 = From 03419d3ef5069083bd496532cd136265f0879562 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 22:51:31 +0200 Subject: [PATCH 11/26] Don't treat cancellation as completion; return early --- MoreLinq/Experimental/Await.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index f6019f24d..05a26e388 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -571,12 +571,12 @@ void OnPendingCompleted() } catch (OperationCanceledException) { - break; + return; } } if (cancellationToken.IsCancellationRequested) - break; + return; Interlocked.Increment(ref pendingCount); From 91f48963cf70f8bddadf75ffdcc0e8ffc8de9602 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 22:54:07 +0200 Subject: [PATCH 12/26] Filter token when catching OperationCanceledException --- MoreLinq/Experimental/Await.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 05a26e388..de54f2946 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -569,7 +569,7 @@ void OnPendingCompleted() { await semaphore.WaitAsync(cancellationToken); } - catch (OperationCanceledException) + catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken) { return; } From 98ddd418825e60a4e03f191d694ea3fdd89f8f3e Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 22:57:47 +0200 Subject: [PATCH 13/26] Add back (missing) end notice break --- MoreLinq/Experimental/Await.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index de54f2946..f8ba54a4c 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -456,6 +456,9 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere if (kind == Notice.Error) error.Throw(); + if (kind == Notice.End) + break; + Debug.Assert(kind == Notice.Result); var (key, inp, value) = result; From cf71a25481bb9f0038cb800d8da7f5933af0e735 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 28 May 2018 23:23:38 +0200 Subject: [PATCH 14/26] Crititcal error handling --- MoreLinq/Experimental/Await.cs | 50 ++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index f8ba54a4c..123f81387 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -427,6 +427,9 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere var cancellationToken = cancellationTokenSource.Token; var completed = false; + Exception lastCriticalError = null; + var consumerCancellationTokenSource = new CancellationTokenSource(); + var enumerator = source.Index() .GetEnumerator(); @@ -442,6 +445,14 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere notices, (e, r) => (Notice.Result, (e.Key, e.Value, r), default), ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), + ex => + { + lastCriticalError = ex; + // Don't use ExceptionDispatchInfo.Capture + // to avoid inducing allocations if already + // under low memory conditions. + consumerCancellationTokenSource.Cancel(); + }, (Notice.End, default, default), maxConcurrency, cancellationTokenSource), CancellationToken.None, @@ -451,8 +462,20 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere var nextKey = 0; var holds = ordered ? new List<(int, T, Task)>() : null; - foreach (var (kind, result, error) in notices.GetConsumingEnumerable()) + using (var e = notices.GetConsumingEnumerable(consumerCancellationTokenSource.Token).GetEnumerator()) + while (true) { + try + { + if (!e.MoveNext()) + break; + } + catch (OperationCanceledException ex) when (ex.CancellationToken == consumerCancellationTokenSource.Token) + { + throw new Exception("A critical error has occurred.", lastCriticalError); + } + + var (kind, result, error) = e.Current; if (kind == Notice.Error) error.Throw(); @@ -537,6 +560,7 @@ static async Task CollectToAsync( BlockingCollection collection, Func, TNotice> completionNoticeSelector, Func errorNoticeSelector, + Action onCriticalError, TNotice endNotice, int? maxConcurrency, CancellationTokenSource cancellationTokenSource) @@ -547,6 +571,19 @@ static async Task CollectToAsync( if (errorNoticeSelector == null) throw new ArgumentNullException(nameof(errorNoticeSelector)); if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); + void PostNotice(TNotice notice) + { + try + { + collection.Add(notice); + } + catch (Exception ex) + { + onCriticalError(ex); + throw; + } + } + try { var cancellationToken = cancellationTokenSource.Token; @@ -558,10 +595,7 @@ static async Task CollectToAsync( void OnPendingCompleted() { if (Interlocked.Decrement(ref pendingCount) == 0) - { - // TODO Consider what happens if following fails - collection.Add(endNotice); - } + PostNotice(endNotice); } while (e.MoveNext()) @@ -602,8 +636,7 @@ void OnPendingCompleted() if (cancellationToken.IsCancellationRequested) return; - // TODO Consider what happens if following fails - collection.Add(completionNoticeSelector(item, t)); + PostNotice(completionNoticeSelector(item, t)); OnPendingCompleted(); }); @@ -616,8 +649,7 @@ void OnPendingCompleted() catch (Exception ex) { cancellationTokenSource.Cancel(); - // TODO Consider what happens if following fails - collection.Add(errorNoticeSelector(ex)); + PostNotice(errorNoticeSelector(ex)); } finally { From af05bd67dab681aed3d34324fe7f801b296fb70a Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 09:08:35 +0200 Subject: [PATCH 15/26] Minor reivew/clean-up of formatting and names --- MoreLinq/Experimental/Await.cs | 46 +++++++++++++++++----------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 123f81387..ee1f8eb20 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -430,10 +430,7 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere Exception lastCriticalError = null; var consumerCancellationTokenSource = new CancellationTokenSource(); - var enumerator = - source.Index() - .GetEnumerator(); - + var enumerator = source.Index().GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings try @@ -462,20 +459,22 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere var nextKey = 0; var holds = ordered ? new List<(int, T, Task)>() : null; - using (var e = notices.GetConsumingEnumerable(consumerCancellationTokenSource.Token).GetEnumerator()) + using (var notice = notices.GetConsumingEnumerable(consumerCancellationTokenSource.Token) + .GetEnumerator()) while (true) { try { - if (!e.MoveNext()) + if (!notice.MoveNext()) break; } - catch (OperationCanceledException ex) when (ex.CancellationToken == consumerCancellationTokenSource.Token) + catch (OperationCanceledException e) when (e.CancellationToken == consumerCancellationTokenSource.Token) { throw new Exception("A critical error has occurred.", lastCriticalError); } - var (kind, result, error) = e.Current; + var (kind, result, error) = notice.Current; + if (kind == Notice.Error) error.Throw(); @@ -555,7 +554,7 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere enum Notice { End, Result, Error } static async Task CollectToAsync( - this IEnumerator e, + this IEnumerator enumerator, Func> taskStarter, BlockingCollection collection, Func, TNotice> completionNoticeSelector, @@ -565,7 +564,7 @@ static async Task CollectToAsync( int? maxConcurrency, CancellationTokenSource cancellationTokenSource) { - if (e == null) throw new ArgumentNullException(nameof(e)); + if (enumerator == null) throw new ArgumentNullException(nameof(enumerator)); if (taskStarter == null) throw new ArgumentNullException(nameof(taskStarter)); if (completionNoticeSelector == null) throw new ArgumentNullException(nameof(completionNoticeSelector)); if (errorNoticeSelector == null) throw new ArgumentNullException(nameof(errorNoticeSelector)); @@ -577,19 +576,15 @@ void PostNotice(TNotice notice) { collection.Add(notice); } - catch (Exception ex) + catch (Exception e) { - onCriticalError(ex); + onCriticalError(e); throw; } } try { - var cancellationToken = cancellationTokenSource.Token; - var semaphore = maxConcurrency is int count - ? new SemaphoreSlim(count, count) - : null; var pendingCount = 1; // terminator void OnPendingCompleted() @@ -598,7 +593,13 @@ void OnPendingCompleted() PostNotice(endNotice); } - while (e.MoveNext()) + var cancellationToken = cancellationTokenSource.Token; + + var semaphore = maxConcurrency is int count + ? new SemaphoreSlim(count, count) + : null; + + while (enumerator.MoveNext()) { if (semaphore != null) { @@ -606,7 +607,7 @@ void OnPendingCompleted() { await semaphore.WaitAsync(cancellationToken); } - catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken) + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) { return; } @@ -617,7 +618,7 @@ void OnPendingCompleted() Interlocked.Increment(ref pendingCount); - var item = e.Current; + var item = enumerator.Current; var task = taskStarter(item); // Add a continutation that notifies completion of the task, @@ -637,7 +638,6 @@ void OnPendingCompleted() return; PostNotice(completionNoticeSelector(item, t)); - OnPendingCompleted(); }); @@ -646,14 +646,14 @@ void OnPendingCompleted() OnPendingCompleted(); } - catch (Exception ex) + catch (Exception e) { cancellationTokenSource.Cancel(); - PostNotice(errorNoticeSelector(ex)); + PostNotice(errorNoticeSelector(e)); } finally { - e.Dispose(); + enumerator.Dispose(); } } From 5f6a23b3578f35b09de7a13f58b9ba3372a5f910 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 09:57:16 +0200 Subject: [PATCH 16/26] Move notification & error handling in main iterator --- MoreLinq/Experimental/Await.cs | 116 ++++++++++++++++----------------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index ee1f8eb20..76747356a 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -422,36 +422,59 @@ public static IAwaitQuery AwaitCompletion( IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordered) { - var notices = new BlockingCollection<(Notice, (int, T, Task), ExceptionDispatchInfo)>(); var cancellationTokenSource = new CancellationTokenSource(); - var cancellationToken = cancellationTokenSource.Token; - var completed = false; - - Exception lastCriticalError = null; var consumerCancellationTokenSource = new CancellationTokenSource(); + Exception lastCriticalError = null; + + var notices = new BlockingCollection<(Notice, (int, T, Task), ExceptionDispatchInfo)>(); + + void PostNotice(Notice notice, + (int, T, Task) item, + ExceptionDispatchInfo error) + { + try + { + if (notice == Notice.Error) + cancellationTokenSource.Cancel(); + notices.Add((notice, item, error)); + } + catch (Exception e) + { + // Don't use ExceptionDispatchInfo.Capture to avoid + // inducing allocations if already under low memory + // conditions. + + lastCriticalError = e; + consumerCancellationTokenSource.Cancel(); + throw; + } + } + + var completed = false; var enumerator = source.Index().GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings try { + var cancellationToken = cancellationTokenSource.Token; + Task.Factory.StartNew( - () => - enumerator.CollectToAsync( - e => evaluator(e.Value, cancellationToken), - notices, - (e, r) => (Notice.Result, (e.Key, e.Value, r), default), - ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), - ex => - { - lastCriticalError = ex; - // Don't use ExceptionDispatchInfo.Capture - // to avoid inducing allocations if already - // under low memory conditions. - consumerCancellationTokenSource.Cancel(); - }, - (Notice.End, default, default), - maxConcurrency, cancellationTokenSource), + async () => + { + try + { + await enumerator.StartAsync( + e => evaluator(e.Value, cancellationToken), + (e, r) => PostNotice(Notice.Result, (e.Key, e.Value, r), default), + () => PostNotice(Notice.End, default, default), + maxConcurrency, cancellationToken); + } + catch (Exception e) + { + PostNotice(Notice.Error, default, ExceptionDispatchInfo.Capture(e)); + } + }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler); @@ -553,48 +576,30 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere enum Notice { End, Result, Error } - static async Task CollectToAsync( + static async Task StartAsync( this IEnumerator enumerator, - Func> taskStarter, - BlockingCollection collection, - Func, TNotice> completionNoticeSelector, - Func errorNoticeSelector, - Action onCriticalError, - TNotice endNotice, + Func> starter, + Action> onCompletion, + Action onEnd, int? maxConcurrency, - CancellationTokenSource cancellationTokenSource) + CancellationToken cancellationToken) { if (enumerator == null) throw new ArgumentNullException(nameof(enumerator)); - if (taskStarter == null) throw new ArgumentNullException(nameof(taskStarter)); - if (completionNoticeSelector == null) throw new ArgumentNullException(nameof(completionNoticeSelector)); - if (errorNoticeSelector == null) throw new ArgumentNullException(nameof(errorNoticeSelector)); + if (starter == null) throw new ArgumentNullException(nameof(starter)); + if (onCompletion == null) throw new ArgumentNullException(nameof(onCompletion)); + if (onEnd == null) throw new ArgumentNullException(nameof(onEnd)); if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); - void PostNotice(TNotice notice) - { - try - { - collection.Add(notice); - } - catch (Exception e) - { - onCriticalError(e); - throw; - } - } - - try + using (enumerator) { var pendingCount = 1; // terminator void OnPendingCompleted() { if (Interlocked.Decrement(ref pendingCount) == 0) - PostNotice(endNotice); + onEnd(); } - var cancellationToken = cancellationTokenSource.Token; - var semaphore = maxConcurrency is int count ? new SemaphoreSlim(count, count) : null; @@ -619,7 +624,7 @@ void OnPendingCompleted() Interlocked.Increment(ref pendingCount); var item = enumerator.Current; - var task = taskStarter(item); + var task = starter(item); // Add a continutation that notifies completion of the task, // along with the necessary housekeeping, in case it @@ -637,7 +642,7 @@ void OnPendingCompleted() if (cancellationToken.IsCancellationRequested) return; - PostNotice(completionNoticeSelector(item, t)); + onCompletion(item, t); OnPendingCompleted(); }); @@ -646,15 +651,6 @@ void OnPendingCompleted() OnPendingCompleted(); } - catch (Exception e) - { - cancellationTokenSource.Cancel(); - PostNotice(errorNoticeSelector(e)); - } - finally - { - enumerator.Dispose(); - } } static class AwaitQuery From 5c759e6669154a4e0199709d2b1e790da1dc4f1d Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 10:43:00 +0200 Subject: [PATCH 17/26] Remove reundant cancellation for simplicity --- MoreLinq/Experimental/Await.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 76747356a..e664a77d1 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -422,7 +422,6 @@ public static IAwaitQuery AwaitCompletion( IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordered) { - var cancellationTokenSource = new CancellationTokenSource(); var consumerCancellationTokenSource = new CancellationTokenSource(); Exception lastCriticalError = null; @@ -434,8 +433,6 @@ void PostNotice(Notice notice, { try { - if (notice == Notice.Error) - cancellationTokenSource.Cancel(); notices.Add((notice, item, error)); } catch (Exception e) @@ -451,6 +448,7 @@ void PostNotice(Notice notice, } var completed = false; + var cancellationTokenSource = new CancellationTokenSource(); var enumerator = source.Index().GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings From 69d5341de3c47dceff63423db1d56ab610104c23 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 10:48:19 +0200 Subject: [PATCH 18/26] Move EDI capture into critical section --- MoreLinq/Experimental/Await.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index e664a77d1..7205a42d3 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -429,11 +429,14 @@ IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordere void PostNotice(Notice notice, (int, T, Task) item, - ExceptionDispatchInfo error) + Exception error) { try { - notices.Add((notice, item, error)); + var edi = error != null + ? ExceptionDispatchInfo.Capture(error) + : null; + notices.Add((notice, item, edi)); } catch (Exception e) { @@ -470,7 +473,7 @@ await enumerator.StartAsync( } catch (Exception e) { - PostNotice(Notice.Error, default, ExceptionDispatchInfo.Capture(e)); + PostNotice(Notice.Error, default, e); } }, CancellationToken.None, From 8dbdcbebdf290b3bfd1a76ae911a7b3bcc493125 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 11:27:30 +0200 Subject: [PATCH 19/26] Capture original error with error notification failure --- MoreLinq/Experimental/Await.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 7205a42d3..765e1a1ee 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -423,7 +423,7 @@ public static IAwaitQuery AwaitCompletion( IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordered) { var consumerCancellationTokenSource = new CancellationTokenSource(); - Exception lastCriticalError = null; + (Exception, Exception) lastCriticalErrors = default; var notices = new BlockingCollection<(Notice, (int, T, Task), ExceptionDispatchInfo)>(); @@ -444,7 +444,7 @@ void PostNotice(Notice notice, // inducing allocations if already under low memory // conditions. - lastCriticalError = e; + lastCriticalErrors = (e, error); consumerCancellationTokenSource.Cancel(); throw; } @@ -494,7 +494,10 @@ await enumerator.StartAsync( } catch (OperationCanceledException e) when (e.CancellationToken == consumerCancellationTokenSource.Token) { - throw new Exception("A critical error has occurred.", lastCriticalError); + var (error1, error2) = lastCriticalErrors; + throw new Exception("One or more critical errors have occurred.", + error2 != null ? new AggregateException(error1, error2) + : new AggregateException(error1)); } var (kind, result, error) = notice.Current; From 57212f24336486d64293890937d6b8d0248c9867 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 14:34:39 +0200 Subject: [PATCH 20/26] Lots of comments --- MoreLinq/Experimental/Await.cs | 38 +++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 765e1a1ee..94c1d10d9 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -422,15 +422,39 @@ public static IAwaitQuery AwaitCompletion( IEnumerable _(int? maxConcurrency, TaskScheduler scheduler, bool ordered) { - var consumerCancellationTokenSource = new CancellationTokenSource(); - (Exception, Exception) lastCriticalErrors = default; + // A separate task will enumerate the source and launch tasks. + // It will post all progress as notices to the collection below. + // A notice is essentially a discriminated union like: + // + // type Notice<'a, 'b> = + // | End + // | Result of (int * 'a * Task<'b>) + // | Error of ExceptionDispatchInfo + // + // Note that BlockingCollection.CompleteAdding is never used to + // to mark the end (which its own notice above) because + // BlockingCollection.Add throws if called after CompleteAdding + // and we want to deliberately tolerate the race condition. var notices = new BlockingCollection<(Notice, (int, T, Task), ExceptionDispatchInfo)>(); + var consumerCancellationTokenSource = new CancellationTokenSource(); + (Exception, Exception) lastCriticalErrors = default; + void PostNotice(Notice notice, (int, T, Task) item, Exception error) { + // If a notice fails to post then assume critical error + // conditions (like low memory), capture the error without + // further allocation of resources and trip the cancellation + // token source used by the main loop waiting on notices. + // Note that only the "last" critical error is reported + // as maintaining a list would incur allocations. The idea + // here is to make a best effort attempt to report any of + // the error conditions that may be occuring, which is still + // better than nothing. + try { var edi = error != null @@ -440,7 +464,7 @@ void PostNotice(Notice notice, } catch (Exception e) { - // Don't use ExceptionDispatchInfo.Capture to avoid + // Don't use ExceptionDispatchInfo.Capture here to avoid // inducing allocations if already under low memory // conditions. @@ -460,6 +484,11 @@ void PostNotice(Notice notice, { var cancellationToken = cancellationTokenSource.Token; + // Fire-up a parallel loop to iterate through the source and + // launch tasks, posting a result-notice as each task + // completes and another, an end-notice, when all tasks have + // completed. + Task.Factory.StartNew( async () => { @@ -480,6 +509,9 @@ await enumerator.StartAsync( TaskCreationOptions.DenyChildAttach, scheduler); + // Remainde here is the main loop that waits for and + // processes notices. + var nextKey = 0; var holds = ordered ? new List<(int, T, Task)>() : null; From bda7187a372e71b3823046d58bbcaefc96ec95e9 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 22:19:08 +0200 Subject: [PATCH 21/26] Fix typo (s/Remainde/Remainder/) --- MoreLinq/Experimental/Await.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 94c1d10d9..e56d0cd74 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -509,7 +509,7 @@ await enumerator.StartAsync( TaskCreationOptions.DenyChildAttach, scheduler); - // Remainde here is the main loop that waits for and + // Remainder here is the main loop that waits for and // processes notices. var nextKey = 0; From e956b461d198a601d0cc891f4a821bcfc9adb03f Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 29 May 2018 23:10:57 +0200 Subject: [PATCH 22/26] Model a concurrency gate around the semaphore --- MoreLinq/Experimental/Await.cs | 57 ++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index e56d0cd74..0dbffb4f5 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -636,22 +636,20 @@ void OnPendingCompleted() onEnd(); } - var semaphore = maxConcurrency is int count - ? new SemaphoreSlim(count, count) - : null; + var concurrencyGate + = maxConcurrency is int count + ? new ConcurrencyGate(count) + : ConcurrencyGate.Unbounded; while (enumerator.MoveNext()) { - if (semaphore != null) + try { - try - { - await semaphore.WaitAsync(cancellationToken); - } - catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) - { - return; - } + await concurrencyGate.EnterAsync(cancellationToken); + } + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) + { + return; } if (cancellationToken.IsCancellationRequested) @@ -673,7 +671,7 @@ void OnPendingCompleted() scheduler: TaskScheduler.Current, continuationAction: t => { - semaphore?.Release(); + concurrencyGate.Exit(); if (cancellationToken.IsCancellationRequested) return; @@ -732,6 +730,39 @@ static class TupleComparer public static readonly IComparer<(T1, T2, T3)> Item3 = Comparer<(T1, T2, T3)>.Create((x, y) => Comparer.Default.Compare(x.Item3, y.Item3)); } + + sealed class ConcurrencyGate + { + public static readonly ConcurrencyGate Unbounded = new ConcurrencyGate(); + + static readonly Task CompletedTask; + + static ConcurrencyGate() + { + #if NET451 || NETSTANDARD1_0 + + var tcs = new TaskCompletionSource(); + tcs.SetResult(null); + CompletedTask = tcs.Task; + + #else + + CompletedTask = Task.CompletedTask; + + #endif + } + + readonly SemaphoreSlim _semaphore; + + ConcurrencyGate(SemaphoreSlim semaphore = null) => + _semaphore = semaphore; + + public ConcurrencyGate(int max) : + this(new SemaphoreSlim(max, max)) {} + + public Task EnterAsync(CancellationToken token) => _semaphore?.WaitAsync(token) ?? CompletedTask; + public void Exit() => _semaphore?.Release(); + } } } From ec6e365c1d01bde0aa97521be8567b6887a48f22 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 30 May 2018 00:21:34 +0200 Subject: [PATCH 23/26] Re-format conditional expression --- MoreLinq/Experimental/Await.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 0dbffb4f5..167ba45a8 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -636,10 +636,9 @@ void OnPendingCompleted() onEnd(); } - var concurrencyGate - = maxConcurrency is int count - ? new ConcurrencyGate(count) - : ConcurrencyGate.Unbounded; + var concurrencyGate = maxConcurrency is int count + ? new ConcurrencyGate(count) + : ConcurrencyGate.Unbounded; while (enumerator.MoveNext()) { From 467ba7c701596d4e9d6765b2baf2f670b99e2fee Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 30 May 2018 00:24:45 +0200 Subject: [PATCH 24/26] Move CompletedTask singleton into own class This unties its initialization with ConcurrencyGate. --- MoreLinq/Experimental/Await.cs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 167ba45a8..49ced60a3 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -730,26 +730,29 @@ static class TupleComparer Comparer<(T1, T2, T3)>.Create((x, y) => Comparer.Default.Compare(x.Item3, y.Item3)); } - sealed class ConcurrencyGate + static class CompletedTask { - public static readonly ConcurrencyGate Unbounded = new ConcurrencyGate(); + #if NET451 || NETSTANDARD1_0 - static readonly Task CompletedTask; + public static readonly Task Instance; - static ConcurrencyGate() + static CompletedTask() { - #if NET451 || NETSTANDARD1_0 - var tcs = new TaskCompletionSource(); tcs.SetResult(null); - CompletedTask = tcs.Task; + Instance = tcs.Task; + } - #else + #else - CompletedTask = Task.CompletedTask; + public static readonly Task Instance = Task.CompletedTask; - #endif - } + #endif + } + + sealed class ConcurrencyGate + { + public static readonly ConcurrencyGate Unbounded = new ConcurrencyGate(); readonly SemaphoreSlim _semaphore; @@ -759,7 +762,7 @@ static ConcurrencyGate() public ConcurrencyGate(int max) : this(new SemaphoreSlim(max, max)) {} - public Task EnterAsync(CancellationToken token) => _semaphore?.WaitAsync(token) ?? CompletedTask; + public Task EnterAsync(CancellationToken token) => _semaphore?.WaitAsync(token) ?? CompletedTask.Instance; public void Exit() => _semaphore?.Release(); } } From 4a5cfd32905ad43bd82a2c17a279f37cd3fc7e9d Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 30 May 2018 00:25:58 +0200 Subject: [PATCH 25/26] Fix ConcurrencyGate.EnterAsync to handle cancellation --- MoreLinq/Experimental/Await.cs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 49ced60a3..f498e8468 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -651,9 +651,6 @@ void OnPendingCompleted() return; } - if (cancellationToken.IsCancellationRequested) - return; - Interlocked.Increment(ref pendingCount); var item = enumerator.Current; @@ -762,8 +759,19 @@ sealed class ConcurrencyGate public ConcurrencyGate(int max) : this(new SemaphoreSlim(max, max)) {} - public Task EnterAsync(CancellationToken token) => _semaphore?.WaitAsync(token) ?? CompletedTask.Instance; - public void Exit() => _semaphore?.Release(); + public Task EnterAsync(CancellationToken token) + { + if (_semaphore == null) + { + token.ThrowIfCancellationRequested(); + return CompletedTask.Instance; + } + + return _semaphore.WaitAsync(token); + } + + public void Exit() => + _semaphore?.Release(); } } } From 30478c00094271df8290dd9a250da7c35a1de907 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 30 May 2018 07:32:21 +0200 Subject: [PATCH 26/26] Rename task completion parameter for clarity --- MoreLinq/Experimental/Await.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index f498e8468..229fccbcb 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -615,14 +615,14 @@ enum Notice { End, Result, Error } static async Task StartAsync( this IEnumerator enumerator, Func> starter, - Action> onCompletion, + Action> onTaskCompletion, Action onEnd, int? maxConcurrency, CancellationToken cancellationToken) { if (enumerator == null) throw new ArgumentNullException(nameof(enumerator)); if (starter == null) throw new ArgumentNullException(nameof(starter)); - if (onCompletion == null) throw new ArgumentNullException(nameof(onCompletion)); + if (onTaskCompletion == null) throw new ArgumentNullException(nameof(onTaskCompletion)); if (onEnd == null) throw new ArgumentNullException(nameof(onEnd)); if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); @@ -672,7 +672,7 @@ void OnPendingCompleted() if (cancellationToken.IsCancellationRequested) return; - onCompletion(item, t); + onTaskCompletion(item, t); OnPendingCompleted(); });