CancellationToken in ToAsyncEnumerable can cause NullReferenceException #1726

@slavashar

Description

Bug

It looks like there is a race condition in ToAsyncEnumerable that can cause a NullReferenceException when cancellation is requested on the cancellation token.

Which subcomponent library (Ix, Async.Ix)?

Ix

Which library version?

System.Linq.Async 6.0.1

What are the platform(s), environment(s) and related component version(s)?

Windows, .NET 6

What is the use case or problem?

Cancelling enumeration of an observable that was converted with ToAsyncEnumerable (ObservableAsyncEnumerable).

What is the expected outcome?

An OperationCanceledException is thrown.

What is the actual outcome?

A NullReferenceException is thrown intermittently instead.

What is the stacktrace of the exception(s) if any?

 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Linq.AsyncEnumerable.ObservableAsyncEnumerable`1.MoveNextCore() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.Observable.cs:line 88
   at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 70
   at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 75
   at Program.<<Main>$>g__ConsumeAsync|0_2(Subject`1 subject, CancellationToken ct) in C:\Users\shara\source\repos\ToAsyncEnumerableTest\Program.cs:line 30

Do you have a code snippet or project that reproduces the problem?

using System.Reactive.Subjects;

Parallel.For(1, 10, _ => RunAsync().Wait());

async Task RunAsync()
{
    // Set a custom context to avoid synchronous execution of the enumerable's continuation
    SynchronizationContext.SetSynchronizationContext(new CustomSynchronizationContext());

    using var subject = new Subject<int>();
    using var cts = new CancellationTokenSource();

    var task = ConsumeAsync(subject, cts.Token);

    subject.OnNext(1);
    cts.Cancel();

    try
    {
        await task;
    }
    catch (OperationCanceledException)
    {
        // ignore
    }
}

async Task ConsumeAsync(Subject<int> subject, CancellationToken ct)
{
    await foreach (var _ in subject.ToAsyncEnumerable().WithCancellation(ct))
    {
    }
}

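// Empty subclass: await only captures a context whose type differs from the base
// SynchronizationContext; the inherited Post queues continuations to the thread pool.
internal class CustomSynchronizationContext : SynchronizationContext
{
}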
