diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs index 64e06ef6b4..fae29527a9 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs @@ -4,6 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Threading; namespace System.Reactive.Linq.ObservableImpl { @@ -24,6 +25,12 @@ public SkipUntil(IObservable source, IObservable other) internal sealed class _ : IdentitySink { + IDisposable _mainDisposable; + IDisposable _otherDisposable; + volatile bool _forward; + int _halfSerializer; + Exception _error; + public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { @@ -31,95 +38,128 @@ public _(IObserver observer, IDisposable cancel) public IDisposable Run(SkipUntil parent) { - var sourceObserver = new SourceObserver(this); - var otherObserver = new OtherObserver(this, sourceObserver); - - var otherSubscription = parent._other.SubscribeSafe(otherObserver); - var sourceSubscription = parent._source.SubscribeSafe(sourceObserver); - - sourceObserver.Disposable = sourceSubscription; - otherObserver.Disposable = otherSubscription; - - return StableCompositeDisposable.Create( - sourceSubscription, - otherSubscription - ); + Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this))); + + Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this)); + + return this; } - private sealed class SourceObserver : IObserver + protected override void Dispose(bool disposing) { - private readonly _ _parent; - public volatile bool _forward; - private readonly SingleAssignmentDisposable _subscription; - - public SourceObserver(_ parent) + if (disposing) { - _parent = parent; - _subscription = new SingleAssignmentDisposable(); + DisposeMain(); + if (!Disposable.GetIsDisposed(ref _otherDisposable)) + { + Disposable.TryDispose(ref _otherDisposable); + } } - public IDisposable Disposable + base.Dispose(disposing); + } + + void DisposeMain() + { + if (!Disposable.GetIsDisposed(ref _mainDisposable)) { - set { _subscription.Disposable = value; } + Disposable.TryDispose(ref _mainDisposable); } + } - public void OnNext(TSource value) + public override void OnNext(TSource value) + { + if (_forward) { - if (_forward) - _parent.ForwardOnNext(value); + if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0) + { + ForwardOnNext(value); + if (Interlocked.Decrement(ref _halfSerializer) != 0) + { + var ex = _error; + _error = SkipUntilTerminalException.Instance; + ForwardOnError(ex); + } + } } + } - public void OnError(Exception error) + public override void OnError(Exception ex) + { + if (Interlocked.CompareExchange(ref _error, ex, null) == null) { - _parent.ForwardOnError(error); + if (Interlocked.Increment(ref _halfSerializer) == 1) + { + _error = SkipUntilTerminalException.Instance; + ForwardOnError(ex); + } } + } - public void OnCompleted() + public override void OnCompleted() + { + if (_forward) { - if (_forward) - _parent.ForwardOnCompleted(); - - _subscription.Dispose(); // We can't cancel the other stream yet, it may be on its way to dispatch an OnError message and we don't want to have a race. + if (Interlocked.CompareExchange(ref _error, SkipUntilTerminalException.Instance, null) == null) + { + if (Interlocked.Increment(ref _halfSerializer) == 1) + { + ForwardOnCompleted(); + } + } + } + else + { + DisposeMain(); } } - private sealed class OtherObserver : IObserver + void OtherComplete() + { + _forward = true; + } + + sealed class OtherObserver : IObserver, IDisposable { - private readonly _ _parent; - private readonly SourceObserver _sourceObserver; - private readonly SingleAssignmentDisposable _subscription; + readonly _ _parent; - public OtherObserver(_ parent, SourceObserver sourceObserver) + public OtherObserver(_ parent) { _parent = parent; - _sourceObserver = sourceObserver; - _subscription = new SingleAssignmentDisposable(); } - public IDisposable Disposable + public void Dispose() { - set { _subscription.Disposable = value; } + if (!Disposable.GetIsDisposed(ref _parent._otherDisposable)) + { + Disposable.TryDispose(ref _parent._otherDisposable); + } } - public void OnNext(TOther value) + public void OnCompleted() { - _sourceObserver._forward = true; - _subscription.Dispose(); + Dispose(); } public void OnError(Exception error) { - _parent.ForwardOnError(error); + _parent.OnError(error); } - public void OnCompleted() + public void OnNext(TOther value) { - _subscription.Dispose(); + _parent.OtherComplete(); + Dispose(); } } } } + internal static class SkipUntilTerminalException + { + internal static readonly Exception Instance = new Exception("No further exceptions"); + } + internal sealed class SkipUntil : Producer._> { private readonly IObservable _source;