diff --git a/Rx.NET/Source/src/System.Reactive/Joins/JoinObserver.cs b/Rx.NET/Source/src/System.Reactive/Joins/JoinObserver.cs index 01b5c3b591..57ff6ddcf5 100644 --- a/Rx.NET/Source/src/System.Reactive/Joins/JoinObserver.cs +++ b/Rx.NET/Source/src/System.Reactive/Joins/JoinObserver.cs @@ -16,32 +16,31 @@ internal interface IJoinObserver : IDisposable internal sealed class JoinObserver : ObserverBase>, IJoinObserver { - private object gate; - private readonly IObservable source; - private readonly Action onError; - private List activePlans; + private object _gate; + private readonly IObservable _source; + private readonly Action _onError; + private readonly List _activePlans; public Queue> Queue { get; private set; } - private readonly SingleAssignmentDisposable subscription; - private bool isDisposed; + private IDisposable _subscription; + private bool _isDisposed; public JoinObserver(IObservable source, Action onError) { - this.source = source; - this.onError = onError; + _source = source; + _onError = onError; Queue = new Queue>(); - subscription = new SingleAssignmentDisposable(); - activePlans = new List(); + _activePlans = new List(); } public void AddActivePlan(ActivePlan activePlan) { - activePlans.Add(activePlan); + _activePlans.Add(activePlan); } public void Subscribe(object gate) { - this.gate = gate; - subscription.Disposable = source.Materialize().SubscribeSafe(this); + _gate = gate; + Disposable.SetSingle(ref _subscription, _source.Materialize().SubscribeSafe(this)); } public void Dequeue() @@ -51,18 +50,18 @@ public void Dequeue() protected override void OnNextCore(Notification notification) { - lock (gate) + lock (_gate) { - if (!isDisposed) + if (!_isDisposed) { if (notification.Kind == NotificationKind.OnError) { - onError(notification.Exception); + _onError(notification.Exception); return; } Queue.Enqueue(notification); - foreach (var activePlan in activePlans.ToArray()) + foreach (var activePlan in _activePlans.ToArray()) // Working on a copy since _activePlans might change while iterating. activePlan.Match(); } } @@ -78,8 +77,8 @@ protected override void OnCompletedCore() internal void RemoveActivePlan(ActivePlan activePlan) { - activePlans.Remove(activePlan); - if (activePlans.Count == 0) + _activePlans.Remove(activePlan); + if (_activePlans.Count == 0) Dispose(); } @@ -87,13 +86,13 @@ protected override void Dispose(bool disposing) { base.Dispose(disposing); - if (!isDisposed) + if (!_isDisposed) { if (disposing) - subscription.Dispose(); + Disposable.TryDispose(ref _subscription); - isDisposed = true; + _isDisposed = true; } } } -} \ No newline at end of file +}