From 6cc401f0b51d1656f10abff777bfe9d4126662da Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Sat, 12 Sep 2015 14:09:15 +0200 Subject: [PATCH] Added lazy RefCount operator for IObservables. Lazy RefCount connects like RefCount but may delay disconnection. This is useful whenever a lot of connect/disconnect cycles are expected within a short timespan but with a significant overhead in connecting/disconnecting. Some unit tests have been added. Lazy RefCount has been excluded from methods that must be present for Qbservable as well. I leave it up to others to decide what Lazy RefCount means for Qbservable and whether there should be an implementation. --- .../System.Reactive/Linq/IQueryLanguage.cs | 2 + .../Linq/Observable.Binding.cs | 42 +++++ .../Linq/Observable/RefCount.cs | 171 ++++++++++++----- .../Linq/Qbservable.Generated.cs | 72 +++++++- .../Linq/QueryLanguage.Binding.cs | 12 +- .../Api/ApiApprovalTests.Core.approved.txt | 4 + .../Tests/Linq/Observable/RefCountTest.cs | 174 ++++++++++++++++++ 7 files changed, 429 insertions(+), 48 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs index facb74de4f..82c8cbba5d 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs @@ -307,6 +307,8 @@ internal interface IQueryLanguage IConnectableObservable PublishLast(IObservable source); IObservable PublishLast(IObservable source, Func, IObservable> selector); IObservable RefCount(IConnectableObservable source); + IObservable RefCount(IConnectableObservable source, TimeSpan disconnectDelay); + IObservable RefCount(IConnectableObservable source, TimeSpan disconnectDelay, IScheduler schedulder); IConnectableObservable Replay(IObservable source); IConnectableObservable Replay(IObservable source, IScheduler scheduler); IObservable Replay(IObservable source, Func, IObservable> selector); diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs index efe8a39c91..a81fbb072f 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs @@ -203,6 +203,48 @@ public static IObservable RefCount(this IConnectableObservable return s_impl.RefCount(source); } + /// + /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// + /// The type of the elements in the source sequence. + /// Connectable observable sequence. + /// The time span that should be waited before possibly unsubscribing from the connectable observable. + /// An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// is null. + public static IObservable RefCount(this IConnectableObservable source, TimeSpan disconnectDelay) + { + if (source == null) + throw new ArgumentNullException("source"); + + if (disconnectDelay < TimeSpan.Zero) + throw new ArgumentException("disconnectDelay"); + + return s_impl.RefCount(source, disconnectDelay); + } + + /// + /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// + /// The type of the elements in the source sequence. + /// Connectable observable sequence. + /// The time span that should be waited before possibly unsubscribing from the connectable observable. + /// The scheduler to use for delayed unsubscription. + /// An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// is null. + public static IObservable RefCount(this IConnectableObservable source, TimeSpan disconnectDelay, IScheduler scheduler) + { + if (source == null) + throw new ArgumentNullException("source"); + + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + if (disconnectDelay < TimeSpan.Zero) + throw new ArgumentException("disconnectDelay"); + + return s_impl.RefCount(source, disconnectDelay, scheduler); + } + #endregion #region + AutoConnect + diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs index 0d4d7960b3..869e8fa879 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs @@ -2,78 +2,159 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. +using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; namespace System.Reactive.Linq.ObservableImpl { - internal sealed class RefCount : Producer._> + internal static class RefCount { - private readonly IConnectableObservable _source; - - private readonly object _gate; - private int _count; - private IDisposable _connectableSubscription; - - public RefCount(IConnectableObservable source) + internal sealed class Eager : Producer { - _source = source; - _gate = new object(); - _count = 0; - _connectableSubscription = default(IDisposable); - } + private readonly IConnectableObservable _source; - protected override _ CreateSink(IObserver observer) => new _(observer, this); - - protected override void Run(_ sink) => sink.Run(); - - internal sealed class _ : IdentitySink - { - readonly RefCount _parent; + private readonly object _gate; + private int _count; + private IDisposable _connectableSubscription; - public _(IObserver observer, RefCount parent) - : base(observer) + public Eager(IConnectableObservable source) { - this._parent = parent; + _source = source; + _gate = new object(); + _count = 0; + _connectableSubscription = default(IDisposable); } - public void Run() + protected override _ CreateSink(IObserver observer) => new _(observer, this); + + protected override void Run(_ sink) => sink.Run(); + + internal sealed class _ : IdentitySink { - base.Run(_parent._source); + readonly Eager _parent; + + public _(IObserver observer, Eager parent) + : base(observer) + { + this._parent = parent; + } - lock (_parent._gate) + public void Run() { - if (++_parent._count == 1) + base.Run(_parent._source); + + lock (_parent._gate) { - // We need to set _connectableSubscription to something - // before Connect because if Connect terminates synchronously, - // Dispose(bool) gets executed and will try to dispose - // _connectableSubscription of null. - // ?.Dispose() is no good because the dispose action has to be - // executed anyway. - // We can't inline SAD either because the IDisposable of Connect - // may belong to the wrong connection. - var sad = new SingleAssignmentDisposable(); - _parent._connectableSubscription = sad; - - sad.Disposable = _parent._source.Connect(); + if (++_parent._count == 1) + { + // We need to set _connectableSubscription to something + // before Connect because if Connect terminates synchronously, + // Dispose(bool) gets executed and will try to dispose + // _connectableSubscription of null. + // ?.Dispose() is no good because the dispose action has to be + // executed anyway. + // We can't inline SAD either because the IDisposable of Connect + // may belong to the wrong connection. + var sad = new SingleAssignmentDisposable(); + _parent._connectableSubscription = sad; + + sad.Disposable = _parent._source.Connect(); + } + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + lock (_parent._gate) + { + if (--_parent._count == 0) + { + _parent._connectableSubscription.Dispose(); + } + } } } } + } + + internal sealed class Lazy : Producer + { + private readonly object _gate; + private readonly IScheduler _scheduler; + private readonly TimeSpan _disconnectTime; + private readonly IConnectableObservable _source; + private readonly SerialDisposable _serial = new SerialDisposable(); + + private int _count; + private IDisposable _connectableSubscription; - protected override void Dispose(bool disposing) + public Lazy(IConnectableObservable source, TimeSpan disconnectTime, IScheduler scheduler) { - base.Dispose(disposing); + _source = source; + _gate = new object(); + _disconnectTime = disconnectTime; + _scheduler = scheduler; + _count = 0; + _connectableSubscription = default(IDisposable); + } - if (disposing) + protected override _ CreateSink(IObserver observer) => new _(observer); + + protected override void Run(_ sink) => sink.Run(this); + + internal sealed class _ : IdentitySink + { + public _(IObserver observer) + : base(observer) { - lock (_parent._gate) + } + + public void Run(Lazy parent) + { + var subscription = parent._source.SubscribeSafe(this); + + lock (parent._gate) { - if (--_parent._count == 0) + if (++parent._count == 1) { - _parent._connectableSubscription.Dispose(); + if (parent._connectableSubscription == null) + parent._connectableSubscription = parent._source.Connect(); + + parent._serial.Disposable = new SingleAssignmentDisposable(); } } + + SetUpstream(Disposable.Create(() => + { + subscription.Dispose(); + + lock (parent._gate) + { + if (--parent._count == 0) + { + var cancelable = (SingleAssignmentDisposable)parent._serial.Disposable; + + cancelable.Disposable = parent._scheduler.Schedule(cancelable, parent._disconnectTime, (self, state) => + { + lock (parent._gate) + { + if (object.ReferenceEquals(parent._serial.Disposable, state)) + { + parent._connectableSubscription.Dispose(); + parent._connectableSubscription = null; + } + } + + return Disposable.Empty; + }); + } + } + })); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs b/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs index 33f5bbf4e7..0f6881421e 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs @@ -1,5 +1,5 @@ -/* - * WARNING: Auto-generated file (05/28/2018 22:20:18) +/* + * WARNING: Auto-generated file (06/12/2018 13:00:48) * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory). */ @@ -10667,6 +10667,74 @@ public static IQbservable RefCount(this IQbservableProvider pr ); } + /// + /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// + /// Query provider used to construct the data source. + /// The type of the elements in the source sequence. + /// Connectable observable sequence. + /// The time span that should be waited before possibly unsubscribing from the connectable observable. + /// An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// + /// is null. + public static IQbservable RefCount(this IQbservableProvider provider, IConnectableObservable source, TimeSpan disconnectDelay) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + if (source == null) + throw new ArgumentNullException(nameof(source)); + + return provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.RefCount(default(IQbservableProvider), default(IConnectableObservable), default(TimeSpan))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), +#endif + Expression.Constant(provider, typeof(IQbservableProvider)), + Expression.Constant(source, typeof(IConnectableObservable)), + Expression.Constant(disconnectDelay, typeof(TimeSpan)) + ) + ); + } + + /// + /// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// + /// Query provider used to construct the data source. + /// The type of the elements in the source sequence. + /// Connectable observable sequence. + /// The time span that should be waited before possibly unsubscribing from the connectable observable. + /// The scheduler to use for delayed unsubscription. + /// An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + /// + /// is null. + public static IQbservable RefCount(this IQbservableProvider provider, IConnectableObservable source, TimeSpan disconnectDelay, IScheduler scheduler) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + if (source == null) + throw new ArgumentNullException(nameof(source)); + if (scheduler == null) + throw new ArgumentNullException(nameof(scheduler)); + + return provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.RefCount(default(IQbservableProvider), default(IConnectableObservable), default(TimeSpan), default(IScheduler))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), +#endif + Expression.Constant(provider, typeof(IQbservableProvider)), + Expression.Constant(source, typeof(IConnectableObservable)), + Expression.Constant(disconnectDelay, typeof(TimeSpan)), + Expression.Constant(scheduler, typeof(IScheduler)) + ) + ); + } + /// /// Generates an observable sequence that repeats the given element infinitely. /// diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs index cbaafa8556..f7f486d367 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs @@ -68,7 +68,17 @@ public virtual IObservable PublishLast(IObservable RefCount(IConnectableObservable source) { - return new RefCount(source); + return new RefCount.Eager(source); + } + + public virtual IObservable RefCount(IConnectableObservable source, TimeSpan disconnectTime) + { + return RefCount(source, disconnectTime, Scheduler.Default); + } + + public virtual IObservable RefCount(IConnectableObservable source, TimeSpan disconnectTime, IScheduler scheduler) + { + return new RefCount.Lazy(source, disconnectTime, scheduler); } #endregion diff --git a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt index 995069a93f..6b73f70a70 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt +++ b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt @@ -1246,6 +1246,8 @@ namespace System.Reactive.Linq public static System.IObservable Range(int start, int count) { } public static System.IObservable Range(int start, int count, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.IObservable RefCount(this System.Reactive.Subjects.IConnectableObservable source) { } + public static System.IObservable RefCount(this System.Reactive.Subjects.IConnectableObservable source, System.TimeSpan disconnectDelay) { } + public static System.IObservable RefCount(this System.Reactive.Subjects.IConnectableObservable source, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.IObservable Repeat(TResult value) { } public static System.IObservable Repeat(TResult value, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.IObservable Repeat(TResult value, int repeatCount) { } @@ -1963,6 +1965,8 @@ namespace System.Reactive.Linq public static System.Reactive.Linq.IQbservable Range(this System.Reactive.Linq.IQbservableProvider provider, int start, int count) { } public static System.Reactive.Linq.IQbservable Range(this System.Reactive.Linq.IQbservableProvider provider, int start, int count, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.Reactive.Linq.IQbservable RefCount(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable source) { } + public static System.Reactive.Linq.IQbservable RefCount(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable source, System.TimeSpan disconnectDelay) { } + public static System.Reactive.Linq.IQbservable RefCount(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable source, System.TimeSpan disconnectDelay, System.Reactive.Concurrency.IScheduler scheduler) { } public static System.Reactive.Linq.IQbservable Repeat(this System.Reactive.Linq.IQbservableProvider provider, TResult value) { } public static System.Reactive.Linq.IQbservable Repeat(this System.Reactive.Linq.IQbservableProvider provider, TResult value, int repeatCount) { } public static System.Reactive.Linq.IQbservable Repeat(this System.Reactive.Linq.IQbservableProvider provider, TResult value, int repeatCount, System.Reactive.Concurrency.IScheduler scheduler) { } diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs index 9f9da33308..5b4cac9c46 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RefCountTest.cs @@ -184,5 +184,179 @@ public void RefCount_Publish() ); } + [Fact] + public void LazyRefCount_ArgumentChecking() + { + ReactiveAssert.Throws(() => Observable.RefCount(null, TimeSpan.FromSeconds(2))); + } + + [Fact] + public void LazyRefCount_ConnectsOnFirst() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 1), + OnNext(220, 2), + OnNext(230, 3), + OnNext(240, 4), + OnCompleted(250) + ); + + var subject = new MySubject(); + + var conn = new ConnectableObservable(xs, subject); + + var res = scheduler.Start(() => + conn.RefCount(TimeSpan.FromSeconds(2)) + ); + + res.Messages.AssertEqual( + OnNext(210, 1), + OnNext(220, 2), + OnNext(230, 3), + OnNext(240, 4), + OnCompleted(250) + ); + + Assert.True(subject.Disposed); + } + + [Fact] + public void LazyRefCount_NotConnected() + { + var scheduler = new TestScheduler(); + var disconnected = false; + var count = 0; + var xs = Observable.Defer(() => + { + count++; + return Observable.Create(obs => + { + return () => { disconnected = true; }; + }); + }); + + var subject = new MySubject(); + + var conn = new ConnectableObservable(xs, subject); + var refd = conn.RefCount(TimeSpan.FromTicks(20), scheduler); + + var dis1 = refd.Subscribe(); + Assert.Equal(1, count); + Assert.Equal(1, subject.SubscribeCount); + Assert.False(disconnected); + + var dis2 = refd.Subscribe(); + Assert.Equal(1, count); + Assert.Equal(2, subject.SubscribeCount); + Assert.False(disconnected); + + dis1.Dispose(); + Assert.False(disconnected); + dis2.Dispose(); + Assert.False(disconnected); + + scheduler.AdvanceBy(19); + Assert.False(disconnected); + + scheduler.AdvanceBy(1); + Assert.True(disconnected); + disconnected = false; + + var dis3 = refd.Subscribe(); + Assert.Equal(2, count); + Assert.Equal(3, subject.SubscribeCount); + Assert.False(disconnected); + + dis3.Dispose(); + scheduler.AdvanceBy(20); + Assert.True(disconnected); + } + + [Fact] + public void LazyRefCount_OnError() + { + var ex = new Exception(); + var xs = Observable.Throw(ex, Scheduler.Immediate); + + var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2)); + + res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception()); + res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception()); + } + + [Fact] + public void LazyRefCount_Publish() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 1), + OnNext(220, 2), + OnNext(230, 3), + OnNext(240, 4), + OnNext(250, 5), + OnNext(260, 6), + OnNext(270, 7), + OnNext(280, 8), + OnNext(290, 9), + OnCompleted(300) + ); + + var res = xs.Publish().RefCount(TimeSpan.FromTicks(9), scheduler); + + var d1 = default(IDisposable); + var o1 = scheduler.CreateObserver(); + scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); }); + scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); }); + + var d2 = default(IDisposable); + var o2 = scheduler.CreateObserver(); + scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); }); + scheduler.ScheduleAbsolute(275, () => + { + d2.Dispose(); + }); + + var d3 = default(IDisposable); + var o3 = scheduler.CreateObserver(); + scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); }); + scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); }); + + var d4 = default(IDisposable); + var o4 = scheduler.CreateObserver(); + scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); }); + scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); }); + + scheduler.Start(); + + o1.Messages.AssertEqual( + OnNext(220, 2), + OnNext(230, 3) + ); + + o2.Messages.AssertEqual( + OnNext(230, 3), + OnNext(240, 4), + OnNext(250, 5), + OnNext(260, 6), + OnNext(270, 7) + ); + + o3.Messages.AssertEqual( + OnNext(260, 6) + ); + + o4.Messages.AssertEqual( + OnNext(290, 9), + OnCompleted(300) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(215, 284), + Subscribe(285, 300) + ); + } } }