Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ static void Main()
typeof(ZipBenchmark),
typeof(CombineLatestBenchmark),
typeof(SwitchBenchmark),
typeof(BufferCountBenchmark)
typeof(BufferCountBenchmark),
typeof(RangeBenchmark)
});

switcher.Run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive.Linq;
using System.Threading;
using BenchmarkDotNet.Attributes;

namespace Benchmarks.System.Reactive
{
[MemoryDiagnoser]
public class RangeBenchmark
{
[Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
public int N;

int _store;

[Benchmark]
public void Range()
{
Observable.Range(1, N).Subscribe(v => Volatile.Write(ref _store, v));
}
}
}
106 changes: 76 additions & 30 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,71 +7,117 @@

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class Range : Producer<int, Range._>
internal sealed class RangeRecursive : Producer<int, RangeRecursive.RangeSink>
{
private readonly int _start;
private readonly int _count;
private readonly IScheduler _scheduler;

public Range(int start, int count, IScheduler scheduler)
public RangeRecursive(int start, int count, IScheduler scheduler)
{
_start = start;
_count = count;
_scheduler = scheduler;
}

protected override _ CreateSink(IObserver<int> observer) => new _(this, observer);
protected override RangeSink CreateSink(IObserver<int> observer) => new RangeSink(_start, _count, observer);

protected override void Run(_ sink) => sink.Run(_scheduler);
protected override void Run(RangeSink sink) => sink.Run(_scheduler);

internal sealed class _ : IdentitySink<int>
internal sealed class RangeSink : IdentitySink<int>
{
private readonly int _start;
private readonly int _count;
readonly int _end;

public _(Range parent, IObserver<int> observer)
int _index;

IDisposable _task;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the _upstream is not accessible so it can't be used to replace a previous task.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to dispose this. I'll update the PR shortly with a fresh benchmark as well.


public RangeSink(int start, int count, IObserver<int> observer)
: base(observer)
{
_start = parent._start;
_count = parent._count;
_index = start;
_end = start + count;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calculate the end index once (exclusive).

}

public void Run(IScheduler scheduler)
{
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetSingle(ref _task, first);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the first schedule returns, a subsequent schedule might be underway with _task set on it. This will avoid overwriting that newer IDisposable.

}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
SetUpstream(longRunning.ScheduleLongRunning(0, Loop));
Disposable.TryDispose(ref _task);
}
else
}

private IDisposable LoopRec(IScheduler scheduler)
{
var idx = _index;
if (idx != _end)
{
_index = idx + 1;
ForwardOnNext(idx);
var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
Disposable.TrySetMultiple(ref _task, next);
} else
{
SetUpstream(scheduler.Schedule(0, LoopRec));
ForwardOnCompleted();
}
return Disposable.Empty;
}
}
}

internal sealed class RangeLongRunning : Producer<int, RangeLongRunning.RangeSink>
{
private readonly int _start;
private readonly int _count;
private readonly ISchedulerLongRunning _scheduler;

private void Loop(int i, ICancelable cancel)
public RangeLongRunning(int start, int count, ISchedulerLongRunning scheduler)
{
_start = start;
_count = count;
_scheduler = scheduler;
}

protected override RangeSink CreateSink(IObserver<int> observer) => new RangeSink(_start, _count, observer);

protected override void Run(RangeSink sink) => sink.Run(_scheduler);

internal sealed class RangeSink : IdentitySink<int>
{
readonly int _end;

int _index;

public RangeSink(int start, int count, IObserver<int> observer)
: base(observer)
{
while (!cancel.IsDisposed && i < _count)
{
ForwardOnNext(_start + i);
i++;
}
_index = start;
_end = start + count;
}

if (!cancel.IsDisposed)
ForwardOnCompleted();
public void Run(ISchedulerLongRunning scheduler)
{
SetUpstream(scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
}

private void LoopRec(int i, Action<int> recurse)
private void Loop(ICancelable cancel)
{
if (i < _count)
var idx = _index;
var end = _end;
while (!cancel.IsDisposed && idx != end)
{
ForwardOnNext(_start + i);
recurse(i + 1);
ForwardOnNext(idx++);
}
else
{

if (!cancel.IsDisposed)
ForwardOnCompleted();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ public virtual IObservable<int> Range(int start, int count, IScheduler scheduler

private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
{
return new Range(start, count, scheduler);
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
{
return new RangeLongRunning(start, count, longRunning);
}
return new RangeRecursive(start, count, scheduler);
}

#endregion
Expand Down