Skip to content

Conversation

danielcweber
Copy link
Collaborator

@danielcweber danielcweber commented May 5, 2018

The main objective of this PR is to strengthen the concept of the Sink class. Before, it would wrap, yet
still internally expose an observer. Inheriting classes would access it directly and the pattern of disposing the Sink after calling OnCompleted/OnError on the wrapped observer would be on them and repeat throughout the code. This commit encapsulates the observer and exposes Forward*-methods, taking care of disposing at the right places. The result is a lot of saved duplicated code. Moreover, we find that almost all classes inheriting from Sink also implement an IObserver of some kind by themselves, so we establish a base class Sink<TSource, TTarget>. The concept of a Sink is now more obvious, i.e. it serves as the logic between the source-stream and the emitted target stream. At last, we introduce IdentitySink which will just relay events, again, this will save a lot of duplicated code.

/// <remarks>Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer.</remarks>
internal abstract class Sink<TSource> : IDisposable
internal abstract class Sink<TTarget, TSource> : IObserver<TSource>, IDisposable
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be more comfortable with the arguments ordered TSource, TTarget.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was aimimg for that, it's only that way I could easily do a find/replace of the pattern Sink<...>, IObserver<...>. ReSharper may do that with one click maybe. I'll have a look.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has been done.

ForwardOnCompleted();
}

protected virtual void Dispose(bool disposing)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not that familiar with C# but saw this pattern mentioned several times. Is this Dispose(bool) called by the system on finalization?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not since Sink doesn't have a finalizer. It's just a pattern that IMHO should be followed if we want to have a clean, state-of-the-art codebase.


private sealed class _ : IObserver<TSource>
private sealed class _ : IObserver<TTarget>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked around much. Why is this wrapper needed? I'd assume so that Sink.Dispose() is not accessible to whomever receives the _.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed by the GetForwarder method and that's needed in one place only. I'd rather get rid of it but it's ok for now.

@@ -188,28 +187,21 @@ private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEn
// enumerating to find the next observable sequence. Therefore,
// we feed those errors directly to the observer.
//
_observer.OnError(exception);
base.Dispose();
ForwardOnError(exception);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd think the allocation L157 is not really necessary. It could be replaced by simply:

_subscription.Disposable = next.SubscribeSafe(this);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in the scope of this PR, at least not for now. I am sure there is a lot to gain in allocation efficiency though.

@@ -8,7 +8,7 @@

