Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 355 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,355 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
static internal class AppendPrepend
{
internal interface IAppendPrepend<TSource> : IObservable<TSource>
{
IAppendPrepend<TSource> Append(TSource value);
IAppendPrepend<TSource> Prepend(TSource value);
IScheduler Scheduler { get; }
}

internal sealed class AppendPrependSingle<TSource> : Producer<TSource, AppendPrependSingle<TSource>._>, IAppendPrepend<TSource>
{
private readonly IObservable<TSource> _source;
private readonly TSource _value;
private readonly bool _append;

public IScheduler Scheduler { get; }

public AppendPrependSingle(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
{
_source = source;
_value = value;
_append = append;
Scheduler = scheduler;
}

public IAppendPrepend<TSource> Append(TSource value)
{
var prev = new Node<TSource>(_value);

if (_append)
return new AppendPrependMultiple<TSource>(_source,
null, new Node<TSource>(prev, value), Scheduler);
else
return new AppendPrependMultiple<TSource>(_source,
prev, new Node<TSource>(value), Scheduler);
}

public IAppendPrepend<TSource> Prepend(TSource value)
{
var prev = new Node<TSource>(_value);

if (_append)
return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(value), prev, Scheduler);
else
return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(prev, value), null, Scheduler);
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run();

internal sealed class _ : IdentitySink<TSource>
{
private readonly IObservable<TSource> _source;
private readonly TSource _value;
private readonly IScheduler _scheduler;
private readonly bool _append;
private IDisposable _schedulerDisposable;

public _(AppendPrependSingle<TSource> parent, IObserver<TSource> observer)
: base(observer)
{
_source = parent._source;
_value = parent._value;
_scheduler = parent.Scheduler;
_append = parent._append;
}

public void Run()
{
var disp = _append
? _source.SubscribeSafe(this)
: _scheduler.Schedule(this, PrependValue);

SetUpstream(disp);
}

private static IDisposable PrependValue(IScheduler scheduler, _ sink)
{
sink.ForwardOnNext(sink._value);
return sink._source.SubscribeSafe(sink);
}

public override void OnCompleted()
{
if (_append)
{
var disposable = _scheduler.Schedule(this, AppendValue);
Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
}
else
{
ForwardOnCompleted();
}
}

private static IDisposable AppendValue(IScheduler scheduler, _ sink)
{
sink.ForwardOnNext(sink._value);
sink.ForwardOnCompleted();
return Disposable.Empty;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _schedulerDisposable);
}
base.Dispose(disposing);
}
}
}

private sealed class AppendPrependMultiple<TSource> : Producer<TSource, AppendPrependMultiple<TSource>._>, IAppendPrepend<TSource>
{
private readonly IObservable<TSource> _source;
private readonly Node<TSource> _appends;
private readonly Node<TSource> _prepends;

public IScheduler Scheduler { get; }

public AppendPrependMultiple(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler)
{
_source = source;
_appends = appends;
_prepends = prepends;
Scheduler = scheduler;
}

public IAppendPrepend<TSource> Append(TSource value)
{
return new AppendPrependMultiple<TSource>(_source,
_prepends, new Node<TSource>(_appends, value), Scheduler);
}

public IAppendPrepend<TSource> Prepend(TSource value)
{
return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(_prepends, value), _appends, Scheduler);
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run();

// The sink is based on the sink of the ToObervalbe class and does basically
// the same twice, once for the append list and once for the prepend list.
// Inbetween it forwards the values of the source class.
//
internal sealed class _ : IdentitySink<TSource>
{
private readonly IObservable<TSource> _source;
private readonly TSource[] _prepends;
private readonly TSource[] _appends;
private readonly IScheduler _scheduler;
private IDisposable _schedulerDisposable;

public _(AppendPrependMultiple<TSource> parent, IObserver<TSource> observer)
: base(observer)
{
_source = parent._source;
_scheduler = parent.Scheduler;

if (parent._prepends != null)
_prepends = parent._prepends.ToArray();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid the creation of this array by simply walking the node structure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we could. I haven't done it because we then have to duplicate the two loops. It's a tradeoff between code reuse and one allocation.


if (parent._appends != null)
_appends = parent._appends.ToReverseArray();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This array creation could be avoided as well if the appends where in a doubly-linked node chain. In that case, it would be enough to traverse the chain once to find the very first value and then walking backwards.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beware, by doubly linking the nodes, we can't pass around the original observable once we appended/prepended to it, or even append/prepend something to the original observable. The instances we get from appying operators should be, in a way, immutable. We would get subtle side effects here, if I got your idea correctly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about getting the immutable collections back in (#222), then it's just a stack and a queue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instances we get from appying operators should be, in a way, immutable.

Indeed, the _appends should be an immutable list. I was somehow thinking it being a copy-on-write list (i.e., ImmutableList) which requires way more allocation and copying. My mistake.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we manage to mitigate the performance degradation that has been reported in #222, immutable collections could be helpful in a lot of places. It's used internally in the Roslyn compiler since it's highly concurrent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation has a time complexity of O(n) and needs n + 1 allocations. I doubt the immutable collections offer a datatype that can beat that.

Copy link
Collaborator

@danielcweber danielcweber Jun 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a node to the lists as they are currently implemented is even in O(1). Yes, even immutable collections won't beat that. Maybe they'll find their way into the project eventually since they would be useful in the Subjects, but until that, this solution is perfectly fine.

}

public void Run()
{
if (_prepends != null)
{
var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s)));
Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
}
else
{
SetUpstream(_source.SubscribeSafe(this));
}
}

