Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Rx.NET/Source/src/System.Reactive/AnonymousSafeObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal sealed class AnonymousSafeObserver<T> : SafeObserver<T>
private readonly Action<Exception> _onError;
private readonly Action _onCompleted;

private int isStopped;
private int _isStopped;

public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
Expand All @@ -35,7 +35,7 @@ public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action

public override void OnNext(T value)
{
if (isStopped == 0)
if (_isStopped == 0)
{
var __noError = false;
try
Expand All @@ -55,7 +55,7 @@ public override void OnNext(T value)

public override void OnError(Exception error)
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
if (Interlocked.Exchange(ref _isStopped, 1) == 0)
{
using (this)
{
Expand All @@ -66,7 +66,7 @@ public override void OnError(Exception error)

public override void OnCompleted()
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
if (Interlocked.Exchange(ref _isStopped, 1) == 0)
{
using (this)
{
Expand Down
38 changes: 19 additions & 19 deletions Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace System.Reactive.Concurrency
/// </summary>
public sealed class AsyncLock : IDisposable
{
private bool isAcquired;
private bool hasFaulted;
private readonly object guard = new object();
private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> queue;
private bool _isAcquired;
private bool _hasFaulted;
private readonly object _guard = new object();
private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> _queue;

/// <summary>
/// Queues the action for execution. If the caller acquires the lock and becomes the owner,
Expand Down Expand Up @@ -56,32 +56,32 @@ internal void Wait<TState>(TState state, Action<TState> action)
private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
{
// allow one thread to update the state
lock (guard)
lock (_guard)
{
// if a previous action crashed, ignore any future actions
if (hasFaulted)
if (_hasFaulted)
{
return;
}

// if the "lock" is busy, queue up the extra work
// otherwise there is no need to queue up "action"
if (isAcquired)
if (_isAcquired)
{
// create the queue if necessary
var q = queue;
var q = _queue;
if (q == null)
{
q = new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
queue = q;
_queue = q;
}
// enqueue the work
q.Enqueue((action, @delegate, state));
return;
}

// indicate there is processing going on
isAcquired = true;
_isAcquired = true;
}

// if we get here, execute the "action" first
Expand All @@ -95,25 +95,25 @@ private void Wait(object state, Delegate @delegate, Action<Delegate, object> act
catch
{
// the execution failed, terminate this AsyncLock
lock (guard)
lock (_guard)
{
// throw away the queue
queue = null;
_queue = null;
// report fault
hasFaulted = true;
_hasFaulted = true;
}
throw;
}

// execution succeeded, let's see if more work has to be done
lock (guard)
lock (_guard)
{
var q = queue;
var q = _queue;
// either there is no queue yet or we run out of work
if (q == null || q.Count == 0)
{
// release the lock
isAcquired = false;
_isAcquired = false;
return;
}

Expand All @@ -129,10 +129,10 @@ private void Wait(object state, Delegate @delegate, Action<Delegate, object> act
/// </summary>
public void Dispose()
{
lock (guard)
lock (_guard)
{
queue = null;
hasFaulted = true;
_queue = null;
_hasFaulted = true;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.

#if NO_THREAD && WINDOWS
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.

#if !NO_THREAD
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

Expand Down Expand Up @@ -236,7 +235,7 @@ public void Dispose()
private sealed class FastPeriodicTimer : IDisposable
{
private readonly Action _action;
private volatile bool disposed;
private volatile bool _disposed;

public FastPeriodicTimer(Action action)
{
Expand All @@ -254,15 +253,15 @@ private static void Loop(object threadParam)
{
var timer = (FastPeriodicTimer)threadParam;

while (!timer.disposed)
while (!timer._disposed)
{
timer._action();
}
}

public void Dispose()
{
disposed = true;
_disposed = true;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Reactive.Concurrency
/// <seealso cref="Scheduler.CurrentThread">Singleton instance of this type exposed through this static property.</seealso>
public sealed class CurrentThreadScheduler : LocalScheduler
{
private static readonly Lazy<CurrentThreadScheduler> s_instance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());
private static readonly Lazy<CurrentThreadScheduler> _staticInstance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I understand the .editorconfig right, than static readonly fields should be in PascalCase. Although it does not state if this is the case independently of the accessibility modifiers. @onovotny used PascalCase in #720.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a property with name Instance already.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See L24:

public static CurrentThreadScheduler Instance => _staticInstance.Value;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I also prefer to start private fields with an underscore else you have to become creative for backing fields. Like LazyInstance for example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always bring s_ back for static instance fields, but I thought that was nasty.


private CurrentThreadScheduler()
{
Expand All @@ -21,34 +21,34 @@ private CurrentThreadScheduler()
/// <summary>
/// Gets the singleton instance of the current thread scheduler.
/// </summary>
public static CurrentThreadScheduler Instance => s_instance.Value;
public static CurrentThreadScheduler Instance => _staticInstance.Value;

[ThreadStatic]
private static SchedulerQueue<TimeSpan> s_threadLocalQueue;
private static SchedulerQueue<TimeSpan> _threadLocalQueue;

[ThreadStatic]
private static IStopwatch s_clock;
private static IStopwatch _clock;

[ThreadStatic]
private static bool running;
private static bool _running;

private static SchedulerQueue<TimeSpan> GetQueue() => s_threadLocalQueue;
private static SchedulerQueue<TimeSpan> GetQueue() => _threadLocalQueue;

private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
{
s_threadLocalQueue = newQueue;
_threadLocalQueue = newQueue;
}

private static TimeSpan Time
{
get
{
if (s_clock == null)
if (_clock == null)
{
s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
}

return s_clock.Elapsed;
return _clock.Elapsed;
}
}

Expand All @@ -64,7 +64,7 @@ private static TimeSpan Time
/// Gets a value that indicates whether the caller must call a Schedule method.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static bool IsScheduleRequired => !running;
public static bool IsScheduleRequired => !_running;

/// <summary>
/// Schedules an action to be executed after dueTime.
Expand All @@ -85,9 +85,9 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
var queue = default(SchedulerQueue<TimeSpan>);

// There is no timed task and no task is currently running
if (!running)
if (!_running)
{
running = true;
_running = true;

if (dueTime > TimeSpan.Zero)
{
Expand All @@ -103,7 +103,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
catch
{
SetQueue(null);
running = false;
_running = false;
throw;
}

Expand All @@ -120,12 +120,12 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
finally
{
SetQueue(null);
running = false;
_running = false;
}
}
else
{
running = false;
_running = false;
}

return d;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ namespace System.Reactive.Concurrency
/// <seealso cref="Scheduler.Default">Singleton instance of this type exposed through this static property.</seealso>
public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
{
private static readonly Lazy<DefaultScheduler> s_instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
private static IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current;
private static readonly Lazy<DefaultScheduler> _instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
private static IConcurrencyAbstractionLayer _cal = ConcurrencyAbstractionLayer.Current;

/// <summary>
/// Gets the singleton instance of the default scheduler.
/// </summary>
public static DefaultScheduler Instance => s_instance.Value;
public static DefaultScheduler Instance => _instance.Value;

private DefaultScheduler()
{
Expand All @@ -41,7 +41,7 @@ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TSta

var workItem = new UserWorkItem<TState>(this, state, action);

workItem.CancelQueueDisposable = s_cal.QueueUserWorkItem(
workItem.CancelQueueDisposable = _cal.QueueUserWorkItem(
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
workItem);

Expand Down Expand Up @@ -72,7 +72,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun

var workItem = new UserWorkItem<TState>(this, state, action);

workItem.CancelQueueDisposable = s_cal.StartTimer(
workItem.CancelQueueDisposable = _cal.StartTimer(
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
workItem,
dt);
Expand Down Expand Up @@ -117,7 +117,7 @@ public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState,
_state = state;
_action = action;

_cancel = s_cal.StartPeriodicTimer(Tick, period);
_cancel = _cal.StartPeriodicTimer(Tick, period);
}

private void Tick()
Expand Down Expand Up @@ -145,7 +145,7 @@ protected override object GetService(Type serviceType)
{
if (serviceType == typeof(ISchedulerLongRunning))
{
if (s_cal.SupportsLongRunning)
if (_cal.SupportsLongRunning)
{
return LongRunning.Instance;
}
Expand All @@ -168,7 +168,7 @@ public LongScheduledWorkItem(TState state, Action<TState, ICancelable> action)
_state = state;
_action = action;

s_cal.StartThread(
_cal.StartThread(
@thisObject =>
{
var @this = (LongScheduledWorkItem<TState>)@thisObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi
/// <summary>
/// Counter for diagnostic purposes, to name the threads.
/// </summary>
private static int s_counter;
private static int _counter;

/// <summary>
/// Thread factory function.
Expand Down Expand Up @@ -82,7 +82,7 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi
/// Creates an object that schedules units of work on a designated thread.
/// </summary>
public EventLoopScheduler()
: this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true })
: this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref _counter), IsBackground = true })
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative
/// </summary>
public class HistoricalScheduler : HistoricalSchedulerBase
{
private readonly SchedulerQueue<DateTimeOffset> queue = new SchedulerQueue<DateTimeOffset>();
private readonly SchedulerQueue<DateTimeOffset> _queue = new SchedulerQueue<DateTimeOffset>();

/// <summary>
/// Creates a new historical scheduler with the minimum value of <see cref="DateTimeOffset"/> as the initial clock value.
Expand Down Expand Up @@ -105,13 +105,13 @@ public HistoricalScheduler(DateTimeOffset initialClock, IComparer<DateTimeOffset
/// <returns>The next scheduled item.</returns>
protected override IScheduledItem<DateTimeOffset> GetNext()
{
while (queue.Count > 0)
while (_queue.Count > 0)
{
var next = queue.Peek();
var next = _queue.Peek();

if (next.IsCanceled)
{
queue.Dequeue();
_queue.Dequeue();
}
else
{
Expand Down Expand Up @@ -142,12 +142,12 @@ public override IDisposable ScheduleAbsolute<TState>(TState state, DateTimeOffse

var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
{
queue.Remove(si);
_queue.Remove(si);
return action(scheduler, state1);
});

si = new ScheduledItem<DateTimeOffset, TState>(this, state, run, dueTime, Comparer);
queue.Enqueue(si);
_queue.Enqueue(si);

return si;
}
Expand Down
Loading