Skip to content

Minimum subscribers for RefCount #406

@skarllot

Description

@skarllot

I would like to propose an overload for RefCount method to specify the minimum number of subscribers before connect:

/// <summary>
/// Returns an observable sequence that stays connected to the source
/// as long as there is at least the specified number of subscribers to
/// the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minSubscribers">The minimum number of subscribers before connect.</param>
/// <returns>
/// An observable sequence that stays connected to the source as long
/// as there is at least the specified number of subscribers to the
/// observable sequence.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, int minSubscribers)
{
    if (minSubscribers == 1)
        return Observable.RefCount(source);

    if (source == null)
        throw new ArgumentNullException(nameof(source));

    return new MinRefCountObservable<TSource>(source, minSubscribers);
}

sealed class MinRefCountObservable<T> : IDisposable, IObservable<T>
{
    private IDisposable connection;
    private int counter;
    readonly int minSubscribers;
    readonly IConnectableObservable<T> source;
    readonly object sync;

    public MinRefCountObservable(IConnectableObservable<T> source, int minSubscribers)
    {
        this.minSubscribers = minSubscribers;
        this.source = source;
        connection = null;
        counter = 0;
        sync = new object();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new CompositeDisposable(source.Subscribe(observer), this);
        lock (sync)
        {
            if (++counter == minSubscribers)
                connection = source.Connect();
        }

        return subscription;
    }

    void IDisposable.Dispose()
    {
        lock (sync)
        {
            if (--counter != minSubscribers - 1)
                return;

            connection.Dispose();
            connection = null;
        }
    }
}

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions