Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Rx.NET/Source/src/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
using System;
using System.Resources;
using System.Runtime.InteropServices;
using System.Security;



[assembly: ComVisible(false)]
[assembly: CLSCompliant(true)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
{
return new FastPeriodicTimer(action);
}
else
{
return new PeriodicTimer(action, period);
}

return new PeriodicTimer(action, period);
}

public IDisposable QueueUserWorkItem(Action<object> action, object state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ protected virtual object GetService(Type serviceType)
{
if (serviceType == typeof(IStopwatchProvider))
{
return this as IStopwatchProvider;
return this;
}
else if (serviceType == typeof(ISchedulerLongRunning))

if (serviceType == typeof(ISchedulerLongRunning))
{
return this as ISchedulerLongRunning;
}
else if (serviceType == typeof(ISchedulerPeriodic))

if (serviceType == typeof(ISchedulerPeriodic))
{
return this as ISchedulerPeriodic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ private void Tick(Action<SchedulePeriodicStopwatch<TState>, TimeSpan> recurse)
next = Normalize(_nextDue - (_stopwatch.Elapsed - _inactiveTime));
break;
}
else if (_runState == DISPOSED)

if (_runState == DISPOSED)
{
//
// In case the periodic job gets disposed but we are currently
Expand All @@ -422,16 +423,14 @@ private void Tick(Action<SchedulePeriodicStopwatch<TState>, TimeSpan> recurse)
//
return;
}
else
{
//
// This is the least common case where we got suspended and need
// to block such that future reevaluations of the next due time
// will pick up the cumulative inactive time delta.
//
Debug.Assert(_runState == SUSPENDED);
shouldWaitForResume = true;
}

//
// This is the least common case where we got suspended and need
// to block such that future reevaluations of the next due time
// will pick up the cumulative inactive time delta.
//
Debug.Assert(_runState == SUSPENDED);
shouldWaitForResume = true;
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace System.Reactive.Concurrency
//
public static partial class Scheduler
{
internal static Type[] OPTIMIZATIONS = new Type[] {
internal static Type[] OPTIMIZATIONS = {
typeof(ISchedulerLongRunning),
typeof(IStopwatchProvider),
typeof(ISchedulerPeriodic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ public Synchronize(IObservable<TSource> source)

internal sealed class _ : IdentitySink<TSource>
{
private readonly Synchronize<TSource> _parent;
private readonly object _gate;

public _(Synchronize<TSource> parent, IObserver<TSource> observer)
: base(observer)
{
_parent = parent;
_gate = _parent._gate ?? new object();
_gate = parent._gate ?? new object();
}

public override void OnNext(TSource value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,8 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
{
return new FastPeriodicTimer<TState>(state, action);
}
else
{
return new PeriodicTimer<TState>(state, period, action);
}

return new PeriodicTimer<TState>(state, period, action);
}

private sealed class FastPeriodicTimer<TState> : IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ protected virtual object GetService(Type serviceType)
{
if (serviceType == typeof(IStopwatchProvider))
{
return this as IStopwatchProvider;
return this;
}

return null;
Expand Down
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/ExperimentalAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace System.Reactive
/// <summary>
/// Marks the program elements that are experimental. This class cannot be inherited.
/// </summary>
[Experimental, AttributeUsage(AttributeTargets.All, AllowMultiple = false, Inherited = true)]
[Experimental, AttributeUsage(AttributeTargets.All)]
public sealed class ExperimentalAttribute : Attribute
{
}
Expand Down
6 changes: 2 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Internal/SafeObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ public static ISafeObserver<TSource> Wrap(IObserver<TSource> observer)
{
return a.MakeSafe();
}
else
{
return new WrappingSafeObserver(observer);
}

return new WrappingSafeObserver(observer);
}

private IDisposable _disposable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
}
return true;
}
else

// the queue is empty and the upstream hasn't completed yet
if (empty)
{
Expand Down
58 changes: 27 additions & 31 deletions Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,44 +109,40 @@ private void Drain()
_stack.Push(nextEnumerator);
continue;
}
else
{
Volatile.Write(ref _isDisposed, true);
continue;
}

Volatile.Write(ref _isDisposed, true);
continue;
}
else
{
// we need an unique indicator for this as
// Subscribe could return a Disposable.Empty or
// a BooleanDisposable
var sad = ReadyToken.Ready;

// Swap in the Ready indicator so we know the sequence hasn't been disposed
if (Disposable.TrySetSingle(ref _currentSubscription, sad) == TrySetSingleResult.Success)
{
// subscribe to the source
var d = next.SubscribeSafe(this);
// we need an unique indicator for this as
// Subscribe could return a Disposable.Empty or
// a BooleanDisposable
var sad = ReadyToken.Ready;

// Try to swap in the returned disposable in place of the Ready indicator
// Since this drain loop is the only one to use Ready, this should
// be unambiguous
var u = Interlocked.CompareExchange(ref _currentSubscription, d, sad);
// Swap in the Ready indicator so we know the sequence hasn't been disposed
if (Disposable.TrySetSingle(ref _currentSubscription, sad) == TrySetSingleResult.Success)
{
// subscribe to the source
var d = next.SubscribeSafe(this);

// sequence disposed or completed synchronously
if (u != sad)
// Try to swap in the returned disposable in place of the Ready indicator
// Since this drain loop is the only one to use Ready, this should
// be unambiguous
var u = Interlocked.CompareExchange(ref _currentSubscription, d, sad);

// sequence disposed or completed synchronously
if (u != sad)
{
d.Dispose();
if (u == BooleanDisposable.True)
{
d.Dispose();
if (u == BooleanDisposable.True)
{
continue;
}
continue;
}
}
else
{
continue;
}
}
else
{
continue;
}
}
else
Expand Down
12 changes: 5 additions & 7 deletions Rx.NET/Source/src/System.Reactive/Linq/GroupedObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ protected override IDisposable SubscribeCore(IObserver<TElement> observer)
var subscription = _subject.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(release, subscription);
}
else
{
//
// [OK] Use of unsafe Subscribe: called on a known subject implementation.
//
return _subject.Subscribe/*Unsafe*/(observer);
}

//
// [OK] Use of unsafe Subscribe: called on a known subject implementation.
//
return _subject.Subscribe/*Unsafe*/(observer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Reactive.Linq
/// target class type.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[AttributeUsage(AttributeTargets.Class, Inherited = false, AllowMultiple = false)]
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class LocalQueryMethodImplementationTypeAttribute : Attribute
{
private readonly Type _targetType;
Expand Down
36 changes: 15 additions & 21 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ public IAppendPrepend<TSource> Append(TSource value)
return new AppendPrependMultiple<TSource>(_source,
null, new Node<TSource>(prev, value), Scheduler);
}
else
{
return new AppendPrependMultiple<TSource>(_source,
prev, new Node<TSource>(value), Scheduler);
}

return new AppendPrependMultiple<TSource>(_source,
prev, new Node<TSource>(value), Scheduler);
}

public IAppendPrepend<TSource> Prepend(TSource value)
Expand All @@ -57,11 +55,9 @@ public IAppendPrepend<TSource> Prepend(TSource value)
return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(value), prev, Scheduler);
}
else
{
return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(prev, value), null, Scheduler);
}

return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(prev, value), null, Scheduler);
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
Expand Down Expand Up @@ -239,17 +235,15 @@ private IDisposable Schedule(TSource[] array, Action<_> continueWith)
//
return longRunning.ScheduleLongRunning(new State(null, this, array, continueWith), Loop);
}
else
{
//
// We never allow the scheduled work to be cancelled. Instead, the flag
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
var flag = new BooleanDisposable();
_scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec);
return flag;
}

