Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,8 @@ internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
/// techniques to signal events to the downstream.
/// </summary>
/// <typeparam name="T">The element type of the sequence.</typeparam>
internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
{
private readonly IObserver<T> _downstream;
private readonly IScheduler _scheduler;

/// <summary>
Expand All @@ -392,7 +391,6 @@ internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
/// </summary>
private readonly ISchedulerLongRunning _longRunning;
private readonly ConcurrentQueue<T> _queue;
private IDisposable _run;

/// <summary>
/// The current task representing a running drain operation.
Expand Down Expand Up @@ -421,54 +419,55 @@ internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
/// </summary>
private bool _disposed;

public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
{
_downstream = downstream;
_scheduler = scheduler;
_longRunning = scheduler.AsLongRunning();
_queue = new ConcurrentQueue<T>();
}

public void Run(IObservable<T> source)
{
Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
}

public void Dispose()
protected override void Dispose(bool disposing)
{
Volatile.Write(ref _disposed, true);
Disposable.TryDispose(ref _task);
Disposable.TryDispose(ref _run);
Clear();

base.Dispose(disposing);
if (disposing)
{
Disposable.TryDispose(ref _task);
Clear(_queue);
}
}

/// <summary>
/// Remove remaining elements from the queue upon
/// cancellation or failure.
/// </summary>
private void Clear()
/// <param name="q">The queue to use. The argument ensures that the
/// _queue field is not re-read from memory unnecessarily
/// due to the memory barriers inside TryDequeue mandating it
/// despite the field is read-only.</param>
private void Clear(ConcurrentQueue<T> q)
{
var q = _queue;
while (q.TryDequeue(out var _))
{
;
}
}

public void OnCompleted()
public override void OnCompleted()
{
Volatile.Write(ref _done, true);
Schedule();
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
_error = error;
Volatile.Write(ref _done, true);
Schedule();
}

public void OnNext(T value)
public override void OnNext(T value)
{
_queue.Enqueue(value);
Schedule();
Expand All @@ -489,11 +488,11 @@ private void Schedule()
var longRunning = _longRunning;
if (longRunning != null)
{
newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction);
}
else
{
newTask.Disposable = _scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
}
}

Expand All @@ -503,7 +502,7 @@ private void Schedule()
// is of a multi-consumer type.
if (Volatile.Read(ref _disposed))
{
Clear();
Clear(_queue);
}
}
}
Expand All @@ -513,15 +512,15 @@ private void Schedule()
/// Avoids creating a delegate that captures <code>this</code>
/// whenever the signals have to be drained.
/// </summary>
private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
(self, cancel) => self.DrainLongRunning();

/// <summary>
/// The static action to be scheduled on a simple scheduler.
/// Avoids creating a delegate that captures <code>this</code>
/// whenever the signals have to be drained.
/// </summary>
private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc =
(scheduler, self) => self.DrainShortRunning(scheduler);

/// <summary>
Expand All @@ -532,13 +531,13 @@ private void Schedule()
/// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
{
DrainStep(_queue, _downstream, false);
DrainStep(_queue, false);

if (Interlocked.Decrement(ref _wip) != 0)
{
// Don't return the disposable of Schedule() because that may chain together
// a long string of ScheduledItems causing StackOverflowException upon Dispose()
var d = recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
var d = recursiveScheduler.Schedule(this, DrainShortRunningFunc);
Disposable.TrySetMultiple(ref _task, d);
}
return Disposable.Empty;
Expand All @@ -550,18 +549,22 @@ private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
/// empty queue, issuing the appropriate signals to the
/// given downstream.
/// </summary>
/// <param name="q">The queue to use.</param>
/// <param name="downstream">The intended consumer of the events.</param>
/// <param name="q">The queue to use. The argument ensures that the
/// _queue field is not re-read from memory due to the memory barriers
/// inside TryDequeue mandating it despite the field is read-only.
/// In addition, the DrainStep is invoked from the DrainLongRunning's loop
/// so reading _queue inside this method would still incur the same barrier
/// overhead otherwise.</param>
/// <param name="delayError">Should the errors be delayed until all
/// queued items have been emitted to the downstream?</param>
/// <returns>True if the drain loop should stop.</returns>
private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
{
// Check if the operator has been disposed
if (Volatile.Read(ref _disposed))
{
// cleanup residue items in the queue
Clear();
Clear(q);
return true;
}

Expand All @@ -577,13 +580,13 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
if (ex != null)
{
Volatile.Write(ref _disposed, true);
downstream.OnError(ex);
ForwardOnError(ex);
return true;
}
}

// get the next item from the queue if any
var empty = !_queue.TryDequeue(out var v);
var empty = !q.TryDequeue(out var v);

// the upstream called OnComplete and the queue is empty
// that means we are done, no further signals can happen
Expand All @@ -596,12 +599,12 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
// if not null, there was an OnError call
if (ex != null)
{
downstream.OnError(ex);
ForwardOnError(ex);
}
else
{
// otherwise, complete normally
downstream.OnCompleted();
ForwardOnCompleted();
}
return true;
}
Expand All @@ -612,7 +615,7 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
return true;
}
// emit the item
downstream.OnNext(v);
ForwardOnNext(v);

// keep looping
return false;
Expand All @@ -631,7 +634,6 @@ private void DrainLongRunning()
// that would force the re-read of these constant values
// from memory, regardless of readonly, afaik
var q = _queue;
var downstream = _downstream;

for (; ; )
{
Expand All @@ -640,7 +642,7 @@ private void DrainLongRunning()
// delayError: true - because of
// ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
// expects something that almost looks like full delayError
if (DrainStep(q, downstream, true))
if (DrainStep(q, true))
{
break;
}
Expand Down