Skip to content

Commit ea02b95

Browse files
committed
4.x: Implement RepeatWhen
1 parent 196b172 commit ea02b95

File tree

8 files changed

+911
-65
lines changed

8 files changed

+911
-65
lines changed

Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ internal interface IQueryLanguage
580580
IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source);
581581
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source);
582582
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount);
583+
IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler);
583584
IObservable<TSource> Retry<TSource>(IObservable<TSource> source);
584585
IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount);
585586
IObservable<TSource> RetryWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<Exception>, IObservable<TSignal>> handler);

Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,31 @@ public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> sou
377377
return s_impl.Repeat<TSource>(source, repeatCount);
378378
}
379379

380+
/// <summary>
381+
/// Repeatedly resubscribes to the source observable after a normal completion and when the observable
382+
/// returned by a handler produces an arbitrary item.
383+
/// </summary>
384+
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
385+
/// <typeparam name="TSignal">The arbitrary element type signaled by the handler observable.</typeparam>
386+
/// <param name="source">Observable sequence to keep repeating when it successfully terminates.</param>
387+
/// <param name="handler">The function that is called for each observer and takes an observable sequence objects.
388+
/// It should return an observable of arbitrary items that should signal that arbitrary item in
389+
/// response to receiving the completion signal from the source observable. If this observable signals
390+
/// a terminal event, the sequence is terminated with that signal instead.</param>
391+
/// <returns>An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully.</returns>
392+
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
393+
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
394+
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(this IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler)
395+
{
396+
if (source == null)
397+
throw new ArgumentNullException(nameof(source));
398+
if (handler == null)
399+
throw new ArgumentNullException(nameof(handler));
400+
401+
return s_impl.RepeatWhen(source, handler);
402+
}
403+
404+
380405
#endregion
381406