//
// We never allow the scheduled work to be cancelled. Instead, the flag
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
var flag = new BooleanDisposable();
_scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec);
return flag;
}

private struct State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public void OnNext(TFirst value)
else if (_other.Done)
{
_parent.ForwardOnCompleted();
return;
}
}
}
Expand All @@ -130,7 +129,6 @@ public void OnCompleted()
if (_other.Done)
{
_parent.ForwardOnCompleted();
return;
}
else
{
Expand Down Expand Up @@ -203,7 +201,6 @@ public void OnCompleted()
if (_other.Done)
{
_parent.ForwardOnCompleted();
return;
}
else
{
Expand Down Expand Up @@ -323,7 +320,6 @@ public void Done(int index)
if (allDone)
{
ForwardOnCompleted();
return;
}
}
}
Expand Down Expand Up @@ -472,7 +468,6 @@ private void OnNext(int index, TSource value)
else if (_isDone.AllExcept(index))
{
ForwardOnCompleted();
return;
}
}
}
Expand All @@ -494,7 +489,6 @@ private void OnCompleted(int index)
if (_isDone.All())
{
ForwardOnCompleted();
return;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private Action AddHandlerCore(Delegate handler)
private Action AddHandlerCoreWinRT(Delegate handler)
{
var token = _addMethod.Invoke(_target, new object[] { handler });
return () => _removeMethod.Invoke(_target, new object[] { token });
return () => _removeMethod.Invoke(_target, new[] { token });
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ public bool MoveNext()
Current = current;
return true;
}
else
{
_done = true;
Dispose();
}

_done = true;
Dispose();
}

return false;
Expand Down
6 changes: 2 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ public IObservable<TSource> Combine(TimeSpan duration)
{
return this;
}
else
{
return new Time(_source, duration, _scheduler);
}

return new Time(_source, duration, _scheduler);
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
Expand Down
Loading