diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs index fdf4b55145..e3c08526f6 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs @@ -36,7 +36,7 @@ public _(IObserver observer, IDisposable cancel) internal abstract class S : _ { protected readonly object _gate = new object(); - protected readonly SerialDisposable _cancelable = new SerialDisposable(); + protected IDisposable _cancelable; protected readonly IScheduler _scheduler; @@ -74,22 +74,31 @@ public override IDisposable Run(TParent parent) RunCore(parent); - var sourceSubscription = new SingleAssignmentDisposable(); - _sourceSubscription = sourceSubscription; - sourceSubscription.Disposable = parent._source.SubscribeSafe(this); + Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this)); - return StableCompositeDisposable.Create(_sourceSubscription, _cancelable); + return this; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Disposable.TryDispose(ref _sourceSubscription); + Disposable.TryDispose(ref _cancelable); + } + base.Dispose(disposing); } protected abstract void RunCore(TParent parent); public override void OnNext(TSource value) { - var next = _watch.Elapsed.Add(_delay); var shouldRun = false; lock (_gate) { + var next = _watch.Elapsed.Add(_delay); + _queue.Enqueue(new System.Reactive.TimeInterval(value, next)); shouldRun = _ready && !_active; @@ -98,13 +107,13 @@ public override void OnNext(TSource value) if (shouldRun) { - _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue); + Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(_delay, DrainQueue)); } } public override void OnError(Exception error) { - _sourceSubscription.Dispose(); + Disposable.TryDispose(ref _sourceSubscription); var shouldRun = false; @@ -126,13 +135,14 @@ public override void OnError(Exception error) public override void OnCompleted() { - _sourceSubscription.Dispose(); + Disposable.TryDispose(ref _sourceSubscription); - var next = _watch.Elapsed.Add(_delay); var shouldRun = false; lock (_gate) { + var next = _watch.Elapsed.Add(_delay); + _completeAt = next; _hasCompleted = true; @@ -142,7 +152,7 @@ public override void OnCompleted() if (shouldRun) { - _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue); + Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(_delay, DrainQueue)); } } @@ -256,7 +266,7 @@ protected void DrainQueue(Action recurse) protected abstract class L : _ { protected readonly object _gate = new object(); - protected readonly SerialDisposable _cancelable = new SerialDisposable(); + protected IDisposable _cancelable; private readonly SemaphoreSlim _evt = new SemaphoreSlim(0); private readonly IScheduler _scheduler; @@ -291,11 +301,19 @@ public override IDisposable Run(TParent parent) RunCore(parent); - var sourceSubscription = new SingleAssignmentDisposable(); - _sourceSubscription = sourceSubscription; - sourceSubscription.Disposable = parent._source.SubscribeSafe(this); + Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this)); - return StableCompositeDisposable.Create(_sourceSubscription, _cancelable); + return this; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Disposable.TryDispose(ref _sourceSubscription); + Disposable.TryDispose(ref _cancelable); + } + base.Dispose(disposing); } protected abstract void RunCore(TParent parent); @@ -303,17 +321,18 @@ public override IDisposable Run(TParent parent) protected void ScheduleDrain() { _stop = new CancellationTokenSource(); - _cancelable.Disposable = Disposable.Create(_stop.Cancel); + Disposable.TrySetSerial(ref _cancelable, Disposable.Create(_stop.Cancel)); _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue); } public override void OnNext(TSource value) { - var next = _watch.Elapsed.Add(_delay); lock (_gate) { + var next = _watch.Elapsed.Add(_delay); + _queue.Enqueue(new System.Reactive.TimeInterval(value, next)); _evt.Release(); @@ -322,7 +341,7 @@ public override void OnNext(TSource value) public override void OnError(Exception error) { - _sourceSubscription.Dispose(); + Disposable.TryDispose(ref _sourceSubscription); lock (_gate) { @@ -337,12 +356,13 @@ public override void OnError(Exception error) public override void OnCompleted() { - _sourceSubscription.Dispose(); + Disposable.TryDispose(ref _sourceSubscription); - var next = _watch.Elapsed.Add(_delay); lock (_gate) { + var next = _watch.Elapsed.Add(_delay); + _completeAt = next; _hasCompleted = true; @@ -473,7 +493,7 @@ protected override void RunCore(Absolute parent) { _ready = false; - _cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start); + Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start)); } private void Start() @@ -507,7 +527,7 @@ private void Start() if (shouldRun) { - _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue); + Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(next, DrainQueue)); } } } @@ -521,7 +541,10 @@ public L(Absolute parent, IObserver observer, IDisposable cancel) protected override void RunCore(Absolute parent) { - _cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start); + // ScheduleDrain might have already set a newer disposable + // using TrySetSerial would cancel it, stopping the emission + // and hang the consumer + Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start)); } private void Start()