-
Notifications
You must be signed in to change notification settings - Fork 770
4.x: Improve blocking First & Last operators #590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
{ | ||
evt.Set(); | ||
}))) | ||
consumer.SetUpstream(d); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? Any completion will unblock the consumer and d will be disposed by the using-block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unblocking may happen any time later and the upstream would still push items to be dropped unnecessarily.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any time later, how?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pausing and resuming a thread takes time, about 5 microseconds on Windows based on my experience. During this time, the source may generate anything between zero to 1000 items, all of them wasting resources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, you're taking that into account.
namespace System.Reactive.Linq.ObservableImpl | ||
{ | ||
|
||
internal abstract class BaseBlocking<T> : CountdownEvent, IObserver<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CountdownEvent vs. ManualResetEvent ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ManualResetEvent
cannot be extended, CountdownEvent
can, saving on an allocation.
internal bool _hasValue; | ||
internal Exception _error; | ||
|
||
int once; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above, ManualResetEvent would save this field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allocation vs an atomic operation tradeoff. If Signal
wouldn't throw by default, we'd not need this.
{ | ||
this._value = value; | ||
this._hasValue = true; | ||
Disposable.TryDispose(ref _upstream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why no check here (or, why check above) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnNext is called in a serialized fashion and a previous call already set _hasValue
to true. Checking that upfront guarantees this executes at most once, but OnCompleted
may still run regardless of _upstream
disposed so that we want to avoid doing again there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the order (Unblock, Dispose) reversed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No practical reason other than the reverse will race the upstream disposition with the using
disposing.
I'm thinking about whether CountdownEvent.Dispose should be overridden to dispose the upstream, not that it would conceptually be required but simply because there is the disposable field and the instance is disposable. |
Calling CountdownEvent.Dispose breaks the CountdownEvent and it cannot be unblocked after that. Methods will start throwing ObjectDisposedException as well. |
Disposable.TryDispose(ref _upstream); | ||
} | ||
|
||
public override void OnError(Exception error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Save for the IsDisposed-check, the overriding code is almost identical. Enough reason to push it to the base-class ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last doesn't need that dispose check so why have it there. First needs it so why penalize everybody else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least push some of the code into the base class.
What I mean is, if CountdownEvent.Dispose is called, should the upstream be disposed? |
It can't be called here as the class/instance is not exposed. |
Still, BaseBlocking implements IDisposable and holds a disposable, it should override and dispose of the upstream. |
Why? |
Maybe it should be put in a using block then and honour that it implements IDisposable. |
No. It makes no sense in this context. You dispose the subscription returned by |
Thinking about that, it might definitely make sense since the CountdownEvent probably holds onto an unmanaged resource and should be disposed as soon you're done with it. |
WaitAndSetOnce wrapped a ManualResetEvent before and it's in a using-block. |
Optimize the blocking
First
andLast
(==Wait
) by inlining the various classes into one container and not waiting if the operation terminated by the timeSubscribe
returned.