Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ public Scheduler(IObservable<TSource> source, IScheduler scheduler)
protected override IDisposable Run(ObserveOnObserver<TSource> sink) => _source.SubscribeSafe(sink);
}

/// <summary>
/// The new ObserveOn operator run with an IScheduler in a lock-free manner.
/// </summary>
internal sealed class SchedulerNew : Producer<TSource, ObserveOnObserverNew<TSource>>
{
private readonly IObservable<TSource> _source;
private readonly IScheduler _scheduler;

public SchedulerNew(IObservable<TSource> source, IScheduler scheduler)
{
_source = source;
_scheduler = scheduler;
}

protected override ObserveOnObserverNew<TSource> CreateSink(IObserver<TSource> observer, IDisposable cancel) => new ObserveOnObserverNew<TSource>(_scheduler, observer, cancel);

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
protected override IDisposable Run(ObserveOnObserverNew<TSource> sink) => _source.SubscribeSafe(sink);
}

internal sealed class Context : Producer<TSource, Context._>
{
private readonly IObservable<TSource> _source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> sourc
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return new ObserveOn<TSource>.Scheduler(source, scheduler);
return new ObserveOn<TSource>.SchedulerNew(source, scheduler);
}

/// <summary>
Expand Down
278 changes: 278 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,282 @@ internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
void EnsureActive();
void EnsureActive(int count);
}

/// <summary>
/// An ObserveOn operator implementation that uses lock-free
/// techniques to signal events to the downstream.
/// </summary>
/// <typeparam name="T">The element type of the sequence.</typeparam>
internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
{
readonly IObserver<T> downstream;

readonly IScheduler scheduler;

/// <summary>
/// If not null, the <see cref="scheduler"/> supports
/// long running tasks.
/// </summary>
readonly ISchedulerLongRunning longRunning;

readonly ConcurrentQueue<T> queue;

/// <summary>
/// The disposable of the upstream source.
/// </summary>
IDisposable upstream;

/// <summary>
/// The current task representing a running drain operation.
/// </summary>
IDisposable task;

/// <summary>
/// Indicates the work-in-progress state of this operator,
/// zero means no work is currently being done.
/// </summary>
int wip;

/// <summary>
/// If true, the upstream has issued OnCompleted.
/// </summary>
bool done;
/// <summary>
/// If <see cref="done"/> is true and this is non-null, the upstream
/// failed with an OnError.
/// </summary>
Exception error;

/// <summary>
/// Indicates a dispose has been requested.
/// </summary>
bool disposed;

public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream, IDisposable upstream)
{
this.downstream = downstream;
this.scheduler = scheduler;
this.longRunning = scheduler.AsLongRunning();
this.queue = new ConcurrentQueue<T>();
Volatile.Write(ref this.upstream, upstream);
}

public void Dispose()
{
Volatile.Write(ref disposed, true);
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
Interlocked.Exchange(ref task, BooleanDisposable.True)?.Dispose();
Clear();
}

/// <summary>
/// Remove remaining elements from the queue upon
/// cancellation or failure.
/// </summary>
void Clear()
{
var q = queue;
while (q.TryDequeue(out var _)) ;
}

public void OnCompleted()
{
Volatile.Write(ref done, true);
Schedule();
}

public void OnError(Exception error)
{
this.error = error;
Volatile.Write(ref done, true);
Schedule();
}

public void OnNext(T value)
{
queue.Enqueue(value);
Schedule();
}

/// <summary>
/// Submit the drain task via the appropriate scheduler if
/// there is no drain currently running (wip > 0).
/// </summary>
void Schedule()
{
if (Interlocked.Increment(ref wip) == 1)
{
var oldTask = Volatile.Read(ref task);

var newTask = new SingleAssignmentDisposable();

if (oldTask != BooleanDisposable.True
&& Interlocked.CompareExchange(ref task, newTask, oldTask) == oldTask)
{

var longRunning = this.longRunning;
if (longRunning != null)
{
newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
}
else
{
newTask.Disposable = scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
}
}

// If there was a cancellation, clear the queue
// of items. This doesn't have to be inside the
// wip != 0 (exclusive) mode as the queue
// is of a multi-consumer type.
if (Volatile.Read(ref disposed))
{
Clear();
}
}
}

/// <summary>
/// The static action to be scheduled on a long running scheduler.
/// Avoids creating a delegate that captures <code>this</code>
/// whenever the signals have to be drained.
/// </summary>
static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
(self, cancel) => self.DrainLongRunning();

/// <summary>
/// The static action to be scheduled on a simple scheduler.
/// Avoids creating a delegate that captures <code>this</code>
/// whenever the signals have to be drained.
/// </summary>
static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
(scheduler, self) => self.DrainShortRunning(scheduler);

/// <summary>
/// Emits at most one signal per run on a scheduler that doesn't like
/// long running tasks.
/// </summary>
/// <param name="recursiveScheduler">The scheduler to use for scheduling the next signal emission if necessary.</param>
/// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
IDisposable DrainShortRunning(IScheduler recursiveScheduler)
{
DrainStep(queue, downstream, false);

if (Interlocked.Decrement(ref wip) != 0)
{
return recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
}
return Disposable.Empty;
}

/// <summary>
/// Executes a drain step by checking the disposed state,
/// checking for the terminated state and for an
/// empty queue, issuing the approrpiate signals to the
/// given downstream.
/// </summary>
/// <param name="q">The queue to use.</param>
/// <param name="downstream">The intended consumer of the events.</param>
/// <param name="delayError">Should the errors be delayed until all
/// queued items have been emitted to the downstream?</param>
/// <returns>True if the drain loop should stop.</returns>
bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
{
// Check if the operator has been disposed
if (Volatile.Read(ref disposed))
{
// cleanup residue items in the queue
Clear();
return true;
}

// Has the upstream call OnCompleted?
var d = Volatile.Read(ref done);

if (d && !delayError)
{
// done = true happens before setting error
// this is safe to be a plain read
var ex = error;
// if not null, there was an OnError call
if (ex != null)
{
Volatile.Write(ref disposed, true);
downstream.OnError(ex);
return true;
}
}

// get the next item from the queue if any
var empty = !queue.TryDequeue(out var v);

// the upstream called OnComplete and the queue is empty
// that means we are done, no further signals can happen
if (d && empty)
{
Volatile.Write(ref disposed, true);
// done = true happens before setting error
// this is safe to be a plain read
var ex = error;
// if not null, there was an OnError call
if (ex != null)
{
downstream.OnError(ex);
}
else
{
// otherwise, complete normally
downstream.OnCompleted();
}
return true;
}
else
// the queue is empty and the upstream hasn't completed yet
if (empty)
{
return true;
}
// emit the item
downstream.OnNext(v);

// keep looping
return false;
}

/// <summary>
/// Emits as many signals as possible to the downstream observer
/// as this is executing a long-running scheduler so
/// it can occupy that thread as long as it needs to.
/// </summary>
void DrainLongRunning()
{
var missed = 1;

// read out fields upfront as the DrainStep uses atomics
// that would force the re-read of these constant values
// from memory, regardless of readonly, afaik
var q = queue;
var downstream = this.downstream;

for (; ; )
{
for (; ; )
{
// delayError: true - because of
// ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
// expects something that almost looks like full delayError
if (DrainStep(q, downstream, true))
{
break;
}
}

missed = Interlocked.Add(ref wip, -missed);
if (missed == 0)
{
break;
}
}
}
}
}