diff --git a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs index f788b8ad5f..facb74de4f 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs @@ -580,6 +580,7 @@ internal interface IQueryLanguage IObservable> Materialize(IObservable source); IObservable Repeat(IObservable source); IObservable Repeat(IObservable source, int repeatCount); + IObservable RepeatWhen(IObservable source, Func, IObservable> handler); IObservable Retry(IObservable source); IObservable Retry(IObservable source, int retryCount); IObservable RetryWhen(IObservable source, Func, IObservable> handler); diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs index 3c545ff932..83ce5f973c 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs @@ -377,6 +377,31 @@ public static IObservable Repeat(this IObservable sou return s_impl.Repeat(source, repeatCount); } + /// + /// Repeatedly resubscribes to the source observable after a normal completion and when the observable + /// returned by a handler produces an arbitrary item. + /// + /// The type of the elements in the source sequence. + /// The arbitrary element type signaled by the handler observable. + /// Observable sequence to keep repeating when it successfully terminates. + /// The function that is called for each observer and takes an observable sequence objects. + /// It should return an observable of arbitrary items that should signal that arbitrary item in + /// response to receiving the completion signal from the source observable. If this observable signals + /// a terminal event, the sequence is terminated with that signal instead. + /// An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully. + /// is null. + /// is null. + public static IObservable RepeatWhen(this IObservable source, Func, IObservable> handler) + { + if (source == null) + throw new ArgumentNullException(nameof(source)); + if (handler == null) + throw new ArgumentNullException(nameof(handler)); + + return s_impl.RepeatWhen(source, handler); + } + + #endregion #region + Retry + diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs new file mode 100644 index 0000000000..bcc6d26c65 --- /dev/null +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs @@ -0,0 +1,170 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Reactive.Subjects; +using System.Text; +using System.Threading; + +namespace System.Reactive.Linq.ObservableImpl +{ + internal sealed class RepeatWhen : IObservable + { + readonly IObservable source; + + readonly Func, IObservable> handler; + + internal RepeatWhen(IObservable source, Func, IObservable> handler) + { + this.source = source; + this.handler = handler; + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var completeSignals = new Subject(); + var redo = default(IObservable); + + try + { + redo = handler(completeSignals); + if (redo == null) + { + throw new NullReferenceException("The handler returned a null IObservable"); + } + } + catch (Exception ex) + { + observer.OnError(ex); + return Disposable.Empty; + } + + var parent = new MainObserver(observer, source, new RedoSerializedObserver(completeSignals)); + + var d = redo.SubscribeSafe(parent.handlerObserver); + Disposable.SetSingle(ref parent.handlerUpstream, d); + + parent.HandlerNext(); + + return parent; + } + + sealed class MainObserver : Sink, IObserver + { + readonly IObserver errorSignal; + + internal readonly HandlerObserver handlerObserver; + + readonly IObservable source; + + IDisposable upstream; + + internal IDisposable handlerUpstream; + + int trampoline; + + int halfSerializer; + + Exception error; + + internal MainObserver(IObserver downstream, IObservable source, IObserver errorSignal) : base(downstream) + { + this.source = source; + this.errorSignal = errorSignal; + this.handlerObserver = new HandlerObserver(this); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Disposable.TryDispose(ref upstream); + Disposable.TryDispose(ref handlerUpstream); + } + base.Dispose(disposing); + } + + public void OnCompleted() + { + if (Disposable.TrySetSerial(ref upstream, null)) + { + errorSignal.OnNext(null); + } + + } + + public void OnError(Exception error) + { + HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error); + } + + public void OnNext(T value) + { + HalfSerializer.ForwardOnNext(this, value, ref halfSerializer, ref this.error); + } + + internal void HandlerError(Exception error) + { + HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error); + } + + internal void HandlerComplete() + { + HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error); + } + + internal void HandlerNext() + { + if (Interlocked.Increment(ref trampoline) == 1) + { + do + { + var sad = new SingleAssignmentDisposable(); + if (Interlocked.CompareExchange(ref upstream, sad, null) != null) + { + return; + } + + sad.Disposable = source.SubscribeSafe(this); + } + while (Interlocked.Decrement(ref trampoline) != 0); + } + } + + internal sealed class HandlerObserver : IObserver + { + readonly MainObserver main; + + internal HandlerObserver(MainObserver main) + { + this.main = main; + } + + public void OnCompleted() + { + main.HandlerComplete(); + } + + public void OnError(Exception error) + { + main.HandlerError(error); + } + + public void OnNext(U value) + { + main.HandlerNext(); + } + } + } + + } +} diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs index 28594ea15a..e7870edded 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs @@ -1,4 +1,8 @@ -using System; +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Reactive.Disposables; @@ -44,19 +48,18 @@ public IDisposable Subscribe(IObserver observer) return Disposable.Empty; } - var parent = new MainObserver(observer, source, new SerializedObserver(errorSignals)); + var parent = new MainObserver(observer, source, new RedoSerializedObserver(errorSignals)); var d = redo.SubscribeSafe(parent.handlerObserver); - parent.handlerObserver.OnSubscribe(d); + Disposable.SetSingle(ref parent.handlerUpstream, d); parent.HandlerNext(); return parent; } - sealed class MainObserver : IObserver, IDisposable + sealed class MainObserver : Sink, IObserver { - readonly IObserver downstream; readonly IObserver errorSignal; @@ -65,6 +68,7 @@ sealed class MainObserver : IObserver, IDisposable readonly IObservable source; IDisposable upstream; + internal IDisposable handlerUpstream; int trampoline; @@ -72,85 +76,49 @@ sealed class MainObserver : IObserver, IDisposable Exception error; - internal MainObserver(IObserver downstream, IObservable source, IObserver errorSignal) + internal MainObserver(IObserver downstream, IObservable source, IObserver errorSignal) : base(downstream) { - this.downstream = downstream; this.source = source; this.errorSignal = errorSignal; this.handlerObserver = new HandlerObserver(this); } - public void Dispose() + protected override void Dispose(bool disposing) { - Disposable.TryDispose(ref upstream); - handlerObserver.Dispose(); + if (disposing) + { + Disposable.TryDispose(ref upstream); + Disposable.TryDispose(ref handlerUpstream); + } + base.Dispose(disposing); } public void OnCompleted() { - if (Interlocked.Increment(ref halfSerializer) == 1) - { - downstream.OnCompleted(); - Dispose(); - } + HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error); } public void OnError(Exception error) { - for (; ; ) + if (Disposable.TrySetSerial(ref upstream, null)) { - var d = Volatile.Read(ref upstream); - if (d == BooleanDisposable.True) - { - break; - } - if (Interlocked.CompareExchange(ref upstream, null, d) == d) - { - errorSignal.OnNext(error); - d.Dispose(); - break; - } + errorSignal.OnNext(error); } } public void OnNext(T value) { - if (Interlocked.CompareExchange(ref halfSerializer, 1, 0) == 0) - { - downstream.OnNext(value); - if (Interlocked.Decrement(ref halfSerializer) != 0) - { - var ex = error; - if (ex == null) - { - downstream.OnCompleted(); - } - else - { - downstream.OnError(ex); - } - Dispose(); - } - } + HalfSerializer.ForwardOnNext(this, value, ref halfSerializer, ref this.error); } internal void HandlerError(Exception error) { - this.error = error; - if (Interlocked.Increment(ref halfSerializer) == 1) - { - downstream.OnError(error); - Dispose(); - } + HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error); } internal void HandlerComplete() { - if (Interlocked.Increment(ref halfSerializer) == 1) - { - downstream.OnCompleted(); - Dispose(); - } + HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error); } internal void HandlerNext() @@ -171,37 +139,23 @@ internal void HandlerNext() } } - internal sealed class HandlerObserver : IObserver, IDisposable + internal sealed class HandlerObserver : IObserver { readonly MainObserver main; - IDisposable upstream; - internal HandlerObserver(MainObserver main) { this.main = main; } - internal void OnSubscribe(IDisposable d) - { - Disposable.SetSingle(ref upstream, d); - } - - public void Dispose() - { - Disposable.TryDispose(ref upstream); - } - public void OnCompleted() { main.HandlerComplete(); - Dispose(); } public void OnError(Exception error) { main.HandlerError(error); - Dispose(); } public void OnNext(U value) @@ -210,97 +164,97 @@ public void OnNext(U value) } } } + } - sealed class SerializedObserver : IObserver - { - readonly IObserver downstream; - - int wip; + internal sealed class RedoSerializedObserver : IObserver + { + readonly IObserver downstream; - Exception terminalException; + int wip; - static readonly Exception DONE = new Exception(); + Exception terminalException; - static readonly Exception SIGNALED = new Exception(); + static readonly Exception DONE = new Exception(); - readonly ConcurrentQueue queue; + static readonly Exception SIGNALED = new Exception(); - internal SerializedObserver(IObserver downstream) - { - this.downstream = downstream; - this.queue = new ConcurrentQueue(); - } + readonly ConcurrentQueue queue; - public void OnCompleted() - { - if (Interlocked.CompareExchange(ref terminalException, DONE, null) == null) - { - Drain(); - } - } + internal RedoSerializedObserver(IObserver downstream) + { + this.downstream = downstream; + this.queue = new ConcurrentQueue(); + } - public void OnError(Exception error) + public void OnCompleted() + { + if (Interlocked.CompareExchange(ref terminalException, DONE, null) == null) { - if (Interlocked.CompareExchange(ref terminalException, error, null) == null) - { - Drain(); - } + Drain(); } + } - public void OnNext(Exception value) + public void OnError(Exception error) + { + if (Interlocked.CompareExchange(ref terminalException, error, null) == null) { - queue.Enqueue(value); Drain(); } + } - void Clear() - { - while (queue.TryDequeue(out var _)) ; - } + public void OnNext(X value) + { + queue.Enqueue(value); + Drain(); + } - void Drain() + void Clear() + { + while (queue.TryDequeue(out var _)) ; + } + + void Drain() + { + if (Interlocked.Increment(ref wip) != 1) { - if (Interlocked.Increment(ref wip) != 1) - { - return; - } + return; + } - int missed = 1; + int missed = 1; - for (; ; ) + for (; ; ) + { + var ex = Volatile.Read(ref terminalException); + if (ex != null) { - var ex = Volatile.Read(ref terminalException); - if (ex != null) + if (ex != SIGNALED) { - if (ex != SIGNALED) + Interlocked.Exchange(ref terminalException, SIGNALED); + if (ex != DONE) { - Interlocked.Exchange(ref terminalException, SIGNALED); - if (ex != DONE) - { - downstream.OnError(ex); - } - else - { - downstream.OnCompleted(); - } + downstream.OnError(ex); } - Clear(); - } - else - { - while (queue.TryDequeue(out var item)) + else { - downstream.OnNext(item); + downstream.OnCompleted(); } } - - - missed = Interlocked.Add(ref wip, -missed); - if (missed == 0) + Clear(); + } + else + { + while (queue.TryDequeue(out var item)) { - break; + downstream.OnNext(item); } } + + + missed = Interlocked.Add(ref wip, -missed); + if (missed == 0) + { + break; + } } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs b/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs index 80cb107990..33f5bbf4e7 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs @@ -1,4 +1,4 @@ -/* +/* * WARNING: Auto-generated file (05/28/2018 22:20:18) * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory). */ @@ -10847,6 +10847,41 @@ public static IQbservable Repeat(this IQbservable sou ); } + /// + /// Repeatedly resubscribes to the source observable after a normal completion and when the observable + /// returned by a handler produces an arbitrary item. + /// + /// The type of the elements in the source sequence. + /// The arbitrary element type signaled by the handler observable. + /// Observable sequence to keep repeating when it successfully terminates. + /// The function that is called for each observer and takes an observable sequence objects. + /// It should return an observable of arbitrary items that should signal that arbitrary item in + /// response to receiving the completion signal from the source observable. If this observable signals + /// a terminal event, the sequence is terminated with that signal instead. + /// An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully. + /// is null. + /// is null. + public static IQbservable RepeatWhen(this IQbservable source, Expression, IObservable>> handler) + { + if (source == null) + throw new ArgumentNullException(nameof(source)); + if (handler == null) + throw new ArgumentNullException(nameof(handler)); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.RepeatWhen(default(IQbservable), default(Expression, IObservable>>))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TSignal)), +#endif + source.Expression, + handler + ) + ); + } + /// /// Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications. /// This operator is a specialization of Multicast using a . diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs index 2c8fed154e..272ecccc1b 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs @@ -172,6 +172,11 @@ public virtual IObservable Repeat(IObservable source, return Enumerable.Repeat(source, repeatCount).Concat(); } + public virtual IObservable RepeatWhen(IObservable source, Func, IObservable> handler) + { + return new RepeatWhen(source, handler); + } + #endregion #region - Retry - diff --git a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt index b22c1dcd8f..995069a93f 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt +++ b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt @@ -1252,6 +1252,7 @@ namespace System.Reactive.Linq public static System.IObservable Repeat(TResult value, int repeatCount, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.IObservable Repeat(this System.IObservable source) { } public static System.IObservable Repeat(this System.IObservable source, int repeatCount) { } + public static System.IObservable RepeatWhen(this System.IObservable source, System.Func, System.IObservable> handler) { } public static System.Reactive.Subjects.IConnectableObservable Replay(this System.IObservable source) { } public static System.Reactive.Subjects.IConnectableObservable Replay(this System.IObservable source, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.IObservable Replay(this System.IObservable source, System.Func, System.IObservable> selector) { } @@ -1968,6 +1969,7 @@ namespace System.Reactive.Linq public static System.Reactive.Linq.IQbservable Repeat(this System.Reactive.Linq.IQbservableProvider provider, TResult value, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.Reactive.Linq.IQbservable Repeat(this System.Reactive.Linq.IQbservable source) { } public static System.Reactive.Linq.IQbservable Repeat(this System.Reactive.Linq.IQbservable source, int repeatCount) { } + public static System.Reactive.Linq.IQbservable RepeatWhen(this System.Reactive.Linq.IQbservable source, System.Linq.Expressions.Expression, System.IObservable>> handler) { } public static System.Reactive.Linq.IQbservable Replay(this System.Reactive.Linq.IQbservable source, System.Linq.Expressions.Expression, System.IObservable>> selector) { } public static System.Reactive.Linq.IQbservable Replay(this System.Reactive.Linq.IQbservable source, System.Linq.Expressions.Expression, System.IObservable>> selector, int bufferSize) { } public static System.Reactive.Linq.IQbservable Replay(this System.Reactive.Linq.IQbservable source, System.Linq.Expressions.Expression, System.IObservable>> selector, int bufferSize, System.Reactive.Concurrency.IScheduler scheduler) { } diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RepeatWhenTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RepeatWhenTest.cs new file mode 100644 index 0000000000..ecc992e6e8 --- /dev/null +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RepeatWhenTest.cs @@ -0,0 +1,542 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using Microsoft.Reactive.Testing; +using Xunit; +using ReactiveTests.Dummies; +using System.Reflection; +using System.Threading; +using System.Reactive.Disposables; +using System.Reactive.Subjects; +using System.Runtime.CompilerServices; + +namespace ReactiveTests.Tests +{ + public class RepeatWhenTest : ReactiveTest + { + + [Fact] + public void RepeatWhen_ArgumentChecking() + { + ReactiveAssert.Throws(() => Observable.RepeatWhen(null, v => v)); + ReactiveAssert.Throws(() => Observable.RepeatWhen(Observable.Return(1), null)); + ReactiveAssert.Throws(() => DummyObservable.Instance.RepeatWhen(v => v).Subscribe(null)); + } + + [Fact] + public void RepeatWhen_Handler_Crash() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnCompleted(10) + ); + + var ex = new InvalidOperationException(); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => { throw ex; }) + ); + + res.Messages.AssertEqual( + OnError(200, ex) + ); + + xs.Subscriptions.AssertEqual( + ); + } + + [Fact] + public void RepeatWhen_Handler_Error() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnCompleted(10) + ); + + var ex = new InvalidOperationException(); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => v.Select(w => throw ex)) + ); + + res.Messages.AssertEqual( + OnError(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [Fact] + public void RepeatWhen_Handler_Completed() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnCompleted(10) + ); + + var ex = new InvalidOperationException(); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => v.Take(1).Skip(1)) + ); + + res.Messages.AssertEqual( + OnCompleted(210) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [Fact] + public void RepeatWhen_Disposed() + { + var main = new Subject(); + var inner = new Subject(); + + var d = main.RepeatWhen(v => inner).Subscribe(); + + Assert.True(main.HasObservers); + Assert.True(inner.HasObservers); + + d.Dispose(); + + Assert.False(main.HasObservers); + Assert.False(inner.HasObservers); + } + + [Fact] + public void RepeatWhen_Handler_Completed_Disposes_Main() + { + var main = new Subject(); + var inner = new Subject(); + + var end = 0; + var items = 0; + var errors = 0; + + main.RepeatWhen(v => inner).Subscribe( + onNext: v => items++, + onError: e => errors++, + onCompleted: () => end++); + + Assert.True(main.HasObservers); + Assert.True(inner.HasObservers); + + inner.OnCompleted(); + + Assert.False(main.HasObservers); + Assert.False(inner.HasObservers); + + Assert.Equal(0, items); + Assert.Equal(0, errors); + Assert.Equal(1, end); + } + + [Fact] + public void RepeatWhen_Handler_Error_Disposes_Main() + { + var main = new Subject(); + var inner = new Subject(); + + var end = 0; + var items = 0; + var errors = 0; + + main.RepeatWhen(v => inner).Subscribe( + onNext: v => items++, + onError: e => errors++, + onCompleted: () => end++); + + Assert.True(main.HasObservers); + Assert.True(inner.HasObservers); + + inner.OnError(new InvalidOperationException()); + + Assert.False(main.HasObservers); + Assert.False(inner.HasObservers); + + Assert.Equal(0, items); + Assert.Equal(1, errors); + Assert.Equal(0, end); + } + + [Fact] + public void RepeatWhen_Basic() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnNext(100, 1), + OnNext(150, 2), + OnNext(200, 3), + OnCompleted(250) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => v) + ); + + res.Messages.AssertEqual( + OnNext(300, 1), + OnNext(350, 2), + OnNext(400, 3), + OnNext(550, 1), + OnNext(600, 2), + OnNext(650, 3), + OnNext(800, 1), + OnNext(850, 2), + OnNext(900, 3) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 450), + Subscribe(450, 700), + Subscribe(700, 950), + Subscribe(950, 1000) + ); + } + + [Fact] + public void RepeatWhen_Infinite() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnNext(100, 1), + OnNext(150, 2), + OnNext(200, 3) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => v) + ); + + res.Messages.AssertEqual( + OnNext(300, 1), + OnNext(350, 2), + OnNext(400, 3) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 1000) + ); + } + + [Fact] + public void RepeatWhen_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateColdObservable( + OnNext(100, 1), + OnNext(150, 2), + OnNext(200, 3), + OnError(250, ex) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => v) + ); + + res.Messages.AssertEqual( + OnNext(300, 1), + OnNext(350, 2), + OnNext(400, 3), + OnError(450, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 450) + ); + } + + [Fact] + public void RepeatWhen_Throws() + { + var scheduler1 = new TestScheduler(); + + var xs = Observable.Return(1, scheduler1).RepeatWhen(v => v); + + xs.Subscribe(x => { throw new InvalidOperationException(); }); + + ReactiveAssert.Throws(() => scheduler1.Start()); + + var scheduler2 = new TestScheduler(); + + var ys = Observable.Throw(new Exception(), scheduler2).RepeatWhen(v => v); + + ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }); + + ReactiveAssert.Throws(() => scheduler2.Start()); + + var scheduler3 = new TestScheduler(); + + var zs = Observable.Return(1, scheduler3).RepeatWhen(v => v); + + var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }); + + scheduler3.ScheduleAbsolute(210, () => d.Dispose()); + + scheduler3.Start(); + + var xss = Observable.Create(new Func, Action>(o => { throw new InvalidOperationException(); })).RepeatWhen(v => v); + + ReactiveAssert.Throws(() => xss.Subscribe()); + } + + [Fact] + public void RepeatWhen_RepeatCount_Basic() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnNext(5, 1), + OnNext(10, 2), + OnNext(15, 3), + OnCompleted(20) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }) + ); + + res.Messages.AssertEqual( + OnNext(205, 1), + OnNext(210, 2), + OnNext(215, 3), + OnNext(225, 1), + OnNext(230, 2), + OnNext(235, 3), + OnNext(245, 1), + OnNext(250, 2), + OnNext(255, 3), + OnCompleted(260) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 220), + Subscribe(220, 240), + Subscribe(240, 260) + ); + } + + [Fact] + public void RepeatWhen_RepeatCount_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnNext(5, 1), + OnNext(10, 2), + OnNext(15, 3), + OnCompleted(20) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }), 231 + ); + + res.Messages.AssertEqual( + OnNext(205, 1), + OnNext(210, 2), + OnNext(215, 3), + OnNext(225, 1), + OnNext(230, 2) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 220), + Subscribe(220, 231) + ); + } + + [Fact] + public void RepeatWhen_RepeatCount_Infinite() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateColdObservable( + OnNext(100, 1), + OnNext(150, 2), + OnNext(200, 3) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }) + ); + + res.Messages.AssertEqual( + OnNext(300, 1), + OnNext(350, 2), + OnNext(400, 3) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 1000) + ); + } + + [Fact] + public void RepeatWhen_RepeatCount_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateColdObservable( + OnNext(100, 1), + OnNext(150, 2), + OnNext(200, 3), + OnError(250, ex) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }) + ); + + res.Messages.AssertEqual( + OnNext(300, 1), + OnNext(350, 2), + OnNext(400, 3), + OnError(450, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 450) + ); + } + + [Fact] + public void RepeatWhen_RepeatCount_Throws() + { + var scheduler1 = new TestScheduler(); + + var xs = Observable.Return(1, scheduler1).RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }); + + xs.Subscribe(x => { throw new InvalidOperationException(); }); + + ReactiveAssert.Throws(() => scheduler1.Start()); + + var scheduler2 = new TestScheduler(); + + var ys = Observable.Throw(new Exception(), scheduler2).RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }); + + ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }); + + ReactiveAssert.Throws(() => scheduler2.Start()); + + var scheduler3 = new TestScheduler(); + + var zs = Observable.Return(1, scheduler3).RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 100); + }); + + var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }); + + scheduler3.ScheduleAbsolute(10, () => d.Dispose()); + + scheduler3.Start(); + + var xss = Observable.Create(new Func, Action>(o => { throw new InvalidOperationException(); })).RepeatWhen(v => + { + var count = 0; + return v.TakeWhile(w => ++count < 3); + }); + + ReactiveAssert.Throws(() => xss.Subscribe()); + } + + [Fact] + public void RepeatWhen_Observable_Repeat_Delayed() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateColdObservable( + OnNext(5, 1), + OnNext(10, 2), + OnNext(15, 3), + OnCompleted(20) + ); + + var res = scheduler.Start(() => + xs.RepeatWhen(v => + { + int[] count = { 0 }; + return v.SelectMany(w => { + int c = ++count[0]; + if (c == 3) + { + return Observable.Throw(ex); + } + return Observable.Return(1).Delay(TimeSpan.FromTicks(c * 100), scheduler); + }); + }) + ); + + res.Messages.AssertEqual( + OnNext(205, 1), + OnNext(210, 2), + OnNext(215, 3), + OnNext(325, 1), + OnNext(330, 2), + OnNext(335, 3), + OnNext(545, 1), + OnNext(550, 2), + OnNext(555, 3), + OnError(560, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 220), + Subscribe(320, 340), + Subscribe(540, 560) + ); + } + } +}