Skip to content

Commit 196b172

Browse files
akarnokddanielcweber
authored andcommitted
Use Schedule calls with state (#558)
1 parent a9e2277 commit 196b172

File tree

13 files changed

+206
-197
lines changed

13 files changed

+206
-197
lines changed

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ protected override IDisposable SubscribeCore(IObserver<TSource> observer)
5656
var d = new SerialDisposable();
5757
d.Disposable = m;
5858

59-
m.Disposable = scheduler.Schedule(() =>
59+
m.Disposable = scheduler.Schedule((source, observer, d),
60+
(scheduler, state) =>
6061
{
61-
d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
62+
state.d.Disposable = new ScheduledDisposable(scheduler, state.source.SubscribeSafe(state.observer));
63+
return Disposable.Empty;
6264
});
6365

6466
return d;

Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ private void DrainQueue(ICancelable cancel)
434434
if (shouldWait)
435435
{
436436
var timer = new ManualResetEventSlim();
437-
_scheduler.Schedule(waitTime, () => { timer.Set(); });
437+
_scheduler.Schedule(timer, waitTime, (_, slimTimer) => { slimTimer.Set(); return Disposable.Empty; });
438438

439439
try
440440
{

Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,12 +284,13 @@ public IDisposable Connect(IObserver<TArgs> observer)
284284
{
285285
if (--_count == 0)
286286
{
287-
_parent._scheduler.Schedule(_removeHandler.Dispose);
287+
_parent._scheduler.ScheduleAction(_removeHandler, handler => handler.Dispose());
288288
_parent._session = null;
289289
}
290290
}
291291
});
292292
}
293+
293294
}
294295

295296
private void Initialize()

Rx.NET/Source/src/System.Reactive/Linq/Observable/Return.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Reactive.Concurrency;
6+
using System.Reactive.Disposables;
67

78
namespace System.Reactive.Linq.ObservableImpl
89
{
@@ -33,7 +34,7 @@ public _(TResult value, IObserver<TResult> observer, IDisposable cancel)
3334

3435
public IDisposable Run(IScheduler scheduler)
3536
{
36-
return scheduler.Schedule(Invoke);
37+
return scheduler.ScheduleAction(this, @this => @this.Invoke());
3738
}
3839

3940
private void Invoke()

Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,15 @@ public _(IObserver<TSource> observer, IDisposable cancel)
101101

102102
public IDisposable Run(Time parent)
103103
{
104-
var t = parent._scheduler.Schedule(parent._duration, Tick);
104+
var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
105105
var d = parent._source.SubscribeSafe(this);
106106
return StableCompositeDisposable.Create(t, d);
107107
}
108108

109-
private void Tick()
109+
private IDisposable Tick()
110110
{
111111
_open = true;
112+
return Disposable.Empty;
112113
}
113114

114115
public override void OnNext(TSource value)

Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,15 @@ public _(IObserver<TSource> observer, IDisposable cancel)
205205

206206
public IDisposable Run(SkipUntil<TSource> parent)
207207
{
208-
var t = parent._scheduler.Schedule(parent._startTime, Tick);
208+
var t = parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick());
209209
var d = parent._source.SubscribeSafe(this);
210210
return StableCompositeDisposable.Create(t, d);
211211
}
212212

213-
private void Tick()
213+
private IDisposable Tick()
214214
{
215215
_open = true;
216+
return Disposable.Empty;
216217
}
217218

218219
public override void OnNext(TSource value)

Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,17 +112,18 @@ public IDisposable Run(Time parent)
112112
{
113113
_gate = new object();
114114

115-
var t = parent._scheduler.Schedule(parent._duration, Tick);
115+
var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
116116
var d = parent._source.SubscribeSafe(this);
117117
return StableCompositeDisposable.Create(t, d);
118118
}
119119

120-
private void Tick()
120+
private IDisposable Tick()
121121
{
122122
lock (_gate)
123123
{
124124
ForwardOnCompleted();
125125
}
126+
return Disposable.Empty;
126127
}
127128

128129
public override void OnNext(TSource value)

Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,17 +180,18 @@ public _(IObserver<TSource> observer, IDisposable cancel)
180180

181181
public IDisposable Run(TakeUntil<TSource> parent)
182182
{
183-
var t = parent._scheduler.Schedule(parent._endTime, Tick);
183+
var t = parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick());
184184
var d = parent._source.SubscribeSafe(this);
185185
return StableCompositeDisposable.Create(t, d);
186186
}
187187

188-
private void Tick()
188+
private IDisposable Tick()
189189
{
190190
lock (_gate)
191191
{
192192
ForwardOnCompleted();
193193
}
194+
return Disposable.Empty;
194195
}
195196

196197
public override void OnNext(TSource value)

Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Reactive.Concurrency;
6+
using System.Reactive.Disposables;
67

78
namespace System.Reactive.Linq.ObservableImpl
89
{
@@ -33,13 +34,9 @@ public _(Exception exception, IObserver<TResult> observer, IDisposable cancel)
3334

3435
public IDisposable Run(IScheduler scheduler)
3536
{
36-
return scheduler.Schedule(Invoke);
37+
return scheduler.ScheduleAction(this, @this => @this.ForwardOnError(@this._exception));
3738
}
3839

39-
private void Invoke()
40-
{
41-
ForwardOnError(_exception);
42-
}
4340
}
4441
}
4542
}

Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,14 +173,14 @@ public IDisposable Run(Absolute parent)
173173

174174
_switched = false;
175175

176-
var timer = parent._scheduler.Schedule(parent._dueTime, Timeout);
176+
var timer = parent._scheduler.Schedule(this, parent._dueTime, (_, state) => state.Timeout());
177177

178178
original.Disposable = parent._source.SubscribeSafe(this);
179179

180180
return StableCompositeDisposable.Create(_subscription, timer);
181181
}
182182

183-
private void Timeout()
183+
private IDisposable Timeout()
184184
{
185185
var timerWins = false;
186186

@@ -192,6 +192,8 @@ private void Timeout()
192192

193193
if (timerWins)
194194
_subscription.Disposable = _other.SubscribeSafe(GetForwarder());
195+
196+
return Disposable.Empty;
195197
}
196198

197199
public override void OnNext(TSource value)

0 commit comments

Comments
 (0)