public override void OnCompleted()
{
if (_appends != null)
{
var disposable = Schedule(_appends, s => s.ForwardOnCompleted());
Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
}
else
{
ForwardOnCompleted();
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _schedulerDisposable);
}
base.Dispose(disposing);
}

private IDisposable Schedule(TSource[] array, Action<_> continueWith)
{
var longRunning = _scheduler.AsLongRunning();
if (longRunning != null)
{
//
// Long-running schedulers have the contract they should *never* prevent
// the work from starting, such that the scheduled work has the chance
// to observe the cancellation and perform proper clean-up. In this case,
// we're sure Loop will be entered, allowing us to dispose the enumerator.
//
return longRunning.ScheduleLongRunning(new State(null, this, array, continueWith), Loop);
}
else
{
//
// We never allow the scheduled work to be cancelled. Instead, the flag
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
var flag = new BooleanDisposable();
_scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec);
return flag;
}
}

private struct State
{
public readonly _ _sink;
public readonly ICancelable _flag;
public readonly TSource[] _array;
public readonly Action<_> _continue;
public int _current;

public State(ICancelable flag, _ sink, TSource[] array, Action<_> c)
{
_sink = sink;
_flag = flag;
_continue = c;
_array = array;
_current = 0;
}
}

private void LoopRec(State state, Action<State> recurse)
{
if (state._flag.IsDisposed)
return;

var current = state._array[state._current];
ForwardOnNext(current);

state._current++;

if (state._current == state._array.Length)
{
state._continue(state._sink);
return;
}

recurse(state);
}

private void Loop(State state, ICancelable cancel)
{
var array = state._array;
int i = 0;

while (!cancel.IsDisposed)
{
ForwardOnNext(array[i]);
i++;

if (i == array.Length)
{
state._continue(state._sink);
break;
}
}

base.Dispose();
}
}
}

private sealed class Node<T>
{
public readonly Node<T> Parent;
public readonly T Value;
public readonly int Count;

public Node(T value)
: this(null, value)
{
}

public Node(Node<T> parent, T value)
{
Parent = parent;
Value = value;

if (parent == null)
Count = 1;
else
{
if (parent.Count == int.MaxValue)
throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported.");

Count = parent.Count + 1;
}
}

public T[] ToArray()
{
var array = new T[Count];
var current = this;
for (int i = 0; i < Count; i++)
{
array[i] = current.Value;
current = current.Parent;
}
return array;
}

public T[] ToReverseArray()
{
var array = new T[Count];
var current = this;
for (int i = Count - 1; i >= 0; i--)
{
array[i] = current.Value;
current = current.Parent;
}
return array;
}
}
}
}
10 changes: 8 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ public virtual IObservable<TSource> Append<TSource>(IObservable<TSource> source,

private static IObservable<TSource> Append_<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
{
return source.Concat(new [] { value }.ToObservable(scheduler));
if (source is AppendPrepend.IAppendPrepend<TSource> ap && ap.Scheduler == scheduler)
return ap.Append(value);

return new AppendPrepend.AppendPrependSingle<TSource>(source, value, scheduler, append: true);
}

#endregion
Expand Down Expand Up @@ -187,7 +190,10 @@ public virtual IObservable<TSource> Prepend<TSource>(IObservable<TSource> source

private static IObservable<TSource> Prepend_<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
{
return StartWith_(source, scheduler, new[] { value });
if (source is AppendPrepend.IAppendPrepend<TSource> ap && ap.Scheduler == scheduler)
return ap.Prepend(value);

return new AppendPrepend.AppendPrependSingle<TSource>(source, value, scheduler, append: false);
}

#endregion
Expand Down
Loading