Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ internal interface IQueryLanguage
IConnectableObservable<TSource> PublishLast<TSource>(IObservable<TSource> source);
IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler schedulder);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler);
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
Expand Down
42 changes: 42 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,48 @@ public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable
return s_impl.RefCount<TSource>(source);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, TimeSpan disconnectDelay)
{
if (source == null)
throw new ArgumentNullException("source");

if (disconnectDelay < TimeSpan.Zero)
throw new ArgumentException("disconnectDelay");

return s_impl.RefCount<TSource>(source, disconnectDelay);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");

if (scheduler == null)
throw new ArgumentNullException("scheduler");

if (disconnectDelay < TimeSpan.Zero)
throw new ArgumentException("disconnectDelay");

return s_impl.RefCount<TSource>(source, disconnectDelay, scheduler);
}

#endregion

#region + AutoConnect +
Expand Down
171 changes: 126 additions & 45 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,159 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class RefCount<TSource> : Producer<TSource, RefCount<TSource>._>
internal static class RefCount<TSource>
{
private readonly IConnectableObservable<TSource> _source;

private readonly object _gate;
private int _count;
private IDisposable _connectableSubscription;

public RefCount(IConnectableObservable<TSource> source)
internal sealed class Eager : Producer<TSource, Eager._>
{
_source = source;
_gate = new object();
_count = 0;
_connectableSubscription = default(IDisposable);
}
private readonly IConnectableObservable<TSource> _source;

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);

protected override void Run(_ sink) => sink.Run();

internal sealed class _ : IdentitySink<TSource>
{
readonly RefCount<TSource> _parent;
private readonly object _gate;
private int _count;
private IDisposable _connectableSubscription;

public _(IObserver<TSource> observer, RefCount<TSource> parent)
: base(observer)
public Eager(IConnectableObservable<TSource> source)
{
this._parent = parent;
_source = source;
_gate = new object();
_count = 0;
_connectableSubscription = default(IDisposable);
}

public void Run()
protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);

protected override void Run(_ sink) => sink.Run();

internal sealed class _ : IdentitySink<TSource>
{
base.Run(_parent._source);
readonly Eager _parent;

public _(IObserver<TSource> observer, Eager parent)
: base(observer)
{
this._parent = parent;
}

lock (_parent._gate)
public void Run()
{
if (++_parent._count == 1)
base.Run(_parent._source);

lock (_parent._gate)
{
// We need to set _connectableSubscription to something
// before Connect because if Connect terminates synchronously,
// Dispose(bool) gets executed and will try to dispose
// _connectableSubscription of null.
// ?.Dispose() is no good because the dispose action has to be
// executed anyway.
// We can't inline SAD either because the IDisposable of Connect
// may belong to the wrong connection.
var sad = new SingleAssignmentDisposable();
_parent._connectableSubscription = sad;

sad.Disposable = _parent._source.Connect();
if (++_parent._count == 1)
{
// We need to set _connectableSubscription to something
// before Connect because if Connect terminates synchronously,
// Dispose(bool) gets executed and will try to dispose
// _connectableSubscription of null.
// ?.Dispose() is no good because the dispose action has to be
// executed anyway.
// We can't inline SAD either because the IDisposable of Connect
// may belong to the wrong connection.
var sad = new SingleAssignmentDisposable();
_parent._connectableSubscription = sad;

sad.Disposable = _parent._source.Connect();
}
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
lock (_parent._gate)
{
if (--_parent._count == 0)
{
_parent._connectableSubscription.Dispose();
}
}
}
}
}
}

internal sealed class Lazy : Producer<TSource, Lazy._>
{
private readonly object _gate;
private readonly IScheduler _scheduler;
private readonly TimeSpan _disconnectTime;
private readonly IConnectableObservable<TSource> _source;
private readonly SerialDisposable _serial = new SerialDisposable();

private int _count;
private IDisposable _connectableSubscription;

protected override void Dispose(bool disposing)
public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
{
base.Dispose(disposing);
_source = source;
_gate = new object();
_disconnectTime = disconnectTime;
_scheduler = scheduler;
_count = 0;
_connectableSubscription = default(IDisposable);
}

if (disposing)
protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

protected override void Run(_ sink) => sink.Run(this);

internal sealed class _ : IdentitySink<TSource>
{
public _(IObserver<TSource> observer)
: base(observer)
{
lock (_parent._gate)
}

public void Run(Lazy parent)
{
var subscription = parent._source.SubscribeSafe(this);

lock (parent._gate)
{
if (--_parent._count == 0)
if (++parent._count == 1)
{
_parent._connectableSubscription.Dispose();
if (parent._connectableSubscription == null)
parent._connectableSubscription = parent._source.Connect();

parent._serial.Disposable = new SingleAssignmentDisposable();
}
}

SetUpstream(Disposable.Create(() =>
{
subscription.Dispose();

lock (parent._gate)
{
if (--parent._count == 0)
{
var cancelable = (SingleAssignmentDisposable)parent._serial.Disposable;

cancelable.Disposable = parent._scheduler.Schedule(cancelable, parent._disconnectTime, (self, state) =>
{
lock (parent._gate)
{
if (object.ReferenceEquals(parent._serial.Disposable, state))
{
parent._connectableSubscription.Dispose();
parent._connectableSubscription = null;
}
}

return Disposable.Empty;
});
}
}
}));
}
}
}
Expand Down
72 changes: 70 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* WARNING: Auto-generated file (05/28/2018 22:20:18)
/*
* WARNING: Auto-generated file (06/12/2018 13:00:48)
* Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
*/

Expand Down Expand Up @@ -10667,6 +10667,74 @@ public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider pr
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, TimeSpan disconnectDelay)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(TimeSpan))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(disconnectDelay, typeof(TimeSpan))
)
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler scheduler)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(TimeSpan), default(IScheduler))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(disconnectDelay, typeof(TimeSpan)),
Expression.Constant(scheduler, typeof(IScheduler))
)
);
}

/// <summary>
/// Generates an observable sequence that repeats the given element infinitely.
/// </summary>
Expand Down
12 changes: 11 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,17 @@ public virtual IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TS

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
{
return new RefCount<TSource>(source);
return new RefCount<TSource>.Eager(source);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime)
{
return RefCount(source, disconnectTime, Scheduler.Default);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
{
return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler);
}

#endregion
Expand Down
Loading