-
Notifications
You must be signed in to change notification settings - Fork 770
4.x: Inline StableCompositeDisposable.Create into the Sinks #578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
4.x: Inline StableCompositeDisposable.Create into the Sinks #578
Conversation
private IDisposable _extra; | ||
|
||
// If Next was called before this assignment is executed, it won't overwrite | ||
// a more fresh IDisposable task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a bug even in the original code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same bug as with #560.
|
||
void IDisposable.Dispose() | ||
{ | ||
Disposable.TrySetSerial(ref _task, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not TryDispose ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it wasn't clear if Start would be called multiple times and thus a TryDispose
would prevent that completely. Well, If I change this to TryDispose
, no unit test fails so I guess it could be disposed.
{ | ||
_context.OperationCompleted(); | ||
} | ||
base.Dispose(disposing); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming the order is not significant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not.
@@ -146,9 +147,16 @@ public void Run(TimeSliding parent) | |||
CreateWindow(); | |||
CreateTimer(); | |||
|
|||
var subscription = parent._source.SubscribeSafe(this); | |||
SetUpstream(parent._source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use base.Run?
var d = parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick); | ||
var s = parent._source.SubscribeSafe(this); | ||
Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick)); | ||
SetUpstream(parent._source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
{ | ||
_buffer = new List<TSource>(); | ||
|
||
var groupDisposable = StableCompositeDisposable.Create(_bufferClosingSubscription, source.SubscribeSafe(this)); | ||
SetUpstream(source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
public void Run(Boundaries parent) | ||
{ | ||
_buffer = new List<TSource>(); | ||
|
||
var sourceSubscription = parent._source.SubscribeSafe(this); | ||
var boundariesSubscription = parent._bufferBoundaries.SubscribeSafe(new BufferClosingObserver(this)); | ||
SetUpstream(parent._source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
@@ -91,7 +91,7 @@ public _(Func<TException, IObservable<TSource>> handler, IObserver<TSource> obse | |||
|
|||
private SerialDisposable _subscription; | |||
|
|||
public void Run(IObservable<TSource> source) | |||
public override void Run(IObservable<TSource> source) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe indeed a use case for applying serial-semantics to the upstream-disposable in Sink?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VS was full of complaining about these missing override
s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Careful, it's not complaining about missing overrides, it's notifying you that the new Run hides base.Run, which might or might not be what you want. Don't just write override
everywhere now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was still ambiguous; the base.Run is designed to be overridden but perhaps all void Runs could be also new
, an since Run
is always called manually, it may be called even something else.
{ | ||
var subscription = source.SubscribeSafe(this); | ||
Disposable.SetSingle(ref _sourceDisposable, source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a protected method in Sink like "DisposeUpstream"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And TrySetSingle, TrySetSerial, TrySetMultiple, TryDispose...
@@ -47,10 +49,17 @@ public void Run(Multicast<TSource, TIntermediate, TResult> parent) | |||
return; | |||
} | |||
|
|||
var subscription = observable.SubscribeSafe(this); | |||
var connection = connectable.Connect(); | |||
SetUpstream(observable.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run?
var sourceSubscription = new SingleAssignmentDisposable(); | ||
_sourceSubscription = sourceSubscription; | ||
sourceSubscription.Disposable = parent._source.SubscribeSafe(this); | ||
Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, _sourceDisposable has to be disposed separately: see OnCompleted
@@ -212,7 +212,7 @@ public _(ObservableSelectorIndexed parent, IObserver<TResult> observer) | |||
private bool _isStopped; | |||
private int _index; | |||
|
|||
public void Run(IObservable<TSource> source) | |||
public override void Run(IObservable<TSource> source) | |||
{ | |||
_isStopped = false; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't that be vice versa?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, access to _sourceSubscription is needed.
{ | ||
_count = 1; | ||
|
||
SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel)); | ||
SetUpstream(source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
_enumerator = parent._second.GetEnumerator(); | ||
var enumerator = parent._second.GetEnumerator(); | ||
|
||
if (Interlocked.CompareExchange(ref _enumerator, enumerator, null) != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have generic versions of TrySet... of T where T : IDisposable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You still need a type-specific disposed instance, in case some methods are called on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm... right.
@@ -172,11 +172,21 @@ public _(IObserver<TSource> observer) | |||
{ | |||
} | |||
|
|||
private IDisposable _sourceDisposable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why explicitly `?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either the scheduled task disposable or the upstream disposable has to be stored in a field. I'd guess you'd say base.Run.
SetUpstream(StableCompositeDisposable.Create(t, d)); | ||
SetUpstream(parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick())); | ||
Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works but is a little bit of misuse, because upstream is usually the source subscription, not a timer subscription. I know there is no way currently.
_id = 0UL; | ||
|
||
var subscription = source.SubscribeSafe(this); | ||
SetUpstream(source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
_id = 0UL; | ||
|
||
var subscription = parent._source.SubscribeSafe(this); | ||
SetUpstream(parent._source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
|
||
return; | ||
} | ||
|
||
SetUpstream(StableCompositeDisposable.Create(source.SubscribeSafe(this), disposable)); | ||
SetUpstream(source.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
@@ -282,9 +295,16 @@ public void Run(IObservable<TFirst> first, IEnumerable<TSecond> second) | |||
return; | |||
} | |||
|
|||
var leftSubscription = first.SubscribeSafe(this); | |||
SetUpstream(first.SubscribeSafe(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
base.Run
I think I've addressed all the feedback. |
I try to do another round of review today, if I won't make it today I'll merge it on monday. |
@@ -262,6 +263,13 @@ public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> obser | |||
|
|||
private IEnumerator<TSecond> _rightEnumerator; | |||
|
|||
private static IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readonly please
This PR inlines the two argument
StableCompositeDisposable.Create()
into the variousSink
implementation, saving on allocation and indirection.