Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,17 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<

var state1 = state;

var d = new MultipleAssignmentDisposable();
var td = new TernaryDisposable();

var gate = new AsyncLock();
td.Extra = gate;

var tick = default(Func<IScheduler, object, IDisposable>);
tick = (self_, _) =>
{
next += period;

d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
td.Next = self_.Schedule(null, next - _stopwatch.Elapsed, tick);

gate.Wait(() =>
{
Expand All @@ -214,9 +216,28 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
return Disposable.Empty;
};

d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);
td.First = Schedule(null, next - _stopwatch.Elapsed, tick);

return td;
}

private sealed class TernaryDisposable : IDisposable
{
private IDisposable _task;
private IDisposable _extra;

// If Next was called before this assignment is executed, it won't overwrite
// a more fresh IDisposable task
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a bug even in the original code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same bug as with #560.

public IDisposable First { set { Disposable.TrySetSingle(ref _task, value); } }
// It is fine to overwrite the first or previous IDisposable task
public IDisposable Next { set { Disposable.TrySetMultiple(ref _task, value); } }
public IDisposable Extra { set { Disposable.SetSingle(ref _extra, value); } }

return StableCompositeDisposable.Create(d, gate);
public void Dispose()
{
Disposable.TryDispose(ref _task);
Disposable.TryDispose(ref _extra);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private static IDisposable SchedulePeriodic_<TState>(IScheduler scheduler, TStat
}
}

private sealed class SchedulePeriodicStopwatch<TState>
private sealed class SchedulePeriodicStopwatch<TState> : IDisposable
{
private readonly IScheduler _scheduler;
private readonly TimeSpan _period;
Expand Down Expand Up @@ -341,6 +341,8 @@ public SchedulePeriodicStopwatch(IScheduler scheduler, TState state, TimeSpan pe
private const int SUSPENDED = 2;
private const int DISPOSED = 3;

private IDisposable _task;

public IDisposable Start()
{
RegisterHostLifecycleEventHandlers();
Expand All @@ -349,11 +351,14 @@ public IDisposable Start()
_nextDue = _period;
_runState = RUNNING;

return StableCompositeDisposable.Create
(
_scheduler.Schedule(_nextDue, Tick),
Disposable.Create(Cancel)
);
Disposable.TrySetSingle(ref _task, _scheduler.Schedule(_nextDue, Tick));
return this;
}

void IDisposable.Dispose()
{
Disposable.TryDispose(ref _task);
Cancel();
}

private void Tick(Action<TimeSpan> recurse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public _(SynchronizationContext context, IObserver<TSource> observer)
_context = context;
}

public void Run(IObservable<TSource> source)
public override void Run(IObservable<TSource> source)
{
//
// The interactions with OperationStarted/OperationCompleted below allow
Expand All @@ -83,10 +83,16 @@ public void Run(IObservable<TSource> source)
//
_context.OperationStarted();

var d = source.SubscribeSafe(this);
var c = Disposable.Create(_context.OperationCompleted);
SetUpstream(source.SubscribeSafe(this));
}

SetUpstream(StableCompositeDisposable.Create(d, c));
protected override void Dispose(bool disposing)
{
if (disposing)
{
_context.OperationCompleted();
}
base.Dispose(disposing);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the order is not significant.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not.

}

public override void OnNext(TSource value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,6 @@ internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable

readonly ConcurrentQueue<T> queue;

/// <summary>
/// The disposable of the upstream source.
/// </summary>
IDisposable upstream;

private IDisposable _run;

/// <summary>
Expand Down
79 changes: 60 additions & 19 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ internal sealed class _ : Sink<TSource, IList<TSource>>

private readonly object _gate = new object();
private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();
private readonly SerialDisposable _timerD = new SerialDisposable();

private IDisposable _timerSerial;

public _(TimeSliding parent, IObserver<IList<TSource>> observer)
: base(observer)
Expand All @@ -146,9 +147,16 @@ public void Run(TimeSliding parent)
CreateWindow();
CreateTimer();

var subscription = parent._source.SubscribeSafe(this);
base.Run(parent._source);
}

SetUpstream(StableCompositeDisposable.Create(_timerD, subscription));
protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _timerSerial);
}
base.Dispose(disposing);
}

private void CreateWindow()
Expand All @@ -160,7 +168,8 @@ private void CreateWindow()
private void CreateTimer()
{
var m = new SingleAssignmentDisposable();
_timerD.Disposable = m;

Disposable.TrySetSerial(ref _timerSerial, m);

var isSpan = false;
var isShift = false;
Expand Down Expand Up @@ -281,14 +290,23 @@ public _(IObserver<IList<TSource>> observer)

private List<TSource> _list;

private IDisposable _periodicDisposable;

public void Run(TimeHopping parent)
{
_list = new List<TSource>();

var d = parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick);
var s = parent._source.SubscribeSafe(this);
Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
base.Run(parent._source);
}

SetUpstream(StableCompositeDisposable.Create(d, s));
protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _periodicDisposable);
}
base.Dispose(disposing);
}

private void Tick()
Expand Down Expand Up @@ -353,7 +371,7 @@ internal sealed class _ : Sink<TSource, IList<TSource>>
private readonly Ferry _parent;

private readonly object _gate = new object();
private readonly SerialDisposable _timerD = new SerialDisposable();
private IDisposable _timerSerial;

public _(Ferry parent, IObserver<IList<TSource>> observer)
: base(observer)
Expand All @@ -373,15 +391,22 @@ public void Run()

CreateTimer(0);

