diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs new file mode 100644 index 0000000000..111a4422e0 --- /dev/null +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs @@ -0,0 +1,115 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Text; +using System.Threading; + +namespace System.Reactive.Linq.ObservableImpl +{ + + internal abstract class BaseBlocking : CountdownEvent, IObserver + { + protected IDisposable _upstream; + + internal T _value; + internal bool _hasValue; + internal Exception _error; + + int once; + + internal BaseBlocking() : base(1) { } + + internal void SetUpstream(IDisposable d) + { + Disposable.SetSingle(ref _upstream, d); + } + + protected void Unblock() + { + if (Interlocked.CompareExchange(ref once, 1, 0) == 0) + { + Signal(); + } + } + + public abstract void OnCompleted(); + public virtual void OnError(Exception error) + { + _value = default; + _error = error; + Unblock(); + } + public abstract void OnNext(T value); + + public new void Dispose() + { + base.Dispose(); + if (!Disposable.GetIsDisposed(ref _upstream)) + { + Disposable.TryDispose(ref _upstream); + } + } + } + + internal sealed class FirstBlocking : BaseBlocking + { + internal FirstBlocking() : base() { } + + public override void OnCompleted() + { + Unblock(); + if (!Disposable.GetIsDisposed(ref _upstream)) + { + Disposable.TryDispose(ref _upstream); + } + } + + public override void OnError(Exception error) + { + base.OnError(error); + if (!Disposable.GetIsDisposed(ref _upstream)) + { + Disposable.TryDispose(ref _upstream); + } + } + + public override void OnNext(T value) + { + if (!_hasValue) + { + _value = value; + _hasValue = true; + Disposable.TryDispose(ref _upstream); + Unblock(); + } + } + } + + internal sealed class LastBlocking : BaseBlocking + { + internal LastBlocking() : base() { } + + public override void OnCompleted() + { + Unblock(); + Disposable.TryDispose(ref _upstream); + } + + public override void OnError(Exception error) + { + base.OnError(error); + Disposable.TryDispose(ref _upstream); + } + + public override void OnNext(T value) + { + _value = value; + _hasValue = true; + } + + } +} diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs index 18a771872d..8142c37ab7 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs @@ -67,45 +67,26 @@ public virtual TSource FirstOrDefault(IObservable source, Func private static TSource FirstOrDefaultInternal(IObservable source, bool throwOnEmpty) { - var value = default(TSource); - var seenValue = false; - var ex = default(Exception); - - using (var evt = new WaitAndSetOnce()) + using (var consumer = new FirstBlocking()) { - // - // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink. - // - using (source.Subscribe/*Unsafe*/(new AnonymousObserver( - v => - { - if (!seenValue) - { - value = v; - } - seenValue = true; - evt.Set(); - }, - e => - { - ex = e; - evt.Set(); - }, - () => - { - evt.Set(); - }))) + using (var d = source.Subscribe(consumer)) { - evt.WaitOne(); - } - } + consumer.SetUpstream(d); - ex.ThrowIfNotNull(); + if (consumer.CurrentCount != 0) + { + consumer.Wait(); + } + } - if (throwOnEmpty && !seenValue) - throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS); + consumer._error.ThrowIfNotNull(); - return value; + if (throwOnEmpty && !consumer._hasValue) + { + throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS); + } + return consumer._value; + } } #endregion @@ -182,41 +163,27 @@ public virtual TSource LastOrDefault(IObservable source, Func< private static TSource LastOrDefaultInternal(IObservable source, bool throwOnEmpty) { - var value = default(TSource); - var seenValue = false; - var ex = default(Exception); - - using (var evt = new WaitAndSetOnce()) + using (var consumer = new LastBlocking()) { - // - // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink. - // - using (source.Subscribe/*Unsafe*/(new AnonymousObserver( - v => - { - seenValue = true; - value = v; - }, - e => - { - ex = e; - evt.Set(); - }, - () => - { - evt.Set(); - }))) + + using (var d = source.Subscribe(consumer)) { - evt.WaitOne(); - } - } + consumer.SetUpstream(d); - ex.ThrowIfNotNull(); + if (consumer.CurrentCount != 0) + { + consumer.Wait(); + } + } - if (throwOnEmpty && !seenValue) - throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS); + consumer._error.ThrowIfNotNull(); - return value; + if (throwOnEmpty && !consumer._hasValue) + { + throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS); + } + return consumer._value; + } } #endregion