From 8a05ef27042ad98d9edc11bd314dd3ae9fc19768 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Jul 2018 14:29:36 +0200 Subject: [PATCH] 4.x: Upgrade the ObserveOn operator to IdentitySink, fix queue usage --- .../Internal/ScheduledObserver.cs | 76 ++++++++++--------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs b/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs index dcd99d02dd..cb0b5554dd 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs @@ -381,9 +381,8 @@ internal interface IScheduledObserver : IObserver, IDisposable /// techniques to signal events to the downstream. /// /// The element type of the sequence. - internal sealed class ObserveOnObserverNew : IObserver, IDisposable + internal sealed class ObserveOnObserverNew : IdentitySink { - private readonly IObserver _downstream; private readonly IScheduler _scheduler; /// @@ -392,7 +391,6 @@ internal sealed class ObserveOnObserverNew : IObserver, IDisposable /// private readonly ISchedulerLongRunning _longRunning; private readonly ConcurrentQueue _queue; - private IDisposable _run; /// /// The current task representing a running drain operation. @@ -421,54 +419,55 @@ internal sealed class ObserveOnObserverNew : IObserver, IDisposable /// private bool _disposed; - public ObserveOnObserverNew(IScheduler scheduler, IObserver downstream) + public ObserveOnObserverNew(IScheduler scheduler, IObserver downstream) : base(downstream) { - _downstream = downstream; _scheduler = scheduler; _longRunning = scheduler.AsLongRunning(); _queue = new ConcurrentQueue(); } - public void Run(IObservable source) - { - Disposable.SetSingle(ref _run, source.SubscribeSafe(this)); - } - - public void Dispose() + protected override void Dispose(bool disposing) { Volatile.Write(ref _disposed, true); - Disposable.TryDispose(ref _task); - Disposable.TryDispose(ref _run); - Clear(); + + base.Dispose(disposing); + if (disposing) + { + Disposable.TryDispose(ref _task); + Clear(_queue); + } } /// /// Remove remaining elements from the queue upon /// cancellation or failure. /// - private void Clear() + /// The queue to use. The argument ensures that the + /// _queue field is not re-read from memory unnecessarily + /// due to the memory barriers inside TryDequeue mandating it + /// despite the field is read-only. + private void Clear(ConcurrentQueue q) { - var q = _queue; while (q.TryDequeue(out var _)) { ; } } - public void OnCompleted() + public override void OnCompleted() { Volatile.Write(ref _done, true); Schedule(); } - public void OnError(Exception error) + public override void OnError(Exception error) { _error = error; Volatile.Write(ref _done, true); Schedule(); } - public void OnNext(T value) + public override void OnNext(T value) { _queue.Enqueue(value); Schedule(); @@ -489,11 +488,11 @@ private void Schedule() var longRunning = _longRunning; if (longRunning != null) { - newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING); + newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction); } else { - newTask.Disposable = _scheduler.Schedule(this, DRAIN_SHORT_RUNNING); + newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc); } } @@ -503,7 +502,7 @@ private void Schedule() // is of a multi-consumer type. if (Volatile.Read(ref _disposed)) { - Clear(); + Clear(_queue); } } } @@ -513,7 +512,7 @@ private void Schedule() /// Avoids creating a delegate that captures this /// whenever the signals have to be drained. /// - private static readonly Action, ICancelable> DRAIN_LONG_RUNNING = + private static readonly Action, ICancelable> DrainLongRunningAction = (self, cancel) => self.DrainLongRunning(); /// @@ -521,7 +520,7 @@ private void Schedule() /// Avoids creating a delegate that captures this /// whenever the signals have to be drained. /// - private static readonly Func, IDisposable> DRAIN_SHORT_RUNNING = + private static readonly Func, IDisposable> DrainShortRunningFunc = (scheduler, self) => self.DrainShortRunning(scheduler); /// @@ -532,13 +531,13 @@ private void Schedule() /// The IDisposable of the recursively scheduled task or an empty disposable. private IDisposable DrainShortRunning(IScheduler recursiveScheduler) { - DrainStep(_queue, _downstream, false); + DrainStep(_queue, false); if (Interlocked.Decrement(ref _wip) != 0) { // Don't return the disposable of Schedule() because that may chain together // a long string of ScheduledItems causing StackOverflowException upon Dispose() - var d = recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING); + var d = recursiveScheduler.Schedule(this, DrainShortRunningFunc); Disposable.TrySetMultiple(ref _task, d); } return Disposable.Empty; @@ -550,18 +549,22 @@ private IDisposable DrainShortRunning(IScheduler recursiveScheduler) /// empty queue, issuing the appropriate signals to the /// given downstream. /// - /// The queue to use. - /// The intended consumer of the events. + /// The queue to use. The argument ensures that the + /// _queue field is not re-read from memory due to the memory barriers + /// inside TryDequeue mandating it despite the field is read-only. + /// In addition, the DrainStep is invoked from the DrainLongRunning's loop + /// so reading _queue inside this method would still incur the same barrier + /// overhead otherwise. /// Should the errors be delayed until all /// queued items have been emitted to the downstream? /// True if the drain loop should stop. - private bool DrainStep(ConcurrentQueue q, IObserver downstream, bool delayError) + private bool DrainStep(ConcurrentQueue q, bool delayError) { // Check if the operator has been disposed if (Volatile.Read(ref _disposed)) { // cleanup residue items in the queue - Clear(); + Clear(q); return true; } @@ -577,13 +580,13 @@ private bool DrainStep(ConcurrentQueue q, IObserver downstream, bool delay if (ex != null) { Volatile.Write(ref _disposed, true); - downstream.OnError(ex); + ForwardOnError(ex); return true; } } // get the next item from the queue if any - var empty = !_queue.TryDequeue(out var v); + var empty = !q.TryDequeue(out var v); // the upstream called OnComplete and the queue is empty // that means we are done, no further signals can happen @@ -596,12 +599,12 @@ private bool DrainStep(ConcurrentQueue q, IObserver downstream, bool delay // if not null, there was an OnError call if (ex != null) { - downstream.OnError(ex); + ForwardOnError(ex); } else { // otherwise, complete normally - downstream.OnCompleted(); + ForwardOnCompleted(); } return true; } @@ -612,7 +615,7 @@ private bool DrainStep(ConcurrentQueue q, IObserver downstream, bool delay return true; } // emit the item - downstream.OnNext(v); + ForwardOnNext(v); // keep looping return false; @@ -631,7 +634,6 @@ private void DrainLongRunning() // that would force the re-read of these constant values // from memory, regardless of readonly, afaik var q = _queue; - var downstream = _downstream; for (; ; ) { @@ -640,7 +642,7 @@ private void DrainLongRunning() // delayError: true - because of // ObserveOn_LongRunning_HoldUpDuringDispatchAndFail // expects something that almost looks like full delayError - if (DrainStep(q, downstream, true)) + if (DrainStep(q, true)) { break; }