From 6b4f148d414041a2a8eee668cb169ace678cc2ee Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 28 Jun 2018 16:52:45 +0200 Subject: [PATCH] 4.x: Improve the performance of ToObservable() --- .../Benchmarks.System.Reactive/Program.cs | 3 +- .../ToObservableBenchmark.cs | 30 ++++ .../Linq/Observable/ToObservable.cs | 134 +++++++++++------- .../Linq/QueryLanguage.Conversions.cs | 30 +++- 4 files changed, 145 insertions(+), 52 deletions(-) create mode 100644 Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/ToObservableBenchmark.cs diff --git a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs index 6c8af3bcce..6e73ebea8b 100644 --- a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs +++ b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs @@ -15,7 +15,8 @@ static void Main() typeof(ZipBenchmark), typeof(CombineLatestBenchmark), typeof(SwitchBenchmark), - typeof(BufferCountBenchmark) + typeof(BufferCountBenchmark), + typeof(ToObservableBenchmark) }); switcher.Run(); diff --git a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/ToObservableBenchmark.cs b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/ToObservableBenchmark.cs new file mode 100644 index 0000000000..b6d7e68f8c --- /dev/null +++ b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/ToObservableBenchmark.cs @@ -0,0 +1,30 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Threading; +using BenchmarkDotNet.Attributes; + +namespace Benchmarks.System.Reactive +{ + [MemoryDiagnoser] + public class ToObservableBenchmark + { + [Params(1, 10, 100, 1000, 10000, 100000, 1000000)] + public int N; + + int _store; + + [Benchmark] + public void Exact() + { + Enumerable.Range(1, N) + .ToObservable() + .Subscribe(v => Volatile.Write(ref _store, v)); + } + } +} diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs index c8f4e29e1e..18176c8057 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs @@ -5,15 +5,16 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Threading; namespace System.Reactive.Linq.ObservableImpl { - internal sealed class ToObservable : Producer._> + internal sealed class ToObservableRecursive : Producer._> { private readonly IEnumerable _source; private readonly IScheduler _scheduler; - public ToObservable(IEnumerable source, IScheduler scheduler) + public ToObservableRecursive(IEnumerable source, IScheduler scheduler) { _source = source; _scheduler = scheduler; @@ -21,21 +22,24 @@ public ToObservable(IEnumerable source, IScheduler scheduler) protected override _ CreateSink(IObserver observer) => new _(observer); - protected override void Run(_ sink) => sink.Run(this); + protected override void Run(_ sink) => sink.Run(_source, _scheduler); internal sealed class _ : IdentitySink { + IEnumerator _enumerator; + + volatile bool _disposed; + public _(IObserver observer) : base(observer) { } - public void Run(ToObservable parent) + public void Run(IEnumerable source, IScheduler scheduler) { - var e = default(IEnumerator); try { - e = parent._source.GetEnumerator(); + _enumerator = source.GetEnumerator(); } catch (Exception exception) { @@ -44,61 +48,44 @@ public void Run(ToObservable parent) return; } - var longRunning = parent._scheduler.AsLongRunning(); - if (longRunning != null) - { - // - // Long-running schedulers have the contract they should *never* prevent - // the work from starting, such that the scheduled work has the chance - // to observe the cancellation and perform proper clean-up. In this case, - // we're sure Loop will be entered, allowing us to dispose the enumerator. - // - SetUpstream(longRunning.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => tuple.@this.Loop(tuple.e, cancelable))); - } - else - { - // - // We never allow the scheduled work to be cancelled. Instead, the flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. - // - var flag = new BooleanDisposable(); - parent._scheduler.Schedule(new State(this, flag, e), (state, action) => state.sink.LoopRec(state, action)); - SetUpstream(flag); - } + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler)); } - private struct State + protected override void Dispose(bool disposing) { - public readonly _ sink; - public readonly ICancelable flag; - public readonly IEnumerator enumerator; - - public State(_ sink, ICancelable flag, IEnumerator enumerator) + base.Dispose(disposing); + if (disposing) { - this.sink = sink; - this.flag = flag; - this.enumerator = enumerator; + _disposed = true; } } - private void LoopRec(State state, Action recurse) + private IDisposable LoopRec(IScheduler scheduler) { var hasNext = false; var ex = default(Exception); var current = default(TSource); - if (state.flag.IsDisposed) + var enumerator = _enumerator; + + if (_disposed) { - state.enumerator.Dispose(); - return; + _enumerator.Dispose(); + _enumerator = null; + + return Disposable.Empty; } try { - hasNext = state.enumerator.MoveNext(); + hasNext = enumerator.MoveNext(); if (hasNext) - current = state.enumerator.Current; + current = enumerator.Current; } catch (Exception exception) { @@ -107,22 +94,73 @@ private void LoopRec(State state, Action recurse) if (ex != null) { - state.enumerator.Dispose(); + enumerator.Dispose(); + _enumerator = null; ForwardOnError(ex); - return; + return Disposable.Empty; } if (!hasNext) { - state.enumerator.Dispose(); + enumerator.Dispose(); + _enumerator = null; ForwardOnCompleted(); - return; + return Disposable.Empty; } ForwardOnNext(current); - recurse(state); + + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler)); + + return Disposable.Empty; + } + } + } + + internal sealed class ToObservableLongRunning : Producer._> + { + private readonly IEnumerable _source; + private readonly ISchedulerLongRunning _scheduler; + + public ToObservableLongRunning(IEnumerable source, ISchedulerLongRunning scheduler) + { + _source = source; + _scheduler = scheduler; + } + + protected override _ CreateSink(IObserver observer) => new _(observer); + + protected override void Run(_ sink) => sink.Run(_source, _scheduler); + + internal sealed class _ : IdentitySink + { + public _(IObserver observer) + : base(observer) + { + } + + public void Run(IEnumerable source, ISchedulerLongRunning scheduler) + { + var e = default(IEnumerator); + try + { + e = source.GetEnumerator(); + } + catch (Exception exception) + { + ForwardOnError(exception); + + return; + } + + SetUpstream(scheduler.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => tuple.@this.Loop(tuple.e, cancelable))); } private void Loop(IEnumerator enumerator, ICancelable cancel) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Conversions.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Conversions.cs index c75474f928..83203730b8 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Conversions.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Conversions.cs @@ -26,10 +26,18 @@ public virtual IDisposable Subscribe(IEnumerable source, IObse private static IDisposable Subscribe_(IEnumerable source, IObserver observer, IScheduler scheduler) { + var longRunning = scheduler.AsLongRunning(); + if (longRunning != null) + { + // + // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. + // + return new ToObservableLongRunning(source, longRunning).Subscribe/*Unsafe*/(observer); + } // // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. // - return new ToObservable(source, scheduler).Subscribe/*Unsafe*/(observer); + return new ToObservableRecursive(source, scheduler).Subscribe/*Unsafe*/(observer); } #endregion @@ -73,12 +81,28 @@ public virtual IEventPatternSource ToEventPattern(IObser public virtual IObservable ToObservable(IEnumerable source) { - return new ToObservable(source, SchedulerDefaults.Iteration); + return ToObservable_(source, SchedulerDefaults.Iteration); } public virtual IObservable ToObservable(IEnumerable source, IScheduler scheduler) { - return new ToObservable(source, scheduler); + return ToObservable_(source, scheduler); + } + + private static IObservable ToObservable_(IEnumerable source, IScheduler scheduler) + { + var longRunning = scheduler.AsLongRunning(); + if (longRunning != null) + { + // + // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. + // + return new ToObservableLongRunning(source, longRunning); + } + // + // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. + // + return new ToObservableRecursive(source, scheduler); } #endregion