Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,10 @@ internal void InvokeNext(TState state)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), (_, nextState) =>
sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), nextState =>
{
[email protected](nextState.sad);
[email protected](nextState.state);
return Disposable.Empty;
});
}

Expand All @@ -234,11 +233,9 @@ internal void InvokeNext(TState state, TimeSpan time)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), time, (_, nextState) =>
{
sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), time, nextState => {
[email protected](nextState.sad);
[email protected](nextState.state);
return Disposable.Empty;
});
}

Expand All @@ -263,11 +260,9 @@ internal void InvokeNext(TState state, DateTimeOffset dtOffset)
{
var sad = new SingleAssignmentDisposable();
group.Add(sad);
sad.Disposable = scheduler.Schedule((state, sad, @this: this), dtOffset, (_, nextState) =>
{
sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), dtOffset, nextState => {
[email protected](nextState.sad);
[email protected](nextState.state);
return Disposable.Empty;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public void Run()
{
var disp = _append
? _source.SubscribeSafe(this)
: _scheduler.Schedule(this, PrependValue);
: _scheduler.ScheduleAction(this, PrependValue);

SetUpstream(disp);
}

private static IDisposable PrependValue(IScheduler scheduler, _ sink)
private static IDisposable PrependValue(_ sink)
{
sink.ForwardOnNext(sink._value);
return sink._source.SubscribeSafe(sink);
Expand All @@ -104,7 +104,7 @@ public override void OnCompleted()
{
if (_append)
{
var disposable = _scheduler.Schedule(this, AppendValue);
var disposable = _scheduler.ScheduleAction(this, AppendValue);
Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
}
else
Expand All @@ -113,11 +113,10 @@ public override void OnCompleted()
}
}

private static IDisposable AppendValue(IScheduler scheduler, _ sink)
private static void AppendValue(_ sink)
{
sink.ForwardOnNext(sink._value);
sink.ForwardOnCompleted();
return Disposable.Empty;
}

protected override void Dispose(bool disposing)
Expand Down
16 changes: 5 additions & 11 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ private void CreateTimer()
_nextShift += _timeShift;
}

m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => [email protected](tuple.isSpan, tuple.isShift));
m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, tuple => [email protected](tuple.isSpan, tuple.isShift));
}

private IDisposable Tick(bool isSpan, bool isShift)
private void Tick(bool isSpan, bool isShift)
{
lock (_gate)
{
Expand All @@ -372,8 +372,6 @@ private IDisposable Tick(bool isSpan, bool isShift)
}

CreateTimer();

return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down Expand Up @@ -559,19 +557,17 @@ private void CreateTimer(int id)
var m = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _timerSerial, m);

m.Disposable = _parent._scheduler.Schedule((@this: this, id), _parent._timeSpan, (_, tuple) => [email protected](tuple.id));
m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, tuple => [email protected](tuple.id));
}

private IDisposable Tick(int id)
private void Tick(int id)
{
var d = Disposable.Empty;

var newId = 0;
lock (_gate)
{
if (id != _windowId)
{
return d;
return;
}

_n = 0;
Expand All @@ -583,8 +579,6 @@ private IDisposable Tick(int id)

CreateTimer(newId);
}

return d;
}

public override void OnNext(TSource value)
Expand Down
14 changes: 5 additions & 9 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ private void DrainQueue(ICancelable cancel)
if (shouldWait)
{
var timer = new ManualResetEventSlim();
_scheduler.Schedule(timer, waitTime, (_, slimTimer) => { slimTimer.Set(); return Disposable.Empty; });
_scheduler.ScheduleAction(timer, waitTime, slimTimer => { slimTimer.Set(); });

try
{
Expand Down Expand Up @@ -455,10 +455,10 @@ protected override void RunCore(Absolute parent)
{
_ready = false;

Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
}

private IDisposable Start()
private void Start()
{
var next = default(TimeSpan);
var shouldRun = false;
Expand Down Expand Up @@ -491,8 +491,6 @@ private IDisposable Start()
{
Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
}

return Disposable.Empty;
}
}

Expand All @@ -508,10 +506,10 @@ protected override void RunCore(Absolute parent)
// ScheduleDrain might have already set a newer disposable
// using TrySetSerial would cancel it, stopping the emission
// and hang the consumer
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
}

