Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 51 additions & 77 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public ObservablesMaxConcurrency(IObservable<IObservable<TSource>> sources, int

protected override _ CreateSink(IObserver<TSource> observer) => new _(_maxConcurrent, observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_sources);

internal sealed class _ : Sink<IObservable<TSource>, TSource>
{
Expand All @@ -36,28 +36,12 @@ public _(int maxConcurrent, IObserver<TSource> observer)
_maxConcurrent = maxConcurrent;
}

private object _gate;
private Queue<IObservable<TSource>> _q;
private bool _isStopped;
private SingleAssignmentDisposable _sourceSubscription;
private CompositeDisposable _group;
private object _gate = new object();
private Queue<IObservable<TSource>> _q = new Queue<IObservable<TSource>>();
private volatile bool _isStopped;
private CompositeDisposable _group = new CompositeDisposable();
private int _activeCount = 0;

public void Run(ObservablesMaxConcurrency parent)
{
_gate = new object();
_q = new Queue<IObservable<TSource>>();
_isStopped = false;
_activeCount = 0;

_group = new CompositeDisposable();
_sourceSubscription = new SingleAssignmentDisposable();
_sourceSubscription.Disposable = parent._sources.SubscribeSafe(this);
_group.Add(_sourceSubscription);

SetUpstream(_group);
}

public override void OnNext(IObservable<TSource> value)
{
lock (_gate)
Expand Down Expand Up @@ -91,46 +75,52 @@ public override void OnCompleted()
}
else
{
_sourceSubscription.Dispose();
DisposeUpstream();
}
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
_group.Dispose();
}

private void Subscribe(IObservable<TSource> innerSource)
{
var subscription = new SingleAssignmentDisposable();
_group.Add(subscription);
subscription.Disposable = innerSource.SubscribeSafe(new InnerObserver(this, subscription));
var innerObserver = new InnerObserver(this);
_group.Add(innerObserver);
innerObserver.SetResource(innerSource.SubscribeSafe(innerObserver));
}

private sealed class InnerObserver : IObserver<TSource>
private sealed class InnerObserver : SafeObserver<TSource>
{
private readonly _ _parent;
private readonly IDisposable _self;

public InnerObserver(_ parent, IDisposable self)
public InnerObserver(_ parent)
{
_parent = parent;
_self = self;
}

public void OnNext(TSource value)
public override void OnNext(TSource value)
{
lock (_parent._gate)
_parent.ForwardOnNext(value);
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
lock (_parent._gate)
{
_parent.ForwardOnError(error);
}
}

public void OnCompleted()
public override void OnCompleted()
{
_parent._group.Remove(_self);
_parent._group.Remove(this);
lock (_parent._gate)
{
if (_parent._q.Count > 0)
Expand Down Expand Up @@ -163,7 +153,7 @@ public Observables(IObservable<IObservable<TSource>> sources)

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_sources);

internal sealed class _ : Sink<IObservable<TSource>, TSource>
{
Expand All @@ -172,29 +162,15 @@ public _(IObserver<TSource> observer)
{
}

private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;

public void Run(Observables parent)
{
_gate = new object();
_isStopped = false;
_group = new CompositeDisposable();

_sourceSubscription = new SingleAssignmentDisposable();
_group.Add(_sourceSubscription);
_sourceSubscription.Disposable = parent._sources.SubscribeSafe(this);

SetUpstream(_group);
}
private object _gate = new object();
private volatile bool _isStopped;
private CompositeDisposable _group = new CompositeDisposable();

public override void OnNext(IObservable<TSource> value)
{
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
innerSubscription.Disposable = value.SubscribeSafe(new InnerObserver(this, innerSubscription));
var innerObserver = new InnerObserver(this);
_group.Add(innerObserver);
innerObserver.SetResource(value.SubscribeSafe(innerObserver));
}

public override void OnError(Exception error)
Expand All @@ -208,7 +184,7 @@ public override void OnError(Exception error)
public override void OnCompleted()
{
_isStopped = true;
if (_group.Count == 1)
if (_group.Count == 0)
{
//
// Notice there can be a race between OnCompleted of the source and any
Expand All @@ -224,39 +200,45 @@ public override void OnCompleted()
}
else
{
_sourceSubscription.Dispose();
DisposeUpstream();
}
}

private sealed class InnerObserver : IObserver<TSource>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
_group.Dispose();
}

private sealed class InnerObserver : SafeObserver<TSource>
{
private readonly _ _parent;
private readonly IDisposable _self;

public InnerObserver(_ parent, IDisposable self)
public InnerObserver(_ parent)
{
_parent = parent;
_self = self;
}

public void OnNext(TSource value)
public override void OnNext(TSource value)
{
lock (_parent._gate)
_parent.ForwardOnNext(value);
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
lock (_parent._gate)
{
_parent.ForwardOnError(error);
}
}

public void OnCompleted()
public override void OnCompleted()
{
_parent._group.Remove(_self);
if (_parent._isStopped && _parent._group.Count == 1)
_parent._group.Remove(this);
if (_parent._isStopped && _parent._group.Count == 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same possible problem as with SelectMany: _isStopped is plain and can be reordered after _group.Count.

Copy link
Collaborator Author

@danielcweber danielcweber Jun 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any current open issues that could be explained by this going wrong sometimes? Same for SelectMany.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to post an issue which demonstrates this race condition?

Copy link
Collaborator Author

@danielcweber danielcweber Jun 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we're fixing it, no I wouldn't suggest that. Let's ask @onovotny about whether we should put out another preview to have something to point people to, should someone encounter the issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not trigger this specific reordering on my machine but I found another issue via #660.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's already on the fix branch ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main branch. This PR did not change the algorithm so the test should fail as well. You could try it in this PR as the test is standalone.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'll merge this, then make the failing test (and subsequent fix) a (failing) new WIP-PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried with your branch. #660 still fails.

{
//
// Notice there can be a race between OnCompleted of the source and any
Expand Down Expand Up @@ -286,7 +268,7 @@ public Tasks(IObservable<Task<TSource>> sources)

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_sources);

internal sealed class _ : Sink<Task<TSource>, TSource>
{
Expand All @@ -295,16 +277,8 @@ public _(IObserver<TSource> observer)
{
}

private object _gate;
private volatile int _count;

public void Run(Tasks parent)
{
_gate = new object();
_count = 1;

SetUpstream(parent._sources.SubscribeSafe(this));
}
private object _gate = new object();
private volatile int _count = 1;

public override void OnNext(Task<TSource> value)
{
Expand All @@ -315,7 +289,7 @@ public override void OnNext(Task<TSource> value)
}
else
{
value.ContinueWith(OnCompletedTask);
value.ContinueWith((t, @thisObject) => ((_)@thisObject).OnCompletedTask(t), this);
}
}

Expand Down