Skip to content

Termination race condition in Merge/SelectMany #660

@akarnokd

Description

@akarnokd

The following test fails as the completion signal is not received. completed remains 0 even though all 3 participating observable is completed:

[Fact]
public void Merge_MainNext_InnerComplete_Race()
{
    for (var i = 0; i < 100000; i++)
    {
        var main = new Subject<IObservable<int>>();

        var subj1 = new Subject<int>();
        var subj2 = new Subject<int>();

        var completed = 0;

        // main.SelectMany(v => v).Subscribe(v => { }, () => completed++);
        Observable.Merge(main).Subscribe(v => { }, () => completed++);

        main.OnNext(subj1);

        var sync = 3;

        var cdl = new CountdownEvent(2);

        Task.Factory.StartNew(() =>
        {
            if (Interlocked.Decrement(ref sync) != 0)
            {
                while (Volatile.Read(ref sync) != 0)
                    ;
            }
            main.OnNext(subj2);
            main.OnCompleted();

            cdl.Signal();
        });

        Task.Factory.StartNew(() =>
        {
            if (Interlocked.Decrement(ref sync) != 0)
            {
                while (Volatile.Read(ref sync) != 0)
                    ;
            }
            subj2.OnCompleted();

            cdl.Signal();
        });

        if (Interlocked.Decrement(ref sync) != 0)
        {
            while (Volatile.Read(ref sync) != 0)
                ;
            subj1.OnCompleted();
        }

        Assert.True(cdl.Wait(5000), "Timeout!");

        Assert.Equal(1, completed);
    }
}

The same happens with SelectMany as they use the same logic as Merge but in a separate implementation.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions