Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 17 additions & 34 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,37 @@ public Base(IObservable<TSource> source, IScheduler scheduler)

internal abstract class _ : IdentitySink<TSource>
{
public _(IObserver<TSource> observer)
protected IStopwatch _watch;
protected IScheduler _scheduler;

public _(TParent parent, IObserver<TSource> observer)
: base(observer)
{
_scheduler = parent._scheduler;
}

public void Run(TParent parent)
{
_watch = _scheduler.StartStopwatch();

RunCore(parent);

base.Run(parent._source);
}

public abstract void Run(TParent parent);
protected abstract void RunCore(TParent parent);
}

internal abstract class S : _
{
protected readonly object _gate = new object();
protected IDisposable _cancelable;

protected readonly IScheduler _scheduler;

public S(TParent parent, IObserver<TSource> observer)
: base(observer)
: base(parent, observer)
{
_scheduler = parent._scheduler;
}

protected IStopwatch _watch;
protected TimeSpan _delay;
protected bool _ready;
protected bool _active;
Expand All @@ -58,15 +67,6 @@ public S(TParent parent, IObserver<TSource> observer)
private bool _hasFailed;
private Exception _exception;

public override void Run(TParent parent)
{
_watch = _scheduler.StartStopwatch();

RunCore(parent);

base.Run(parent._source);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
Expand All @@ -77,8 +77,6 @@ protected override void Dispose(bool disposing)
}
}

protected abstract void RunCore(TParent parent);

public override void OnNext(TSource value)
{
var shouldRun = false;
Expand Down Expand Up @@ -257,15 +255,11 @@ protected abstract class L : _
protected IDisposable _cancelable;
private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);

private readonly IScheduler _scheduler;

public L(TParent parent, IObserver<TSource> observer)
: base(observer)
: base(parent, observer)
{
_scheduler = parent._scheduler;
}

protected IStopwatch _watch;
protected TimeSpan _delay;
protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

Expand All @@ -275,15 +269,6 @@ public L(TParent parent, IObserver<TSource> observer)
private bool _hasFailed;
private Exception _exception;

public override void Run(TParent parent)
{
_watch = _scheduler.StartStopwatch();

RunCore(parent);

base.Run(parent._source);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
Expand All @@ -294,8 +279,6 @@ protected override void Dispose(bool disposing)
}
}

protected abstract void RunCore(TParent parent);

protected void ScheduleDrain()
{
_stop = new CancellationTokenSource();
Expand Down