Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,40 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var state1 = state;
var gate = new AsyncLock();
return new PeriodicallyScheduledWorkItem<TState>(state, period, action);
}

private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private TState _state;
private Func<TState, TState> _action;
private readonly IDisposable _cancel;
private readonly AsyncLock _gate = new AsyncLock();

var cancel = s_cal.StartPeriodicTimer(() =>
public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action)
{
gate.Wait(() =>
{
state1 = action(state1);
});
}, period);
_state = state;
_action = action;

_cancel = s_cal.StartPeriodicTimer(Tick, period);
}

return Disposable.Create(() =>
private void Tick()
{
cancel.Dispose();
gate.Dispose();
action = Stubs<TState>.I;
});
_gate.Wait(
this,
closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));
}

public void Dispose()
{
_cancel.Dispose();
_gate.Dispose();
_action = Stubs<TState>.I;
}
}


/// <summary>
/// Discovers scheduler services by interface type.
/// </summary>
Expand Down
62 changes: 29 additions & 33 deletions Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,52 +191,48 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var start = _stopwatch.Elapsed;
var next = start + period;

var state1 = state;
return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
}

var td = new TernaryDisposable();
private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private readonly TimeSpan _period;
private readonly Func<TState, TState> _action;
private readonly EventLoopScheduler _scheduler;
private readonly AsyncLock _gate = new AsyncLock();

var gate = new AsyncLock();
td.Extra = gate;
private TState _state;
private TimeSpan _next;
private IDisposable _task;

var tick = default(Func<IScheduler, object, IDisposable>);
tick = (self_, _) =>
public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
next += period;
_state = state;
_period = period;
_action = action;
_scheduler = scheduler;
_next = scheduler._stopwatch.Elapsed + period;

td.Next = self_.Schedule(null, next - _stopwatch.Elapsed, tick);

gate.Wait(() =>
{
state1 = action(state1);
});

return Disposable.Empty;
};
Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
}

td.First = Schedule(null, next - _stopwatch.Elapsed, tick);
private IDisposable Tick(IScheduler self)
{
_next += _period;

return td;
}
Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));

private sealed class TernaryDisposable : IDisposable
{
private IDisposable _task;
private IDisposable _extra;
_gate.Wait(
this,
closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));

// If Next was called before this assignment is executed, it won't overwrite
// a more fresh IDisposable task
public IDisposable First { set { Disposable.TrySetSingle(ref _task, value); } }
// It is fine to overwrite the first or previous IDisposable task
public IDisposable Next { set { Disposable.TrySetMultiple(ref _task, value); } }
public IDisposable Extra { set { Disposable.SetSingle(ref _extra, value); } }
return Disposable.Empty;
}

public void Dispose()
{
Disposable.TryDispose(ref _task);
Disposable.TryDispose(ref _extra);
_gate.Dispose();
}
}

Expand Down
42 changes: 23 additions & 19 deletions Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TSta
asyncLock = new AsyncLock();
}

asyncLock.Wait(() =>
{
if (!m.IsDisposed)
asyncLock.Wait(
(@this: this, m, action, state),
tuple =>
{
m.Disposable = action(this, state);
}
});
if (!m.IsDisposed)
{
tuple.m.Disposable = tuple.action(tuple.@this, tuple.state);
}
});

return m;
}
Expand Down Expand Up @@ -113,22 +115,24 @@ private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IS
asyncLock = new AsyncLock();
}

asyncLock.Wait(() =>
{
if (!m.IsDisposed)
asyncLock.Wait(
(@this: this, m, state, action, timer, dueTime),
tuple =>
{
var sleep = dueTime - timer.Elapsed;
if (sleep.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(sleep);
}

if (!m.IsDisposed)
if (!tuple.m.IsDisposed)
{
m.Disposable = action(this, state);
var sleep = tuple.dueTime - tuple.timer.Elapsed;
if (sleep.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(sleep);
}

if (!tuple.m.IsDisposed)
{
tuple.m.Disposable = tuple.action(tuple.@this, tuple.state);
}
}
}
});
});

return m;
}
Expand Down
59 changes: 41 additions & 18 deletions Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,54 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var cancel = new CancellationDisposable();
return new PeriodicallyScheduledWorkItem<TState>(state, period, action, taskFactory);
}

private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private TState _state;

private readonly TimeSpan _period;
private readonly TaskFactory _taskFactory;
private readonly Func<TState, TState> _action;
private readonly AsyncLock _gate = new AsyncLock();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action, TaskFactory taskFactory)
{
_state = state;
_period = period;
_action = action;
_taskFactory = taskFactory;

MoveNext();
}

var state1 = state;
var gate = new AsyncLock();
public void Dispose()
{
_cts.Cancel();
_gate.Dispose();
}

var moveNext = default(Action);
moveNext = () =>
private void MoveNext()
{
TaskHelpers.Delay(period, cancel.Token).ContinueWith(
_ =>
TaskHelpers.Delay(_period, _cts.Token).ContinueWith(
(_, thisObject) =>
{
moveNext();
var @this = (PeriodicallyScheduledWorkItem<TState>)thisObject;

@this.MoveNext();

gate.Wait(() =>
{
state1 = action(state1);
});
@this._gate.Wait(
@this,
closureThis => closureThis._state = closureThis._action(closureThis._state));
},
CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler
this,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
_taskFactory.Scheduler
);
};

moveNext();

return StableCompositeDisposable.Create(cancel, gate);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,26 +157,40 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var state1 = state;
var gate = new AsyncLock();
return new PeriodicallyScheduledWorkItem<TState>(state, period, action);
}

var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
tpt =>
{
gate.Wait(() =>
{
state1 = action(state1);
});
},
period
);
private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private TState _state;
private Func<TState, TState> _action;

private readonly ThreadPoolTimer _timer;
private readonly AsyncLock _gate = new AsyncLock();

return Disposable.Create(() =>
public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action)
{
res.Cancel();
gate.Dispose();
action = Stubs<TState>.I;
});
_state = state;
_action = action;

_timer = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
Tick,
period);
}

private void Tick(ThreadPoolTimer timer)
{
_gate.Wait(
this,
@this => @this._state = @this._action(@this._state));
}

public void Dispose()
{
_timer.Cancel();
_gate.Dispose();
_action = Stubs<TState>.I;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,12 @@ public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action)

private void Tick(object state)
{
_gate.Wait(() =>
{
_state = _action(_state);
});
_gate.Wait(
this,
@this =>
{
@this._state = @this._action(@this._state);
});
}

public void Dispose()
Expand Down
21 changes: 9 additions & 12 deletions Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,23 @@ public AsyncLockObserver(IObserver<T> observer, AsyncLock gate)

protected override void OnNextCore(T value)
{
_gate.Wait(() =>
{
_observer.OnNext(value);
});
_gate.Wait(
(_observer, value),
tuple => tuple._observer.OnNext(tuple.value));
}

protected override void OnErrorCore(Exception exception)
{
_gate.Wait(() =>
{
_observer.OnError(exception);
});
_gate.Wait(
(_observer, exception),
tuple => tuple._observer.OnError(tuple.exception));
}

protected override void OnCompletedCore()
{
_gate.Wait(() =>
{
_observer.OnCompleted();
});
_gate.Wait(
_observer,
closureObserver => closureObserver.OnCompleted());
}
}
}
4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ public override void Run(IObservable<TSource> source)

base.Run(source);

_bufferGate.Wait(CreateBufferClose);
_bufferGate.Wait(this, @this => @this.CreateBufferClose());
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -564,7 +564,7 @@ private void CloseBuffer(IDisposable closingSubscription)
ForwardOnNext(res);
}

_bufferGate.Wait(CreateBufferClose);
_bufferGate.Wait(this, @this => @this.CreateBufferClose());
}

private sealed class BufferClosingObserver : IObserver<TBufferClosing>
Expand Down
Loading