diff --git a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs index 6c8af3bcce..9b1d620f69 100644 --- a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs +++ b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs @@ -15,7 +15,8 @@ static void Main() typeof(ZipBenchmark), typeof(CombineLatestBenchmark), typeof(SwitchBenchmark), - typeof(BufferCountBenchmark) + typeof(BufferCountBenchmark), + typeof(RangeBenchmark) }); switcher.Run(); diff --git a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RangeBenchmark.cs b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RangeBenchmark.cs new file mode 100644 index 0000000000..ab42b0fa35 --- /dev/null +++ b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RangeBenchmark.cs @@ -0,0 +1,26 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Reactive.Linq; +using System.Threading; +using BenchmarkDotNet.Attributes; + +namespace Benchmarks.System.Reactive +{ + [MemoryDiagnoser] + public class RangeBenchmark + { + [Params(1, 10, 100, 1000, 10000, 100000, 1000000)] + public int N; + + int _store; + + [Benchmark] + public void Range() + { + Observable.Range(1, N).Subscribe(v => Volatile.Write(ref _store, v)); + } + } +} diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs index 10c0fd38f8..eb72f29897 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs @@ -7,71 +7,117 @@ namespace System.Reactive.Linq.ObservableImpl { - internal sealed class Range : Producer + internal sealed class RangeRecursive : Producer { private readonly int _start; private readonly int _count; private readonly IScheduler _scheduler; - public Range(int start, int count, IScheduler scheduler) + public RangeRecursive(int start, int count, IScheduler scheduler) { _start = start; _count = count; _scheduler = scheduler; } - protected override _ CreateSink(IObserver observer) => new _(this, observer); + protected override RangeSink CreateSink(IObserver observer) => new RangeSink(_start, _count, observer); - protected override void Run(_ sink) => sink.Run(_scheduler); + protected override void Run(RangeSink sink) => sink.Run(_scheduler); - internal sealed class _ : IdentitySink + internal sealed class RangeSink : IdentitySink { - private readonly int _start; - private readonly int _count; + readonly int _end; - public _(Range parent, IObserver observer) + int _index; + + IDisposable _task; + + public RangeSink(int start, int count, IObserver observer) : base(observer) { - _start = parent._start; - _count = parent._count; + _index = start; + _end = start + count; } public void Run(IScheduler scheduler) { - var longRunning = scheduler.AsLongRunning(); - if (longRunning != null) + var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler)); + Disposable.TrySetSingle(ref _task, first); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + if (disposing) { - SetUpstream(longRunning.ScheduleLongRunning(0, Loop)); + Disposable.TryDispose(ref _task); } - else + } + + private IDisposable LoopRec(IScheduler scheduler) + { + var idx = _index; + if (idx != _end) + { + _index = idx + 1; + ForwardOnNext(idx); + var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler)); + Disposable.TrySetMultiple(ref _task, next); + } else { - SetUpstream(scheduler.Schedule(0, LoopRec)); + ForwardOnCompleted(); } + return Disposable.Empty; } + } + } + + internal sealed class RangeLongRunning : Producer + { + private readonly int _start; + private readonly int _count; + private readonly ISchedulerLongRunning _scheduler; - private void Loop(int i, ICancelable cancel) + public RangeLongRunning(int start, int count, ISchedulerLongRunning scheduler) + { + _start = start; + _count = count; + _scheduler = scheduler; + } + + protected override RangeSink CreateSink(IObserver observer) => new RangeSink(_start, _count, observer); + + protected override void Run(RangeSink sink) => sink.Run(_scheduler); + + internal sealed class RangeSink : IdentitySink + { + readonly int _end; + + int _index; + + public RangeSink(int start, int count, IObserver observer) + : base(observer) { - while (!cancel.IsDisposed && i < _count) - { - ForwardOnNext(_start + i); - i++; - } + _index = start; + _end = start + count; + } - if (!cancel.IsDisposed) - ForwardOnCompleted(); + public void Run(ISchedulerLongRunning scheduler) + { + SetUpstream(scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel))); } - private void LoopRec(int i, Action recurse) + private void Loop(ICancelable cancel) { - if (i < _count) + var idx = _index; + var end = _end; + while (!cancel.IsDisposed && idx != end) { - ForwardOnNext(_start + i); - recurse(i + 1); + ForwardOnNext(idx++); } - else - { + + if (!cancel.IsDisposed) ForwardOnCompleted(); - } } } } diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index c2947046b4..2939a33480 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -338,7 +338,12 @@ public virtual IObservable Range(int start, int count, IScheduler scheduler private static IObservable Range_(int start, int count, IScheduler scheduler) { - return new Range(start, count, scheduler); + var longRunning = scheduler.AsLongRunning(); + if (longRunning != null) + { + return new RangeLongRunning(start, count, longRunning); + } + return new RangeRecursive(start, count, scheduler); } #endregion