private IDisposable Start()
private void Start()
{
lock (_gate)
{
Expand All @@ -528,8 +526,6 @@ private IDisposable Start()
}

ScheduleDrain();

return Disposable.Empty;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public _(IObserver<TSource> observer)

public void Run(IObservable<TSource> source, IScheduler scheduler, DateTimeOffset dueTime)
{
SetUpstream(scheduler.Schedule((@this: this, source), dueTime, (self, tuple) => tuple.source.SubscribeSafe(tuple.@this)));
SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this)));
}

public void Run(IObservable<TSource> source, IScheduler scheduler, TimeSpan dueTime)
{
SetUpstream(scheduler.Schedule((@this: this, source), dueTime, (self, tuple) => tuple.source.SubscribeSafe(tuple.@this)));
SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this)));
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ public _(IObserver<TResult> observer)

public void Run(IScheduler scheduler)
{
SetUpstream(scheduler.Schedule(this, (s, target) =>
{
target.OnCompleted();
return Disposable.Empty;
}));
SetUpstream(scheduler.ScheduleAction(this, target => target.OnCompleted()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,11 @@ private void Initialize()
// the GetSchedulerForCurrentContext method).
//
var onNext = _parent.GetHandler(_subject.OnNext);
_parent._scheduler.Schedule(onNext, AddHandler);
_parent._scheduler.ScheduleAction(onNext, AddHandler);
}
}

private IDisposable AddHandler(IScheduler self, TDelegate onNext)
private void AddHandler(TDelegate onNext)
{
var removeHandler = default(IDisposable);
try
Expand All @@ -335,7 +335,7 @@ private IDisposable AddHandler(IScheduler self, TDelegate onNext)
catch (Exception exception)
{
_subject.OnError(exception);
return Disposable.Empty;
return;
}

//
Expand All @@ -347,8 +347,6 @@ private IDisposable AddHandler(IScheduler self, TDelegate onNext)
// remove handler to run on the scheduler.
//
_removeHandler.Disposable = removeHandler;

return Disposable.Empty;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void Run(Lazy parent)
{
var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);

cancelable.Disposable = closureParent._scheduler.Schedule((cancelable, closureParent), closureParent._disconnectTime, (self, tuple2) =>
cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 =>
{
lock (tuple2.closureParent._gate)
{
Expand All @@ -150,8 +150,6 @@ public void Run(Lazy parent)
tuple2.closureParent._connectableSubscription = null;
}
}

return Disposable.Empty;
});
}
}
Expand Down
5 changes: 2 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public _(IObserver<TSource> observer)

public void Run(Time parent)
{
SetUpstream(parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick()));
SetUpstream(parent._scheduler.ScheduleAction(this, parent._duration, state => state.Tick()));
Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
}

Expand All @@ -124,10 +124,9 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private IDisposable Tick()
private void Tick()
{
_open = true;
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public _(IObserver<TSource> observer)

public void Run(SkipUntil<TSource> parent)
{
Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick()));
Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._startTime, state => state.Tick()));
base.Run(parent._source);
}

Expand All @@ -183,10 +183,9 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private IDisposable Tick()
private void Tick()
{
_open = true;
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
5 changes: 2 additions & 3 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void Run(Time parent)
{
_gate = new object();

Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick()));
Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._duration, state => state.Tick()));
base.Run(parent._source);
}

Expand All @@ -135,13 +135,12 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private IDisposable Tick()
private void Tick()
{
lock (_gate)
{
ForwardOnCompleted();
}
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public _(IObserver<TSource> observer)

public void Run(TakeUntil<TSource> parent)
{
Disposable.SetSingle(ref _timerDisposable, parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
Disposable.SetSingle(ref _timerDisposable, parent._scheduler.ScheduleAction(this, parent._endTime, state => state.Tick()));
base.Run(parent._source);
}

Expand All @@ -160,10 +160,9 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private IDisposable Tick()
private void Tick()
{
OnCompleted();
return Disposable.Empty;
}

public override void OnNext(TSource value)
Expand Down
6 changes: 2 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public override void OnNext(TSource value)
}

Disposable.TrySetSerial(ref _serialCancelable, null);
Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => [email protected](tuple.currentid)));
Disposable.TrySetSerial(ref _serialCancelable, _scheduler.ScheduleAction((@this: this, currentid), _dueTime, tuple => [email protected](tuple.currentid)));
}

private IDisposable Propagate(ulong currentid)
private void Propagate(ulong currentid)
{
lock (_gate)
{
Expand All @@ -78,8 +78,6 @@ private IDisposable Propagate(ulong currentid)

_hasValue = false;
}

return Disposable.Empty;
}

public override void OnError(Exception error)
Expand Down
Loading