diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs index 24d343429d..172763e69e 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs @@ -205,11 +205,10 @@ internal void InvokeNext(TState state) { var sad = new SingleAssignmentDisposable(); group.Add(sad); - sad.Disposable = scheduler.Schedule((state, sad, @this: this), (_, nextState) => + sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), nextState => { nextState.@this.group.Remove(nextState.sad); nextState.@this.InvokeFirst(nextState.state); - return Disposable.Empty; }); } @@ -234,11 +233,9 @@ internal void InvokeNext(TState state, TimeSpan time) { var sad = new SingleAssignmentDisposable(); group.Add(sad); - sad.Disposable = scheduler.Schedule((state, sad, @this: this), time, (_, nextState) => - { + sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), time, nextState => { nextState.@this.group.Remove(nextState.sad); nextState.@this.InvokeFirst(nextState.state); - return Disposable.Empty; }); } @@ -263,11 +260,9 @@ internal void InvokeNext(TState state, DateTimeOffset dtOffset) { var sad = new SingleAssignmentDisposable(); group.Add(sad); - sad.Disposable = scheduler.Schedule((state, sad, @this: this), dtOffset, (_, nextState) => - { + sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), dtOffset, nextState => { nextState.@this.group.Remove(nextState.sad); nextState.@this.InvokeFirst(nextState.state); - return Disposable.Empty; }); } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index cf1c11b7fc..774174fd7a 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -89,12 +89,12 @@ public void Run() { var disp = _append ? _source.SubscribeSafe(this) - : _scheduler.Schedule(this, PrependValue); + : _scheduler.ScheduleAction(this, PrependValue); SetUpstream(disp); } - private static IDisposable PrependValue(IScheduler scheduler, _ sink) + private static IDisposable PrependValue(_ sink) { sink.ForwardOnNext(sink._value); return sink._source.SubscribeSafe(sink); @@ -104,7 +104,7 @@ public override void OnCompleted() { if (_append) { - var disposable = _scheduler.Schedule(this, AppendValue); + var disposable = _scheduler.ScheduleAction(this, AppendValue); Disposable.TrySetSingle(ref _schedulerDisposable, disposable); } else @@ -113,11 +113,10 @@ public override void OnCompleted() } } - private static IDisposable AppendValue(IScheduler scheduler, _ sink) + private static void AppendValue(_ sink) { sink.ForwardOnNext(sink._value); sink.ForwardOnCompleted(); - return Disposable.Empty; } protected override void Dispose(bool disposing) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs index 3f565c52b7..3c49b465a4 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs @@ -346,10 +346,10 @@ private void CreateTimer() _nextShift += _timeShift; } - m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => tuple.@this.Tick(tuple.isSpan, tuple.isShift)); + m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, tuple => tuple.@this.Tick(tuple.isSpan, tuple.isShift)); } - private IDisposable Tick(bool isSpan, bool isShift) + private void Tick(bool isSpan, bool isShift) { lock (_gate) { @@ -372,8 +372,6 @@ private IDisposable Tick(bool isSpan, bool isShift) } CreateTimer(); - - return Disposable.Empty; } public override void OnNext(TSource value) @@ -559,19 +557,17 @@ private void CreateTimer(int id) var m = new SingleAssignmentDisposable(); Disposable.TrySetSerial(ref _timerSerial, m); - m.Disposable = _parent._scheduler.Schedule((@this: this, id), _parent._timeSpan, (_, tuple) => tuple.@this.Tick(tuple.id)); + m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, tuple => tuple.@this.Tick(tuple.id)); } - private IDisposable Tick(int id) + private void Tick(int id) { - var d = Disposable.Empty; - var newId = 0; lock (_gate) { if (id != _windowId) { - return d; + return; } _n = 0; @@ -583,8 +579,6 @@ private IDisposable Tick(int id) CreateTimer(newId); } - - return d; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs index 761a3f44b4..901a26c0f0 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs @@ -396,7 +396,7 @@ private void DrainQueue(ICancelable cancel) if (shouldWait) { var timer = new ManualResetEventSlim(); - _scheduler.Schedule(timer, waitTime, (_, slimTimer) => { slimTimer.Set(); return Disposable.Empty; }); + _scheduler.ScheduleAction(timer, waitTime, slimTimer => { slimTimer.Set(); }); try { @@ -455,10 +455,10 @@ protected override void RunCore(Absolute parent) { _ready = false; - Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start())); + Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start())); } - private IDisposable Start() + private void Start() { var next = default(TimeSpan); var shouldRun = false; @@ -491,8 +491,6 @@ private IDisposable Start() { Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base.S)this, next, (@this, a) => DrainQueue(a))); } - - return Disposable.Empty; } } @@ -508,10 +506,10 @@ protected override void RunCore(Absolute parent) // ScheduleDrain might have already set a newer disposable // using TrySetSerial would cancel it, stopping the emission // and hang the consumer - Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start())); + Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start())); } - private IDisposable Start() + private void Start() { lock (_gate) { @@ -528,8 +526,6 @@ private IDisposable Start() } ScheduleDrain(); - - return Disposable.Empty; } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs index 79a30ff8f7..7205295fbf 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs @@ -56,12 +56,12 @@ public _(IObserver observer) public void Run(IObservable source, IScheduler scheduler, DateTimeOffset dueTime) { - SetUpstream(scheduler.Schedule((@this: this, source), dueTime, (self, tuple) => tuple.source.SubscribeSafe(tuple.@this))); + SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this))); } public void Run(IObservable source, IScheduler scheduler, TimeSpan dueTime) { - SetUpstream(scheduler.Schedule((@this: this, source), dueTime, (self, tuple) => tuple.source.SubscribeSafe(tuple.@this))); + SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this))); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs index 16a50e862e..9e0dc959c7 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs @@ -29,11 +29,7 @@ public _(IObserver observer) public void Run(IScheduler scheduler) { - SetUpstream(scheduler.Schedule(this, (s, target) => - { - target.OnCompleted(); - return Disposable.Empty; - })); + SetUpstream(scheduler.ScheduleAction(this, target => target.OnCompleted())); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs index 3ae2d740f4..2cc38434d0 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs @@ -321,11 +321,11 @@ private void Initialize() // the GetSchedulerForCurrentContext method). // var onNext = _parent.GetHandler(_subject.OnNext); - _parent._scheduler.Schedule(onNext, AddHandler); + _parent._scheduler.ScheduleAction(onNext, AddHandler); } } - private IDisposable AddHandler(IScheduler self, TDelegate onNext) + private void AddHandler(TDelegate onNext) { var removeHandler = default(IDisposable); try @@ -335,7 +335,7 @@ private IDisposable AddHandler(IScheduler self, TDelegate onNext) catch (Exception exception) { _subject.OnError(exception); - return Disposable.Empty; + return; } // @@ -347,8 +347,6 @@ private IDisposable AddHandler(IScheduler self, TDelegate onNext) // remove handler to run on the scheduler. // _removeHandler.Disposable = removeHandler; - - return Disposable.Empty; } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs index 466e9f258b..d3adbae945 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs @@ -140,7 +140,7 @@ public void Run(Lazy parent) { var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial); - cancelable.Disposable = closureParent._scheduler.Schedule((cancelable, closureParent), closureParent._disconnectTime, (self, tuple2) => + cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 => { lock (tuple2.closureParent._gate) { @@ -150,8 +150,6 @@ public void Run(Lazy parent) tuple2.closureParent._connectableSubscription = null; } } - - return Disposable.Empty; }); } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs index 02adb86e90..451d68598c 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs @@ -111,7 +111,7 @@ public _(IObserver observer) public void Run(Time parent) { - SetUpstream(parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick())); + SetUpstream(parent._scheduler.ScheduleAction(this, parent._duration, state => state.Tick())); Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this)); } @@ -124,10 +124,9 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private IDisposable Tick() + private void Tick() { _open = true; - return Disposable.Empty; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs index 1db2d533a4..a419f6eda0 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs @@ -170,7 +170,7 @@ public _(IObserver observer) public void Run(SkipUntil parent) { - Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick())); + Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._startTime, state => state.Tick())); base.Run(parent._source); } @@ -183,10 +183,9 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private IDisposable Tick() + private void Tick() { _open = true; - return Disposable.Empty; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs index df8ca2813d..787025f81d 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs @@ -122,7 +122,7 @@ public void Run(Time parent) { _gate = new object(); - Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick())); + Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._duration, state => state.Tick())); base.Run(parent._source); } @@ -135,13 +135,12 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private IDisposable Tick() + private void Tick() { lock (_gate) { ForwardOnCompleted(); } - return Disposable.Empty; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs index 8044e971d2..f166f24ca3 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs @@ -147,7 +147,7 @@ public _(IObserver observer) public void Run(TakeUntil parent) { - Disposable.SetSingle(ref _timerDisposable, parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick())); + Disposable.SetSingle(ref _timerDisposable, parent._scheduler.ScheduleAction(this, parent._endTime, state => state.Tick())); base.Run(parent._source); } @@ -160,10 +160,9 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private IDisposable Tick() + private void Tick() { OnCompleted(); - return Disposable.Empty; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs index 7516d1fbb5..8560377ab0 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs @@ -64,10 +64,10 @@ public override void OnNext(TSource value) } Disposable.TrySetSerial(ref _serialCancelable, null); - Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => tuple.@this.Propagate(tuple.currentid))); + Disposable.TrySetSerial(ref _serialCancelable, _scheduler.ScheduleAction((@this: this, currentid), _dueTime, tuple => tuple.@this.Propagate(tuple.currentid))); } - private IDisposable Propagate(ulong currentid) + private void Propagate(ulong currentid) { lock (_gate) { @@ -78,8 +78,6 @@ private IDisposable Propagate(ulong currentid) _hasValue = false; } - - return Disposable.Empty; } public override void OnError(Exception error) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs index 81394c08a3..8917f7c47a 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs @@ -70,7 +70,7 @@ private void CreateTimer(long idx) if (Disposable.TrySetMultiple(ref _timerDisposable, null)) { - var d = _scheduler.Schedule((idx, instance: this), _dueTime, (_, state) => { state.instance.Timeout(state.idx); return Disposable.Empty; }); + var d = _scheduler.ScheduleAction((idx, instance: this), _dueTime, state => { state.instance.Timeout(state.idx); }); Disposable.TrySetMultiple(ref _timerDisposable, d); } @@ -159,7 +159,7 @@ public _(IObservable other, IObserver observer) public void Run(Absolute parent) { - SetUpstream(parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Timeout())); + SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Timeout())); Disposable.TrySetSingle(ref _serialDisposable, parent._source.SubscribeSafe(this)); } @@ -173,13 +173,12 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private IDisposable Timeout() + private void Timeout() { if (Interlocked.Increment(ref _wip) == 1) { Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder())); } - return Disposable.Empty; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs index 40633ad30e..31feedfa2e 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs @@ -58,19 +58,18 @@ public _(IObserver observer) public void Run(Single parent, DateTimeOffset dueTime) { - SetUpstream(parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke())); + SetUpstream(parent._scheduler.ScheduleAction(this, dueTime, state => state.Invoke())); } public void Run(Single parent, TimeSpan dueTime) { - SetUpstream(parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke())); + SetUpstream(parent._scheduler.ScheduleAction(this, dueTime, state => state.Invoke())); } - private IDisposable Invoke() + private void Invoke() { ForwardOnNext(0); ForwardOnCompleted(); - return Disposable.Empty; } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs index a61b0c4899..b0eae97b33 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs @@ -208,10 +208,10 @@ private void CreateTimer() _nextShift += _timeShift; } - m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => tuple.@this.Tick(tuple.isSpan, tuple.isShift)); + m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, tuple => tuple.@this.Tick(tuple.isSpan, tuple.isShift)); } - private IDisposable Tick(bool isSpan, bool isShift) + private void Tick(bool isSpan, bool isShift) { lock (_gate) { @@ -234,8 +234,6 @@ private IDisposable Tick(bool isSpan, bool isShift) } CreateTimer(); - - return Disposable.Empty; } public override void OnNext(TSource value) @@ -424,19 +422,17 @@ private void CreateTimer(Subject window) var m = new SingleAssignmentDisposable(); _timerD.Disposable = m; - m.Disposable = _scheduler.Schedule((@this: this, window), _timeSpan, (_, tuple) => tuple.@this.Tick(tuple.window)); + m.Disposable = _scheduler.ScheduleAction((@this: this, window), _timeSpan, tuple => tuple.@this.Tick(tuple.window)); } - private IDisposable Tick(Subject window) + private void Tick(Subject window) { - var d = Disposable.Empty; - var newWindow = default(Subject); lock (_gate) { if (window != _s) { - return d; + return; } _n = 0; @@ -448,8 +444,6 @@ private IDisposable Tick(Subject window) } CreateTimer(newWindow); - - return d; } public override void OnNext(TSource value) diff --git a/Rx.NET/Source/src/System.Reactive/Notification.cs b/Rx.NET/Source/src/System.Reactive/Notification.cs index 3a7cc02991..21e618de71 100644 --- a/Rx.NET/Source/src/System.Reactive/Notification.cs +++ b/Rx.NET/Source/src/System.Reactive/Notification.cs @@ -665,7 +665,7 @@ public NotificationToObservable(IScheduler scheduler, Notification parent) protected override IDisposable SubscribeCore(IObserver observer) { - return _scheduler.Schedule((_parent, observer), (scheduler, state) => + return _scheduler.ScheduleAction((_parent, observer), state => { var parent = state._parent; var o = state.observer; @@ -676,7 +676,6 @@ protected override IDisposable SubscribeCore(IObserver observer) { o.OnCompleted(); } - return Disposable.Empty; }); } } diff --git a/Rx.NET/Source/src/System.Reactive/ObservableBase.cs b/Rx.NET/Source/src/System.Reactive/ObservableBase.cs index 532aceb26d..dd01df7a61 100644 --- a/Rx.NET/Source/src/System.Reactive/ObservableBase.cs +++ b/Rx.NET/Source/src/System.Reactive/ObservableBase.cs @@ -50,7 +50,7 @@ public IDisposable Subscribe(IObserver observer) // exception thrown in OnNext to circle back to OnError, which looks like the // sequence can't make up its mind. // - CurrentThreadScheduler.Instance.Schedule(autoDetachObserver, ScheduledSubscribe); + CurrentThreadScheduler.Instance.ScheduleAction(autoDetachObserver, ScheduledSubscribe); } else { @@ -79,7 +79,7 @@ public IDisposable Subscribe(IObserver observer) return autoDetachObserver; } - private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver autoDetachObserver) + private void ScheduledSubscribe(AutoDetachObserver autoDetachObserver) { try { @@ -101,8 +101,6 @@ private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver autoD throw; } } - - return Disposable.Empty; } ///