namespace System.Reactive
{
internal abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource>
internal abstract class TailRecursiveSink<TSource> : Sink<TSource>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is odd to my eyes. It is supposed to implement a trampolining mechanism where only one inner source should be running at a time but don't want to get into an infinite recursion when trying to continue with the next one.

Given that it operates out of a synchronous/blocking IEnumerable, I can't see why the stacks are even necessary. Also it enables some transformations on top of those which I don't know why they are provided.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will need to review that in depth.

/// </summary>
protected IDisposable Get(ref IDisposable fieldRef)
{
var current = fieldRef;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does ref preserve the volatileness of fieldRef? I'd call Volatile.Read(ref fieldRef) to be sure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a copy of what was there in SingleAssignmentDisposable. The old code produced warnings in this regard as well.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// We use a sentinel value to indicate we've been disposed. This sentinel never leaks
// to the outside world (see the Disposable property getter), so no-one can ever assign
// this value to us manually.
return fieldRef == BooleanDisposable.True;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Volatile.Read(ref fieldRef)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You see both patterns throughout the code, sometimes with Volatile.Read, sometimes without. Won't the volatile field just do?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AssignmentDisposable shall be removed then. You may have to fetch and force-update your branch in some minutes, I'll do a rebase.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for removal, just use Volatile.Read as I suggested.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might as well just copy-paste SingleAssignmentDisposable. Copy and paste may be justified at this level.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in Reactive4.NET I didn't specify fields as volatile and had a helper static class work with refs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, that base helper static class does exactly that. Could you please review 3fc620d then.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, much better.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks


protected void Dispose(ref IDisposable fieldRef)
{
Interlocked.Exchange(ref fieldRef, BooleanDisposable.True)?.Dispose();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Rx is calling Dispose all over the place and even possibly multiple times, I'd bias this so that if fieldRef is already BooleanDisposable.True, don't exchange:

if (Volatile.Read(ref fieldRef) != BooleanDisposable.True) {
    Interlocked.Exchange(ref fieldRef, BooleanDisposable.True)?.Dispose();
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be done.

@akarnokd
Copy link
Collaborator

akarnokd commented May 8, 2018

The most non-trivial thing when writing an operator externally is this: Producer.cs#L52.

If you don't want to include Rx.NET, there is no way to know a Subscribe should trampoline. The Producers are internal so you can't externally depend on this feature either.

@danielcweber
Copy link
Collaborator Author

We'll get there, I'm sure.


protected virtual void Dispose(bool disposing)
{
_observer = NopObserver<TTarget>.Instance;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't really need full barrier volatile write here due to the full barrier of Exchange. A Volatile.Write(ref _observer, NopObserver<TTarget>.Instance) should suffice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the other accesses to _observer? Should be Volatile.Read then, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_observer is declared volatile so you get it for free. I tend to use Volatile.Read to clearly indicate the barriers in the code.


public Sink(IObserver<TSource> observer, IDisposable cancel)
protected Sink(IObserver<TTarget> observer, IDisposable cancel)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swap the assignment so that _observer = ... releases the plain _cancel field before the constructor returns.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the advantage ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct publication of the IDisposable reference received with respect to the memory model.

Copy link
Collaborator Author

@danielcweber danielcweber May 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the incorrect behaviour that could happen? (the memory model has always puzzled me)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential early publication of the referenced IDisposable before its state has become visible to other threads. Plain reference writes can be tricky in concurrent settings.

if (CurrentThreadScheduler.IsScheduleRequired)
{
var state = new State { sink = sink, inner = subscription.Inner };
var state = new State { sink = sink, inner = runDisposable };

CurrentThreadScheduler.Instance.Schedule(state, Run);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this BasicProducer.Run could be replaced with a static delegate so that only one state object is allocated and not any delegate (I assume assigning a method to a delegate type actually instantiates some objects in C# right?):

       private struct State
        {
            public BasicProducer<TSource> parent;
            public SingleAssignmentDisposable subscription;
            public IObserver<TSource> observer;
        }

        static readonly Func<IScheduler, State, IDisposable> RunRun = (_, x) =>
        {
            x.subscription.Disposable = x.parent.Run(x.observer);
            return Disposable.Empty;
        };

//...
           if (CurrentThreadScheduler.IsScheduleRequired)
            {
                var state = new State { parent = this, subscription = subscription, observer = observer };
                CurrentThreadScheduler.Instance.Schedule(state, RunRun);
            }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'm actually puzzled that making Run static won't make the compiler cache the delegate...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlining Run as anonymous lambda that's not dependent on closure will do the trick. Interesting, I had thought that delegate caching will work on static methods as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can implement all your suggestions, you may as well file a pull request to get the credit for the commit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on TailRecursiveSink at the moment and I have these already changed in my version.

@danielcweber danielcweber changed the title [WIP] Review producers and sinks Review producers and sinks May 15, 2018
@danielcweber danielcweber mentioned this pull request May 17, 2018
…still internally expose an observer. Inheriting classes would access it directly and the pattern of disposing the Sink after calling OnCompleted/OnError on the wrapped observer would be on them and repeat throughout the code. This commit encapsulates the observer and exposes Forward*-methods, taking care of disposing at the right places. The result is a lot of saved duplicated code. Moreover, we find that almost all classes inheriting from Sink also implement an IObserver of some kind by themselves, so we establish a base class Sink<TSource, TTarget>. The concept of a Sink is now more obvious, i.e. it serves as the logic between the source-stream and the emitted target stream. At last, we introduce IdentitySink which will just relay events, again, this will save a lot of duplicated code.
@danielcweber danielcweber merged commit f2ea212 into dotnet:master May 28, 2018
@danielcweber danielcweber deleted the ReviewProducersAndSinks branch May 28, 2018 16:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants