diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs index 8b46ff245d..7b4fc7d5c7 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs @@ -128,25 +128,40 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< if (action == null) throw new ArgumentNullException(nameof(action)); - var state1 = state; - var gate = new AsyncLock(); + return new PeriodicallyScheduledWorkItem(state, period, action); + } + + private sealed class PeriodicallyScheduledWorkItem : IDisposable + { + private TState _state; + private Func _action; + private readonly IDisposable _cancel; + private readonly AsyncLock _gate = new AsyncLock(); - var cancel = s_cal.StartPeriodicTimer(() => + public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func action) { - gate.Wait(() => - { - state1 = action(state1); - }); - }, period); + _state = state; + _action = action; + + _cancel = s_cal.StartPeriodicTimer(Tick, period); + } - return Disposable.Create(() => + private void Tick() { - cancel.Dispose(); - gate.Dispose(); - action = Stubs.I; - }); + _gate.Wait( + this, + closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state)); + } + + public void Dispose() + { + _cancel.Dispose(); + _gate.Dispose(); + _action = Stubs.I; + } } + /// /// Discovers scheduler services by interface type. /// diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs index f7301a9c8f..2404aa1c71 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs @@ -191,52 +191,48 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< if (action == null) throw new ArgumentNullException(nameof(action)); - var start = _stopwatch.Elapsed; - var next = start + period; - - var state1 = state; + return new PeriodicallyScheduledWorkItem(this, state, period, action); + } - var td = new TernaryDisposable(); + private sealed class PeriodicallyScheduledWorkItem : IDisposable + { + private readonly TimeSpan _period; + private readonly Func _action; + private readonly EventLoopScheduler _scheduler; + private readonly AsyncLock _gate = new AsyncLock(); - var gate = new AsyncLock(); - td.Extra = gate; + private TState _state; + private TimeSpan _next; + private IDisposable _task; - var tick = default(Func); - tick = (self_, _) => + public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func action) { - next += period; + _state = state; + _period = period; + _action = action; + _scheduler = scheduler; + _next = scheduler._stopwatch.Elapsed + period; - td.Next = self_.Schedule(null, next - _stopwatch.Elapsed, tick); - - gate.Wait(() => - { - state1 = action(state1); - }); - - return Disposable.Empty; - }; + Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_))); + } - td.First = Schedule(null, next - _stopwatch.Elapsed, tick); + private IDisposable Tick(IScheduler self) + { + _next += _period; - return td; - } + Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_))); - private sealed class TernaryDisposable : IDisposable - { - private IDisposable _task; - private IDisposable _extra; + _gate.Wait( + this, + closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state)); - // If Next was called before this assignment is executed, it won't overwrite - // a more fresh IDisposable task - public IDisposable First { set { Disposable.TrySetSingle(ref _task, value); } } - // It is fine to overwrite the first or previous IDisposable task - public IDisposable Next { set { Disposable.TrySetMultiple(ref _task, value); } } - public IDisposable Extra { set { Disposable.SetSingle(ref _extra, value); } } + return Disposable.Empty; + } public void Dispose() { Disposable.TryDispose(ref _task); - Disposable.TryDispose(ref _extra); + _gate.Dispose(); } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs index 99d6f10868..c32b07c1d8 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs @@ -78,13 +78,15 @@ public override IDisposable Schedule(TState state, Func - { - if (!m.IsDisposed) + asyncLock.Wait( + (@this: this, m, action, state), + tuple => { - m.Disposable = action(this, state); - } - }); + if (!m.IsDisposed) + { + tuple.m.Disposable = tuple.action(tuple.@this, tuple.state); + } + }); return m; } @@ -113,22 +115,24 @@ private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func - { - if (!m.IsDisposed) + asyncLock.Wait( + (@this: this, m, state, action, timer, dueTime), + tuple => { - var sleep = dueTime - timer.Elapsed; - if (sleep.Ticks > 0) - { - ConcurrencyAbstractionLayer.Current.Sleep(sleep); - } - - if (!m.IsDisposed) + if (!tuple.m.IsDisposed) { - m.Disposable = action(this, state); + var sleep = tuple.dueTime - tuple.timer.Elapsed; + if (sleep.Ticks > 0) + { + ConcurrencyAbstractionLayer.Current.Sleep(sleep); + } + + if (!tuple.m.IsDisposed) + { + tuple.m.Disposable = tuple.action(tuple.@this, tuple.state); + } } - } - }); + }); return m; } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs index f0d3642cb7..e0d29f4d26 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs @@ -179,31 +179,54 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< if (action == null) throw new ArgumentNullException(nameof(action)); - var cancel = new CancellationDisposable(); + return new PeriodicallyScheduledWorkItem(state, period, action, taskFactory); + } + + private sealed class PeriodicallyScheduledWorkItem : IDisposable + { + private TState _state; + + private readonly TimeSpan _period; + private readonly TaskFactory _taskFactory; + private readonly Func _action; + private readonly AsyncLock _gate = new AsyncLock(); + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + + public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func action, TaskFactory taskFactory) + { + _state = state; + _period = period; + _action = action; + _taskFactory = taskFactory; + + MoveNext(); + } - var state1 = state; - var gate = new AsyncLock(); + public void Dispose() + { + _cts.Cancel(); + _gate.Dispose(); + } - var moveNext = default(Action); - moveNext = () => + private void MoveNext() { - TaskHelpers.Delay(period, cancel.Token).ContinueWith( - _ => + TaskHelpers.Delay(_period, _cts.Token).ContinueWith( + (_, thisObject) => { - moveNext(); + var @this = (PeriodicallyScheduledWorkItem)thisObject; + + @this.MoveNext(); - gate.Wait(() => - { - state1 = action(state1); - }); + @this._gate.Wait( + @this, + closureThis => closureThis._state = closureThis._action(closureThis._state)); }, - CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler + this, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, + _taskFactory.Scheduler ); - }; - - moveNext(); - - return StableCompositeDisposable.Create(cancel, gate); + } } } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs index 49982529e4..b673d8b3f3 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs @@ -157,26 +157,40 @@ public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func< if (action == null) throw new ArgumentNullException(nameof(action)); - var state1 = state; - var gate = new AsyncLock(); + return new PeriodicallyScheduledWorkItem(state, period, action); + } - var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer( - tpt => - { - gate.Wait(() => - { - state1 = action(state1); - }); - }, - period - ); + private sealed class PeriodicallyScheduledWorkItem : IDisposable + { + private TState _state; + private Func _action; + + private readonly ThreadPoolTimer _timer; + private readonly AsyncLock _gate = new AsyncLock(); - return Disposable.Create(() => + public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func action) { - res.Cancel(); - gate.Dispose(); - action = Stubs.I; - }); + _state = state; + _action = action; + + _timer = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer( + Tick, + period); + } + + private void Tick(ThreadPoolTimer timer) + { + _gate.Wait( + this, + @this => @this._state = @this._action(@this._state)); + } + + public void Dispose() + { + _timer.Cancel(); + _gate.Dispose(); + _action = Stubs.I; + } } } } diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs index 189cb4d470..480077e9a2 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs @@ -284,10 +284,12 @@ public PeriodicTimer(TState state, TimeSpan period, Func action) private void Tick(object state) { - _gate.Wait(() => - { - _state = _action(_state); - }); + _gate.Wait( + this, + @this => + { + @this._state = @this._action(@this._state); + }); } public void Dispose() diff --git a/Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs b/Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs index aaa2ad23af..0fc412ca72 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs @@ -19,26 +19,23 @@ public AsyncLockObserver(IObserver observer, AsyncLock gate) protected override void OnNextCore(T value) { - _gate.Wait(() => - { - _observer.OnNext(value); - }); + _gate.Wait( + (_observer, value), + tuple => tuple._observer.OnNext(tuple.value)); } protected override void OnErrorCore(Exception exception) { - _gate.Wait(() => - { - _observer.OnError(exception); - }); + _gate.Wait( + (_observer, exception), + tuple => tuple._observer.OnError(tuple.exception)); } protected override void OnCompletedCore() { - _gate.Wait(() => - { - _observer.OnCompleted(); - }); + _gate.Wait( + _observer, + closureObserver => closureObserver.OnCompleted()); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs index 2415d77720..3727312b76 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs @@ -520,7 +520,7 @@ public override void Run(IObservable source) base.Run(source); - _bufferGate.Wait(CreateBufferClose); + _bufferGate.Wait(this, @this => @this.CreateBufferClose()); } protected override void Dispose(bool disposing) @@ -564,7 +564,7 @@ private void CloseBuffer(IDisposable closingSubscription) ForwardOnNext(res); } - _bufferGate.Wait(CreateBufferClose); + _bufferGate.Wait(this, @this => @this.CreateBufferClose()); } private sealed class BufferClosingObserver : IObserver diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs index fe90e66727..bf1653ec2c 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs @@ -530,7 +530,7 @@ public override void Run(IObservable source) groupDisposable.Add(source.SubscribeSafe(this)); - _windowGate.Wait(CreateWindowClose); + _windowGate.Wait(this, @this => @this.CreateWindowClose()); SetUpstream(_refCountDisposable); } @@ -569,7 +569,7 @@ private void CloseWindow(IDisposable closingSubscription) ForwardOnNext(window); } - _windowGate.Wait(CreateWindowClose); + _windowGate.Wait(this, @this => @this.CreateWindowClose()); } private sealed class WindowClosingObserver : IObserver