var subscription = _parent._source.SubscribeSafe(this);
SetUpstream(_parent._source.SubscribeSafe(this));
}

SetUpstream(StableCompositeDisposable.Create(_timerD, subscription));
protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _timerSerial);
}
base.Dispose(disposing);
}

private void CreateTimer(int id)
{
var m = new SingleAssignmentDisposable();
_timerD.Disposable = m;
Disposable.TrySetSerial(ref _timerSerial, m);

m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
}
Expand Down Expand Up @@ -477,7 +502,7 @@ internal sealed class _ : Sink<TSource, IList<TSource>>
{
private readonly object _gate = new object();
private readonly AsyncLock _bufferGate = new AsyncLock();
private readonly SerialDisposable _bufferClosingSubscription = new SerialDisposable();
private IDisposable _bufferClosingSerialDisposable;

private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;

Expand All @@ -489,15 +514,22 @@ public _(Selector parent, IObserver<IList<TSource>> observer)

private IList<TSource> _buffer;

public void Run(IObservable<TSource> source)
public override void Run(IObservable<TSource> source)
{
_buffer = new List<TSource>();

var groupDisposable = StableCompositeDisposable.Create(_bufferClosingSubscription, source.SubscribeSafe(this));
base.Run(source);

_bufferGate.Wait(CreateBufferClose);
}

SetUpstream(groupDisposable);
protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _bufferClosingSerialDisposable);
}
base.Dispose(disposing);
}

private void CreateBufferClose()
Expand All @@ -517,7 +549,7 @@ private void CreateBufferClose()
}

var closingSubscription = new SingleAssignmentDisposable();
_bufferClosingSubscription.Disposable = closingSubscription;
Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingSubscription);
closingSubscription.Disposable = bufferClose.SubscribeSafe(new BufferClosingObserver(this, closingSubscription));
}

Expand Down Expand Up @@ -616,14 +648,23 @@ public _(IObserver<IList<TSource>> observer)

private IList<TSource> _buffer;

private IDisposable _boundariesDisposable;

public void Run(Boundaries parent)
{
_buffer = new List<TSource>();

var sourceSubscription = parent._source.SubscribeSafe(this);
var boundariesSubscription = parent._bufferBoundaries.SubscribeSafe(new BufferClosingObserver(this));
base.Run(parent._source);
Disposable.SetSingle(ref _boundariesDisposable, parent._bufferBoundaries.SubscribeSafe(new BufferClosingObserver(this)));
}

SetUpstream(StableCompositeDisposable.Create(sourceSubscription, boundariesSubscription));
protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _boundariesDisposable);
}
base.Dispose(disposing);
}

private sealed class BufferClosingObserver : IObserver<TBufferClosing>
Expand Down
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public _(Func<TException, IObservable<TSource>> handler, IObserver<TSource> obse

private SerialDisposable _subscription;

public void Run(IObservable<TSource> source)
public override void Run(IObservable<TSource> source)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe indeed a use case for applying serial-semantics to the upstream-disposable in Sink?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VS was full of complaining about these missing overrides.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Careful, it's not complaining about missing overrides, it's notifying you that the new Run hides base.Run, which might or might not be what you want. Don't just write override everywhere now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was still ambiguous; the base.Run is designed to be overridden but perhaps all void Runs could be also new, an since Run is always called manually, it may be called even something else.

{
_subscription = new SerialDisposable();

Expand Down
36 changes: 20 additions & 16 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,41 @@ public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> obser

private object _gate;

private IDisposable _firstDisposable;
private IDisposable _secondDisposable;

public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
{
_gate = new object();

var fstSubscription = new SingleAssignmentDisposable();
var sndSubscription = new SingleAssignmentDisposable();

var fstO = new FirstObserver(this, fstSubscription);
var sndO = new SecondObserver(this, sndSubscription);
var fstO = new FirstObserver(this);
var sndO = new SecondObserver(this);

fstO.Other = sndO;
sndO.Other = fstO;

fstSubscription.Disposable = first.SubscribeSafe(fstO);
sndSubscription.Disposable = second.SubscribeSafe(sndO);
Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(fstO));
Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(sndO));
}

SetUpstream(StableCompositeDisposable.Create(fstSubscription, sndSubscription));
protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _firstDisposable);
Disposable.TryDispose(ref _secondDisposable);
}
base.Dispose(disposing);
}

private sealed class FirstObserver : IObserver<TFirst>
{
private readonly _ _parent;
private readonly IDisposable _self;
private SecondObserver _other;

public FirstObserver(_ parent, IDisposable self)
public FirstObserver(_ parent)
{
_parent = parent;
_self = self;
}

public SecondObserver Other { set { _other = value; } }
Expand Down Expand Up @@ -128,7 +134,7 @@ public void OnCompleted()
}
else
{
_self.Dispose();
Disposable.TryDispose(ref _parent._firstDisposable);
}
}
}
Expand All @@ -137,13 +143,11 @@ public void OnCompleted()
private sealed class SecondObserver : IObserver<TSecond>
{
private readonly _ _parent;
private readonly IDisposable _self;
private FirstObserver _other;

public SecondObserver(_ parent, IDisposable self)
public SecondObserver(_ parent)
{
_parent = parent;
_self = self;
}

public FirstObserver Other { set { _other = value; } }
Expand Down Expand Up @@ -203,7 +207,7 @@ public void OnCompleted()
}
else
{
_self.Dispose();
Disposable.TryDispose(ref _parent._secondDisposable);
}
}
}
Expand Down
Loading