Skip to content

Commit cb8d44d

Browse files
authored
Merge pull request #760 from danielcweber/ReviewTaskObservableExtensions
Review TaskObservableExtensions
2 parents 58a55ba + 28f51f6 commit cb8d44d

File tree

2 files changed

+56
-54
lines changed

2 files changed

+56
-54
lines changed

Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ public void OnNext(Unit value)
101101
public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
102102
{
103103
_subscription = subscribeAsync(observer, _cts.Token)
104-
.ToObservable()
105104
.Subscribe(new TaskCompletionObserver(observer));
106105
}
107106

@@ -180,7 +179,6 @@ public Subscription(Func<IObserver<TResult>, CancellationToken, Task<IDisposable
180179
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
181180
//
182181
subscribeAsync(observer, _cts.Token)
183-
.ToObservable()
184182
.Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
185183
}
186184

@@ -262,7 +260,6 @@ public Subscription(Func<IObserver<TResult>, CancellationToken, Task<Action>> su
262260
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
263261
//
264262
subscribeAsync(observer, _cts.Token)
265-
.ToObservable()
266263
.Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
267264
}
268265

Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Reactive.Concurrency;
6+
using System.Reactive.Disposables;
67
using System.Reactive.Linq;
78
using System.Reactive.Linq.ObservableImpl;
89
using System.Reactive.Subjects;
@@ -58,48 +59,30 @@ public static IObservable<Unit> ToObservable(this Task task, IScheduler schedule
5859

5960
private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
6061
{
61-
var res = default(IObservable<Unit>);
62-
6362
if (task.IsCompleted)
6463
{
6564
scheduler = scheduler ?? ImmediateScheduler.Instance;
6665

6766
switch (task.Status)
6867
{
69-
case TaskStatus.RanToCompletion:
70-
res = new Return<Unit>(Unit.Default, scheduler);
71-
break;
7268
case TaskStatus.Faulted:
73-
res = new Throw<Unit>(task.Exception.InnerException, scheduler);
74-
break;
69+
return new Throw<Unit>(task.Exception.InnerException, scheduler);
7570
case TaskStatus.Canceled:
76-
res = new Throw<Unit>(new TaskCanceledException(task), scheduler);
77-
break;
71+
return new Throw<Unit>(new TaskCanceledException(task), scheduler);
7872
}
79-
}
80-
else
81-
{
82-
//
83-
// Separate method to avoid closure in synchronous completion case.
84-
//
85-
res = ToObservableSlow(task, scheduler);
86-
}
8773

88-
return res;
89-
}
74+
return new Return<Unit>(Unit.Default, scheduler);
75+
}
9076

91-
private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
92-
{
9377
var subject = new AsyncSubject<Unit>();
94-
9578
var options = GetTaskContinuationOptions(scheduler);
9679

97-
task.ContinueWith(t => ToObservableDone(task, subject), options);
80+
task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
9881

99-
return ToObservableResult(subject, scheduler);
82+
return subject.ToObservableResult(scheduler);
10083
}
10184

102-
private static void ToObservableDone(Task task, IObserver<Unit> subject)
85+
private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
10386
{
10487
switch (task.Status)
10588
{
@@ -116,6 +99,26 @@ private static void ToObservableDone(Task task, IObserver<Unit> subject)
11699
}
117100
}
118101

102+
internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
103+
{
104+
if (task.IsCompleted)
105+
{
106+
task.EmitTaskResult(observer);
107+
return Disposable.Empty;
108+
}
109+
110+
var cts = new CancellationDisposable();
111+
112+
task.ContinueWith(
113+
(t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject),
114+
observer,
115+
cts.Token,
116+
TaskContinuationOptions.ExecuteSynchronously,
117+
TaskScheduler.Default);
118+
119+
return cts;
120+
}
121+
119122
/// <summary>
120123
/// Returns an observable sequence that propagates the result of the task.
121124
/// </summary>
@@ -160,48 +163,30 @@ public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task
160163

161164
private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
162165
{
163-
var res = default(IObservable<TResult>);
164-
165166
if (task.IsCompleted)
166167
{
167168
scheduler = scheduler ?? ImmediateScheduler.Instance;
168169

169170
switch (task.Status)
170171
{
171-
case TaskStatus.RanToCompletion:
172-
res = new Return<TResult>(task.Result, scheduler);
173-
break;
174172
case TaskStatus.Faulted:
175-
res = new Throw<TResult>(task.Exception.InnerException, scheduler);
176-
break;
173+
return new Throw<TResult>(task.Exception.InnerException, scheduler);
177174
case TaskStatus.Canceled:
178-
res = new Throw<TResult>(new TaskCanceledException(task), scheduler);
179-
break;
175+
return new Throw<TResult>(new TaskCanceledException(task), scheduler);
180176
}
181-
}
182-
else
183-
{
184-
//
185-
// Separate method to avoid closure in synchronous completion case.
186-
//
187-
res = ToObservableSlow(task, scheduler);
188-
}
189177

190-
return res;
191-
}
178+
return new Return<TResult>(task.Result, scheduler);
179+
}
192180

193-
private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
194-
{
195181
var subject = new AsyncSubject<TResult>();
196-
197182
var options = GetTaskContinuationOptions(scheduler);
198183

199-
task.ContinueWith(t => ToObservableDone(task, subject), options);
184+
task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
200185

201-
return ToObservableResult(subject, scheduler);
186+
return subject.ToObservableResult(scheduler);
202187
}
203188

204-
private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
189+
private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
205190
{
206191
switch (task.Status)
207192
{
@@ -240,7 +225,7 @@ private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler sch
240225
return options;
241226
}
242227

243-
private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
228+
private static IObservable<TResult> ToObservableResult<TResult>(this AsyncSubject<TResult> subject, IScheduler scheduler)
244229
{
245230
if (scheduler != null)
246231
{
@@ -250,6 +235,26 @@ private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TRe
250235
return subject.AsObservable();
251236
}
252237

238+
internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
239+
{
240+
if (task.IsCompleted)
241+
{
242+
task.EmitTaskResult(observer);
243+
return Disposable.Empty;
244+
}
245+
246+
var cts = new CancellationDisposable();
247+
248+
task.ContinueWith(
249+
(t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject),
250+
observer,
251+
cts.Token,
252+
TaskContinuationOptions.ExecuteSynchronously,
253+
TaskScheduler.Default);
254+
255+
return cts;
256+
}
257+
253258
/// <summary>
254259
/// Returns a task that will receive the last value or the exception produced by the observable sequence.
255260
/// </summary>

0 commit comments

Comments
 (0)