Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ internal interface IQueryLanguage
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler schedulder);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay, IScheduler schedulder);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler);
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
Expand Down
88 changes: 88 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,94 @@ public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable
return s_impl.RefCount(source, disconnectDelay, scheduler);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The minimum number of observers required to subscribe before establishing the connection to the source.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minObservers)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
if (minObservers <= 0)
{
throw new ArgumentOutOfRangeException(nameof(minObservers));
}

return s_impl.RefCount(source, minObservers);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The minimum number of observers required to subscribe before establishing the connection to the source.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay)
{
if (source == null)
{
throw new ArgumentNullException("source");
}

if (disconnectDelay < TimeSpan.Zero)
{
throw new ArgumentException("disconnectDelay");
}
if (minObservers <= 0)
{
throw new ArgumentOutOfRangeException(nameof(minObservers));
}

return s_impl.RefCount(source, minObservers, disconnectDelay);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The minimum number of observers required to subscribe before establishing the connection to the source.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay, IScheduler scheduler)
{
if (source == null)
{
throw new ArgumentNullException("source");
}

if (scheduler == null)
{
throw new ArgumentNullException("scheduler");
}

if (disconnectDelay < TimeSpan.Zero)
{
throw new ArgumentException("disconnectDelay");
}
if (minObservers <= 0)
{
throw new ArgumentOutOfRangeException(nameof(minObservers));
}


return s_impl.RefCount(source, minObservers, disconnectDelay, scheduler);
}

#endregion

#region + AutoConnect +
Expand Down
14 changes: 10 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ internal sealed class Eager : Producer<TSource, Eager._>
/// </summary>
private RefConnection _connection;

public Eager(IConnectableObservable<TSource> source)
private readonly int _minObservers;

public Eager(IConnectableObservable<TSource> source, int minObservers)
{
_source = source;
_gate = new object();
_minObservers = minObservers;
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
Expand Down Expand Up @@ -68,7 +71,7 @@ public void Run()
}

// this is the first observer, then connect
doConnect = conn._count++ == 0;
doConnect = ++conn._count == _parent._minObservers;
// save the current connection for this observer
_targetConnection = conn;
}
Expand Down Expand Up @@ -132,16 +135,19 @@ internal sealed class Lazy : Producer<TSource, Lazy._>
private readonly IScheduler _scheduler;
private readonly TimeSpan _disconnectTime;
private readonly IConnectableObservable<TSource> _source;
private readonly int _minObservers;

private IDisposable _serial;
private int _count;
private IDisposable _connectableSubscription;

public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
{
_source = source;
_gate = new object();
_disconnectTime = disconnectTime;
_scheduler = scheduler;
_minObservers = minObservers;
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
Expand All @@ -161,7 +167,7 @@ public void Run(Lazy parent)

lock (parent._gate)
{
if (++parent._count == 1)
if (++parent._count == parent._minObservers)
{
if (parent._connectableSubscription == null)
{
Expand Down
113 changes: 113 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10854,6 +10854,119 @@ public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider pr
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The minimum number of observers required to subscribe before establishing the connection to the source.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (minObservers <= 0)
throw new ArgumentOutOfRangeException(nameof(minObservers));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(minObservers, typeof(int))
)
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The minimum number of observers required to subscribe before establishing the connection to the source.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (minObservers <= 0)
throw new ArgumentOutOfRangeException(nameof(minObservers));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(TimeSpan))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(minObservers, typeof(int)),
Expression.Constant(disconnectDelay, typeof(TimeSpan))
)
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The minimum number of observers required to subscribe before establishing the connection to the source.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="minObservers"/> is non-positive.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectDelay, IScheduler scheduler)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
if (minObservers <= 0)
throw new ArgumentOutOfRangeException(nameof(minObservers));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(TimeSpan), default(IScheduler))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(minObservers, typeof(int)),
Expression.Constant(disconnectDelay, typeof(TimeSpan)),
Expression.Constant(scheduler, typeof(IScheduler))
)
);
}

/// <summary>
/// Generates an observable sequence that repeats the given element infinitely.
/// </summary>
Expand Down
19 changes: 17 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public virtual IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TS

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
{
return new RefCount<TSource>.Eager(source);
return RefCount(source, 1);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime)
Expand All @@ -77,7 +77,22 @@ public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSo

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
{
return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler);
return RefCount(source, 1, disconnectTime, scheduler);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers)
{
return new RefCount<TSource>.Eager(source, minObservers);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectTime)
{
return RefCount(source, minObservers, disconnectTime, Scheduler.Default);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectTime, IScheduler scheduler)
{
return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler, minObservers);
}

#endregion
Expand Down
Loading