diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs index 5a8d1cda9c..c2acf4b925 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs @@ -24,7 +24,7 @@ public ObservablesMaxConcurrency(IObservable> sources, int protected override _ CreateSink(IObserver observer) => new _(_maxConcurrent, observer); - protected override void Run(_ sink) => sink.Run(this); + protected override void Run(_ sink) => sink.Run(_sources); internal sealed class _ : Sink, TSource> { @@ -36,28 +36,12 @@ public _(int maxConcurrent, IObserver observer) _maxConcurrent = maxConcurrent; } - private object _gate; - private Queue> _q; - private bool _isStopped; - private SingleAssignmentDisposable _sourceSubscription; - private CompositeDisposable _group; + private object _gate = new object(); + private Queue> _q = new Queue>(); + private volatile bool _isStopped; + private CompositeDisposable _group = new CompositeDisposable(); private int _activeCount = 0; - public void Run(ObservablesMaxConcurrency parent) - { - _gate = new object(); - _q = new Queue>(); - _isStopped = false; - _activeCount = 0; - - _group = new CompositeDisposable(); - _sourceSubscription = new SingleAssignmentDisposable(); - _sourceSubscription.Disposable = parent._sources.SubscribeSafe(this); - _group.Add(_sourceSubscription); - - SetUpstream(_group); - } - public override void OnNext(IObservable value) { lock (_gate) @@ -91,36 +75,42 @@ public override void OnCompleted() } else { - _sourceSubscription.Dispose(); + DisposeUpstream(); } } } + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + _group.Dispose(); + } + private void Subscribe(IObservable innerSource) { - var subscription = new SingleAssignmentDisposable(); - _group.Add(subscription); - subscription.Disposable = innerSource.SubscribeSafe(new InnerObserver(this, subscription)); + var innerObserver = new InnerObserver(this); + _group.Add(innerObserver); + innerObserver.SetResource(innerSource.SubscribeSafe(innerObserver)); } - private sealed class InnerObserver : IObserver + private sealed class InnerObserver : SafeObserver { private readonly _ _parent; - private readonly IDisposable _self; - public InnerObserver(_ parent, IDisposable self) + public InnerObserver(_ parent) { _parent = parent; - _self = self; } - public void OnNext(TSource value) + public override void OnNext(TSource value) { lock (_parent._gate) _parent.ForwardOnNext(value); } - public void OnError(Exception error) + public override void OnError(Exception error) { lock (_parent._gate) { @@ -128,9 +118,9 @@ public void OnError(Exception error) } } - public void OnCompleted() + public override void OnCompleted() { - _parent._group.Remove(_self); + _parent._group.Remove(this); lock (_parent._gate) { if (_parent._q.Count > 0) @@ -163,7 +153,7 @@ public Observables(IObservable> sources) protected override _ CreateSink(IObserver observer) => new _(observer); - protected override void Run(_ sink) => sink.Run(this); + protected override void Run(_ sink) => sink.Run(_sources); internal sealed class _ : Sink, TSource> { @@ -172,29 +162,15 @@ public _(IObserver observer) { } - private object _gate; - private bool _isStopped; - private CompositeDisposable _group; - private SingleAssignmentDisposable _sourceSubscription; - - public void Run(Observables parent) - { - _gate = new object(); - _isStopped = false; - _group = new CompositeDisposable(); - - _sourceSubscription = new SingleAssignmentDisposable(); - _group.Add(_sourceSubscription); - _sourceSubscription.Disposable = parent._sources.SubscribeSafe(this); - - SetUpstream(_group); - } + private object _gate = new object(); + private volatile bool _isStopped; + private CompositeDisposable _group = new CompositeDisposable(); public override void OnNext(IObservable value) { - var innerSubscription = new SingleAssignmentDisposable(); - _group.Add(innerSubscription); - innerSubscription.Disposable = value.SubscribeSafe(new InnerObserver(this, innerSubscription)); + var innerObserver = new InnerObserver(this); + _group.Add(innerObserver); + innerObserver.SetResource(value.SubscribeSafe(innerObserver)); } public override void OnError(Exception error) @@ -208,7 +184,7 @@ public override void OnError(Exception error) public override void OnCompleted() { _isStopped = true; - if (_group.Count == 1) + if (_group.Count == 0) { // // Notice there can be a race between OnCompleted of the source and any @@ -224,28 +200,34 @@ public override void OnCompleted() } else { - _sourceSubscription.Dispose(); + DisposeUpstream(); } } - private sealed class InnerObserver : IObserver + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + _group.Dispose(); + } + + private sealed class InnerObserver : SafeObserver { private readonly _ _parent; - private readonly IDisposable _self; - public InnerObserver(_ parent, IDisposable self) + public InnerObserver(_ parent) { _parent = parent; - _self = self; } - public void OnNext(TSource value) + public override void OnNext(TSource value) { lock (_parent._gate) _parent.ForwardOnNext(value); } - public void OnError(Exception error) + public override void OnError(Exception error) { lock (_parent._gate) { @@ -253,10 +235,10 @@ public void OnError(Exception error) } } - public void OnCompleted() + public override void OnCompleted() { - _parent._group.Remove(_self); - if (_parent._isStopped && _parent._group.Count == 1) + _parent._group.Remove(this); + if (_parent._isStopped && _parent._group.Count == 0) { // // Notice there can be a race between OnCompleted of the source and any @@ -286,7 +268,7 @@ public Tasks(IObservable> sources) protected override _ CreateSink(IObserver observer) => new _(observer); - protected override void Run(_ sink) => sink.Run(this); + protected override void Run(_ sink) => sink.Run(_sources); internal sealed class _ : Sink, TSource> { @@ -295,16 +277,8 @@ public _(IObserver observer) { } - private object _gate; - private volatile int _count; - - public void Run(Tasks parent) - { - _gate = new object(); - _count = 1; - - SetUpstream(parent._sources.SubscribeSafe(this)); - } + private object _gate = new object(); + private volatile int _count = 1; public override void OnNext(Task value) { @@ -315,7 +289,7 @@ public override void OnNext(Task value) } else { - value.ContinueWith(OnCompletedTask); + value.ContinueWith((t, @thisObject) => ((_)@thisObject).OnCompletedTask(t), this); } }