382407
#region + Retry +
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Concurrent;
7+
using System.Collections.Generic;
8+
using System.Reactive.Disposables;
9+
using System.Reactive.Subjects;
10+
using System.Text;
11+
using System.Threading;
12+
13+
namespace System.Reactive.Linq.ObservableImpl
14+
{
15+
internal sealed class RepeatWhen<T, U> : IObservable<T>
16+
{
17+
readonly IObservable<T> source;
18+
19+
readonly Func<IObservable<object>, IObservable<U>> handler;
20+
21+
internal RepeatWhen(IObservable<T> source, Func<IObservable<object>, IObservable<U>> handler)
22+
{
23+
this.source = source;
24+
this.handler = handler;
25+
}
26+
27+
public IDisposable Subscribe(IObserver<T> observer)
28+
{
29+
if (observer == null)
30+
{
31+
throw new ArgumentNullException(nameof(observer));
32+
}
33+
34+
var completeSignals = new Subject<object>();
35+
var redo = default(IObservable<U>);
36+
37+
try
38+
{
39+
redo = handler(completeSignals);
40+
if (redo == null)
41+
{
42+
throw new NullReferenceException("The handler returned a null IObservable");
43+
}
44+
}
45+
catch (Exception ex)
46+
{
47+
observer.OnError(ex);
48+
return Disposable.Empty;
49+
}
50+
51+
var parent = new MainObserver(observer, source, new RedoSerializedObserver<object>(completeSignals));
52+
53+
var d = redo.SubscribeSafe(parent.handlerObserver);
54+
parent.handlerObserver.OnSubscribe(d);
55+
56+
parent.HandlerNext();
57+
58+
return parent;
59+
}
60+
61+
sealed class MainObserver : IObserver<T>, IDisposable
62+
{
63+
readonly IObserver<T> downstream;
64+
65+
readonly IObserver<Exception> errorSignal;
66+
67+
internal readonly HandlerObserver handlerObserver;
68+
69+
readonly IObservable<T> source;
70+
71+
SingleAssignmentDisposable upstream;
72+
73+
int trampoline;
74+
75+
int halfSerializer;
76+
77+
Exception error;
78+
79+
static readonly SingleAssignmentDisposable DISPOSED;
80+
81+
static MainObserver()
82+
{
83+
DISPOSED = new SingleAssignmentDisposable();
84+
DISPOSED.Dispose();
85+
}
86+
87+
internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal)
88+
{
89+
this.downstream = downstream;
90+
this.source = source;
91+
this.errorSignal = errorSignal;
92+
this.handlerObserver = new HandlerObserver(this);
93+
}
94+
95+
public void Dispose()
96+
{
97+
Interlocked.Exchange(ref upstream, DISPOSED)?.Dispose();
98+
handlerObserver.Dispose();
99+
}
100+
101+
public void OnCompleted()
102+
{
103+
104+
for (; ; )
105+
{
106+
var d = Volatile.Read(ref upstream);
107+
if (d == DISPOSED)
108+
{
109+
break;
110+
}
111+
if (Interlocked.CompareExchange(ref upstream, null, d) == d)
112+
{
113+
errorSignal.OnNext(null);
114+
d.Dispose();
115+
break;
116+
}
117+
}
118+
119+
}
120+
121+
public void OnError(Exception error)
122+
{
123+
if (Interlocked.Increment(ref halfSerializer) == 1)
124+
{
125+
downstream.OnError(error);
126+
Dispose();
127+
}
128+
}
129+
130+
public void OnNext(T value)
131+
{
132+
if (Interlocked.CompareExchange(ref halfSerializer, 1, 0) == 0)
133+
{
134+
downstream.OnNext(value);
135+
if (Interlocked.Decrement(ref halfSerializer) != 0)
136+
{
137+
var ex = error;
138+
if (ex == null)
139+
{
140+
downstream.OnCompleted();
141+
}
142+
else
143+
{
144+
downstream.OnError(ex);
145+
}
146+
Dispose();
147+
}
148+
}
149+
}
150+
151+
internal void HandlerError(Exception error)
152+
{
153+
this.error = error;
154+
if (Interlocked.Increment(ref halfSerializer) == 1)
155+
{
156+
downstream.OnError(error);
157+
Dispose();
158+
}
159+
}
160+
161+
internal void HandlerComplete()
162+
{
163+
if (Interlocked.Increment(ref halfSerializer) == 1)
164+
{
165+
downstream.OnCompleted();
166+
Dispose();
167+
}
168+
}
169+
170+
internal void HandlerNext()
171+
{
172+
if (Interlocked.Increment(ref trampoline) == 1)
173+
{
174+
do
175+
{
176+
var sad = new SingleAssignmentDisposable();
177+
if (Interlocked.CompareExchange(ref upstream, sad, null) != null)
178+
{
179+
return;
180+
}
181+
182+
sad.Disposable = source.SubscribeSafe(this);
183+
}
184+
while (Interlocked.Decrement(ref trampoline) != 0);
185+
}
186+
}
187+
188+
internal sealed class HandlerObserver : IObserver<U>, IDisposable
189+
{
190+
readonly MainObserver main;
191+
192+
IDisposable upstream;
193+
194+
internal HandlerObserver(MainObserver main)
195+
{
196+
this.main = main;
197+
}
198+
199+
internal void OnSubscribe(IDisposable d)
200+
{
201+
if (Interlocked.CompareExchange(ref upstream, d, null) != null)
202+
{
203+
d?.Dispose();
204+
}
205+
}
206+
207+
public void Dispose()
208+
{
209+
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
210+
}
211+
212+
public void OnCompleted()
213+
{
214+
main.HandlerComplete();
215+
Dispose();
216+
}
217+
218+
public void OnError(Exception error)
219+
{
220+
main.HandlerError(error);
221+
Dispose();
222+
}
223+
224+
public void OnNext(U value)
225+
{
226+
main.HandlerNext();
227+
}
228+
}
229+
}
230+
231+
}
232+
}

0 commit comments

Comments
 (0)