Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ internal interface IQueryLanguage
IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source);
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source);
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount);
IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler);
IObservable<TSource> Retry<TSource>(IObservable<TSource> source);
IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount);
IObservable<TSource> RetryWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<Exception>, IObservable<TSignal>> handler);
Expand Down
25 changes: 25 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,31 @@ public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> sou
return s_impl.Repeat<TSource>(source, repeatCount);
}

/// <summary>
/// Repeatedly resubscribes to the source observable after a normal completion and when the observable
/// returned by a handler produces an arbitrary item.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TSignal">The arbitrary element type signaled by the handler observable.</typeparam>
/// <param name="source">Observable sequence to keep repeating when it successfully terminates.</param>
/// <param name="handler">The function that is called for each observer and takes an observable sequence objects.
/// It should return an observable of arbitrary items that should signal that arbitrary item in
/// response to receiving the completion signal from the source observable. If this observable signals
/// a terminal event, the sequence is terminated with that signal instead.</param>
/// <returns>An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(this IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (handler == null)
throw new ArgumentNullException(nameof(handler));

return s_impl.RepeatWhen(source, handler);
}


#endregion

#region + Retry +
Expand Down
170 changes: 170 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class RepeatWhen<T, U> : IObservable<T>
{
readonly IObservable<T> source;

readonly Func<IObservable<object>, IObservable<U>> handler;

internal RepeatWhen(IObservable<T> source, Func<IObservable<object>, IObservable<U>> handler)
{
this.source = source;
this.handler = handler;
}

public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

var completeSignals = new Subject<object>();
var redo = default(IObservable<U>);

try
{
redo = handler(completeSignals);
if (redo == null)
{
throw new NullReferenceException("The handler returned a null IObservable");
}
}
catch (Exception ex)
{
observer.OnError(ex);
return Disposable.Empty;
}

var parent = new MainObserver(observer, source, new RedoSerializedObserver<object>(completeSignals));

var d = redo.SubscribeSafe(parent.handlerObserver);
Disposable.SetSingle(ref parent.handlerUpstream, d);

parent.HandlerNext();

return parent;
}

sealed class MainObserver : Sink<T>, IObserver<T>
{
readonly IObserver<Exception> errorSignal;

internal readonly HandlerObserver handlerObserver;

readonly IObservable<T> source;

IDisposable upstream;

internal IDisposable handlerUpstream;

int trampoline;

int halfSerializer;

Exception error;

internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)
{
this.source = source;
this.errorSignal = errorSignal;
this.handlerObserver = new HandlerObserver(this);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref upstream);
Disposable.TryDispose(ref handlerUpstream);
}
base.Dispose(disposing);
}

public void OnCompleted()
{
if (Disposable.TrySetSerial(ref upstream, null))
{
errorSignal.OnNext(null);
}

}

public void OnError(Exception error)
{
HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
}

public void OnNext(T value)
{
HalfSerializer.ForwardOnNext(this, value, ref halfSerializer, ref this.error);
}

internal void HandlerError(Exception error)
{
HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
}

internal void HandlerComplete()
{
HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error);
}

internal void HandlerNext()
{
if (Interlocked.Increment(ref trampoline) == 1)
{
do
{
var sad = new SingleAssignmentDisposable();
if (Interlocked.CompareExchange(ref upstream, sad, null) != null)
{
return;
}

sad.Disposable = source.SubscribeSafe(this);
}
while (Interlocked.Decrement(ref trampoline) != 0);
}
}

internal sealed class HandlerObserver : IObserver<U>
{
readonly MainObserver main;

internal HandlerObserver(MainObserver main)
{
this.main = main;
}

public void OnCompleted()
{
main.HandlerComplete();
}

public void OnError(Exception error)
{
main.HandlerError(error);
}

public void OnNext(U value)
{
main.HandlerNext();
}
}
}

}
}
Loading