diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
index a5030bcee65616..c058b0b216b17a 100644
--- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
+++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
@@ -25,7 +25,6 @@ System.Threading.Channel<T>
-
@@ -45,6 +44,7 @@ System.Threading.Channel<T>
+
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
index 834d8ad88ed1d7..317636579a05fd 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
@@ -1,10 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-
namespace System.Threading.Channels
{
/// Provides static methods for creating channels.
@@ -13,7 +9,7 @@ public static partial class Channel
/// Creates an unbounded channel usable by any number of readers and writers concurrently.
/// The created channel.
public static Channel CreateUnbounded() =>
- new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true);
+ new UnboundedChannel(runContinuationsAsynchronously: true);
/// Creates an unbounded channel subject to the provided options.
/// Specifies the type of data in the channel.
@@ -31,7 +27,7 @@ public static Channel CreateUnbounded(UnboundedChannelOptions options)
return new SingleConsumerUnboundedChannel(!options.AllowSynchronousContinuations);
}
- return new UnboundedChannel>(new(new()), !options.AllowSynchronousContinuations);
+ return new UnboundedChannel(!options.AllowSynchronousContinuations);
}
/// Creates a channel with the specified maximum capacity.
@@ -75,32 +71,5 @@ public static Channel CreateBounded(BoundedChannelOptions options, Action<
return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
}
-
- /// Provides an for a .
- private readonly struct UnboundedChannelConcurrentQueue(ConcurrentQueue queue) : IUnboundedChannelQueue
- {
- private readonly ConcurrentQueue _queue = queue;
-
- ///
- public bool IsThreadSafe => true;
-
- ///
- public void Enqueue(T item) => _queue.Enqueue(item);
-
- ///
- public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out item);
-
- ///
- public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out item);
-
- ///
- public int Count => _queue.Count;
-
- ///
- public bool IsEmpty => _queue.IsEmpty;
-
- ///
- public IEnumerator GetEnumerator() => _queue.GetEnumerator();
- }
}
}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs
index c8fc9baebae7db..6c24b3e41ec7b5 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs
@@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Channels
{
@@ -10,14 +9,13 @@ namespace System.Threading.Channels
public static partial class Channel
{
/// Creates an unbounded prioritized channel usable by any number of readers and writers concurrently.
- /// Specifies the type of data in the channel.
/// The created channel.
///
/// is used to determine priority of elements.
/// The next item read from the channel will be the element available in the channel with the lowest priority value.
///
public static Channel CreateUnboundedPrioritized() =>
- new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true);
+ new UnboundedPrioritizedChannel(runContinuationsAsynchronously: true, comparer: null);
/// Creates an unbounded prioritized channel subject to the provided options.
/// Specifies the type of data in the channel.
@@ -32,45 +30,7 @@ public static Channel CreateUnboundedPrioritized(UnboundedPrioritizedChann
{
ArgumentNullException.ThrowIfNull(options);
- return new UnboundedChannel>(new(new(options.Comparer)), !options.AllowSynchronousContinuations);
- }
-
- /// Provides an for a .
- private readonly struct UnboundedChannelPriorityQueue(PriorityQueue queue) : IUnboundedChannelQueue
- {
- private readonly PriorityQueue _queue = queue;
-
- ///
- public bool IsThreadSafe => false;
-
- ///
- public void Enqueue(T item) => _queue.Enqueue(true, item);
-
- ///
- public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out _, out item);
-
- ///
- public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out _, out item);
-
- ///
- public int Count => _queue.Count;
-
- ///
- public bool IsEmpty => _queue.Count == 0;
-
- ///
- public IEnumerator GetEnumerator()
- {
- List list = [];
- foreach ((bool _, T Priority) item in _queue.UnorderedItems)
- {
- list.Add(item.Priority);
- }
-
- list.Sort(_queue.Comparer);
-
- return list.GetEnumerator();
- }
+ return new UnboundedPrioritizedChannel(!options.AllowSynchronousContinuations, options.Comparer);
}
}
}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs
index af2a77bb1bf775..a3d072ee9f7cb4 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs
@@ -11,7 +11,7 @@ internal interface IDebugEnumerable
IEnumerator GetEnumerator();
}
- internal class DebugEnumeratorDebugView
+ internal sealed class DebugEnumeratorDebugView
{
public DebugEnumeratorDebugView(IDebugEnumerable enumerable)
{
@@ -26,6 +26,4 @@ public DebugEnumeratorDebugView(IDebugEnumerable enumerable)
[DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
public T[] Items { get; }
}
-
- internal sealed class DebugEnumeratorDebugView(IDebugEnumerable enumerable) : DebugEnumeratorDebugView(enumerable);
}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs
deleted file mode 100644
index b1b65a1dffeb10..00000000000000
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-
-namespace System.Threading.Channels
-{
- /// Representation of the queue data structure used by .
- internal interface IUnboundedChannelQueue : IDebugEnumerable
- {
- /// Gets whether the other members are safe to use concurrently with each other and themselves.
- bool IsThreadSafe { get; }
-
- /// Enqueues an item into the queue.
- /// The item to enqueue.
- void Enqueue(T item);
-
- /// Dequeues an item from the queue, if possible.
- /// The dequeued item, or default if the queue was empty.
- /// Whether an item was dequeued.
- bool TryDequeue([MaybeNullWhen(false)] out T item);
-
- /// Peeks at the next item from the queue that would be dequeued, if possible.
- /// The peeked item, or default if the queue was empty.
- /// Whether an item was peeked.
- bool TryPeek([MaybeNullWhen(false)] out T item);
-
- /// Gets the number of elements in the queue.
- int Count { get; }
-
- /// Gets whether the queue is empty.
- bool IsEmpty { get; }
- }
-}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
index ad7ee0e3608d3c..fb3facf83dc47e 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
@@ -5,20 +5,19 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
-using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace System.Threading.Channels
{
/// Provides a buffered channel of unbounded capacity.
[DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))]
- internal sealed class UnboundedChannel : Channel, IDebugEnumerable where TQueue : struct, IUnboundedChannelQueue
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
+ internal sealed class UnboundedChannel : Channel, IDebugEnumerable
{
/// Task that indicates the channel has completed.
private readonly TaskCompletionSource _completion;
/// The items in the channel.
- private readonly TQueue _items;
+ private readonly ConcurrentQueue _items = new ConcurrentQueue();
/// Readers blocked reading from the channel.
private readonly Deque> _blockedReaders = new Deque>();
/// Whether to force continuations to be executed asynchronously from producer writes.
@@ -30,9 +29,8 @@ internal sealed class UnboundedChannel : Channel, IDebugEnumerable
private Exception? _doneWriting;
/// Initialize the channel.
- internal UnboundedChannel(TQueue items, bool runContinuationsAsynchronously)
+ internal UnboundedChannel(bool runContinuationsAsynchronously)
{
- _items = items;
_runContinuationsAsynchronously = runContinuationsAsynchronously;
_completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
Reader = new UnboundedChannelReader(this);
@@ -40,14 +38,14 @@ internal UnboundedChannel(TQueue items, bool runContinuationsAsynchronously)
}
[DebuggerDisplay("Items = {Count}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
private sealed class UnboundedChannelReader : ChannelReader, IDebugEnumerable
{
- internal readonly UnboundedChannel _parent;
+ internal readonly UnboundedChannel _parent;
private readonly AsyncOperation _readerSingleton;
private readonly AsyncOperation _waiterSingleton;
- internal UnboundedChannelReader(UnboundedChannel parent)
+ internal UnboundedChannelReader(UnboundedChannel parent)
{
_parent = parent;
_readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true);
@@ -70,8 +68,8 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken)
}
// Dequeue an item if we can.
- UnboundedChannel parent = _parent;
- if (parent._items.IsThreadSafe && parent._items.TryDequeue(out T? item))
+ UnboundedChannel parent = _parent;
+ if (parent._items.TryDequeue(out T? item))
{
CompleteIfDone(parent);
return new ValueTask(item);
@@ -114,60 +112,24 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken)
public override bool TryRead([MaybeNullWhen(false)] out T item)
{
- UnboundedChannel parent = _parent;
- return parent._items.IsThreadSafe ?
- LockFree(parent, out item) :
- Locked(parent, out item);
+ UnboundedChannel parent = _parent;
- static bool LockFree(UnboundedChannel parent, [MaybeNullWhen(false)] out T item)
+ // Dequeue an item if we can
+ if (parent._items.TryDequeue(out item))
{
- if (parent._items.TryDequeue(out item))
- {
- CompleteIfDone(parent);
- return true;
- }
-
- item = default;
- return false;
+ CompleteIfDone(parent);
+ return true;
}
- static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item)
- {
- lock (parent.SyncObj)
- {
- if (parent._items.TryDequeue(out item))
- {
- CompleteIfDone(parent);
- return true;
- }
- }
-
- item = default;
- return false;
- }
+ item = default;
+ return false;
}
- public override bool TryPeek([MaybeNullWhen(false)] out T item)
- {
- UnboundedChannel parent = _parent;
- return parent._items.IsThreadSafe ?
- parent._items.TryPeek(out item) :
- Locked(parent, out item);
-
- // Separated out to keep the try/finally from preventing TryPeek from being inlined
- static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item)
- {
- lock (parent.SyncObj)
- {
- return parent._items.TryPeek(out item);
- }
- }
- }
+ public override bool TryPeek([MaybeNullWhen(false)] out T item) =>
+ _parent._items.TryPeek(out item);
- private static void CompleteIfDone(UnboundedChannel parent)
+ private static void CompleteIfDone(UnboundedChannel parent)
{
- Debug.Assert(parent._items.IsThreadSafe || Monitor.IsEntered(parent.SyncObj));
-
if (parent._doneWriting != null && parent._items.IsEmpty)
{
// If we've now emptied the items queue and we're not getting any more, complete.
@@ -182,12 +144,12 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo
return new ValueTask(Task.FromCanceled(cancellationToken));
}
- if (_parent._items.IsThreadSafe && !_parent._items.IsEmpty)
+ if (!_parent._items.IsEmpty)
{
return new ValueTask(true);
}
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
lock (parent.SyncObj)
{
@@ -230,15 +192,15 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo
}
[DebuggerDisplay("Items = {ItemsCountForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
private sealed class UnboundedChannelWriter : ChannelWriter, IDebugEnumerable
{
- internal readonly UnboundedChannel _parent;
- internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent;
+ internal readonly UnboundedChannel _parent;
+ internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent;
public override bool TryComplete(Exception? error)
{
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
bool completeTask;
lock (parent.SyncObj)
@@ -278,7 +240,7 @@ public override bool TryComplete(Exception? error)
public override bool TryWrite(T item)
{
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
while (true)
{
AsyncOperation? blockedReader = null;
@@ -359,7 +321,7 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken
}
/// Gets the object used to synchronize access to all state on this instance.
- private object SyncObj => _blockedReaders;
+ private object SyncObj => _items;
[Conditional("DEBUG")]
private void AssertInvariants()
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs
new file mode 100644
index 00000000000000..a6eaba556c1f3a
--- /dev/null
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs
@@ -0,0 +1,376 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Threading.Tasks;
+
+// This file is primarily a copy of UnboundedChannel, subsequently tweaked to account for differences
+// between ConcurrentQueue and PriorityQueue, e.g. that PQ isn't thread safe and so fast
+// paths outside of locks need to be removed, that Enqueue/Dequeue methods take priorities, etc. Any
+// changes made to this or that file should largely be kept in sync.
+
+namespace System.Threading.Channels
+{
+ /// Provides a buffered channel of unbounded capacity.
+ [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
+ internal sealed class UnboundedPrioritizedChannel : Channel, IDebugEnumerable
+ {
+ /// Task that indicates the channel has completed.
+ private readonly TaskCompletionSource _completion;
+ /// The items in the channel.
+ /// To avoid double storing of a potentially large struct T, the priority doubles as the element and the element is ignored.
+ private readonly PriorityQueue _items;
+ /// Readers blocked reading from the channel.
+ private readonly Deque> _blockedReaders = new Deque>();
+ /// Whether to force continuations to be executed asynchronously from producer writes.
+ private readonly bool _runContinuationsAsynchronously;
+
+ /// Readers waiting for a notification that data is available.
+ private AsyncOperation? _waitingReadersTail;
+ /// Set to non-null once Complete has been called.
+ private Exception? _doneWriting;
+
+ /// Initialize the channel.
+ internal UnboundedPrioritizedChannel(bool runContinuationsAsynchronously, IComparer? comparer)
+ {
+ _runContinuationsAsynchronously = runContinuationsAsynchronously;
+ _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
+ Reader = new UnboundedPrioritizedChannelReader(this);
+ Writer = new UnboundedPrioritizedChannelWriter(this);
+ _items = new PriorityQueue(comparer);
+ }
+
+ [DebuggerDisplay("Items = {Count}")]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
+ private sealed class UnboundedPrioritizedChannelReader : ChannelReader, IDebugEnumerable
+ {
+ internal readonly UnboundedPrioritizedChannel _parent;
+ private readonly AsyncOperation _readerSingleton;
+ private readonly AsyncOperation _waiterSingleton;
+
+ internal UnboundedPrioritizedChannelReader(UnboundedPrioritizedChannel parent)
+ {
+ _parent = parent;
+ _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true);
+ _waiterSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true);
+ }
+
+ public override Task Completion => _parent._completion.Task;
+
+ public override bool CanCount => true;
+
+ public override bool CanPeek => true;
+
+ public override int Count => _parent._items.Count;
+
+ public override ValueTask ReadAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ // Dequeue an item if we can.
+ UnboundedPrioritizedChannel parent = _parent;
+ lock (parent.SyncObj)
+ {
+ parent.AssertInvariants();
+
+ // Try to dequeue again, now that we hold the lock.
+ if (parent._items.TryDequeue(out _, out T? item))
+ {
+ CompleteIfDone(parent);
+ return new ValueTask(item);
+ }
+
+ // There are no items, so if we're done writing, fail.
+ if (parent._doneWriting != null)
+ {
+ return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting);
+ }
+
+ // If we're able to use the singleton reader, do so.
+ if (!cancellationToken.CanBeCanceled)
+ {
+ AsyncOperation singleton = _readerSingleton;
+ if (singleton.TryOwnAndReset())
+ {
+ parent._blockedReaders.EnqueueTail(singleton);
+ return singleton.ValueTaskOfT;
+ }
+ }
+
+ // Otherwise, create and queue a reader.
+ var reader = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken);
+ parent._blockedReaders.EnqueueTail(reader);
+ return reader.ValueTaskOfT;
+ }
+ }
+
+ public override bool TryRead([MaybeNullWhen(false)] out T item)
+ {
+ UnboundedPrioritizedChannel parent = _parent;
+ lock (parent.SyncObj)
+ {
+ // Dequeue an item if we can
+ if (parent._items.TryDequeue(out _, out item))
+ {
+ CompleteIfDone(parent);
+ return true;
+ }
+
+ item = default;
+ return false;
+ }
+ }
+
+ public override bool TryPeek([MaybeNullWhen(false)] out T item)
+ {
+ UnboundedPrioritizedChannel parent = _parent;
+ lock (parent.SyncObj)
+ {
+ return parent._items.TryPeek(out _, out item);
+ }
+ }
+
+ private static void CompleteIfDone(UnboundedPrioritizedChannel parent)
+ {
+ Debug.Assert(Monitor.IsEntered(parent.SyncObj));
+
+ if (parent._doneWriting != null && parent._items.Count == 0)
+ {
+ // If we've now emptied the items queue and we're not getting any more, complete.
+ ChannelUtilities.Complete(parent._completion, parent._doneWriting);
+ }
+ }
+
+ public override ValueTask WaitToReadAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ UnboundedPrioritizedChannel parent = _parent;
+ lock (parent.SyncObj)
+ {
+ parent.AssertInvariants();
+
+ // Try again to read now that we're synchronized with writers.
+ if (parent._items.Count != 0)
+ {
+ return new ValueTask(true);
+ }
+
+ // There are no items, so if we're done writing, there's never going to be data available.
+ if (parent._doneWriting != null)
+ {
+ return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
+ ValueTask.FromException(parent._doneWriting) :
+ default;
+ }
+
+ // If we're able to use the singleton waiter, do so.
+ if (!cancellationToken.CanBeCanceled)
+ {
+ AsyncOperation singleton = _waiterSingleton;
+ if (singleton.TryOwnAndReset())
+ {
+ ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, singleton);
+ return singleton.ValueTaskOfT;
+ }
+ }
+
+ // Otherwise, create and queue a waiter.
+ var waiter = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken);
+ ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiter);
+ return waiter.ValueTaskOfT;
+ }
+ }
+
+ /// Gets an enumerator the debugger can use to show the contents of the channel.
+ IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator();
+ }
+
+ [DebuggerDisplay("Items = {ItemsCountForDebugger}")]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
+ private sealed class UnboundedPrioritizedChannelWriter : ChannelWriter, IDebugEnumerable
+ {
+ internal readonly UnboundedPrioritizedChannel _parent;
+
+ internal UnboundedPrioritizedChannelWriter(UnboundedPrioritizedChannel parent) => _parent = parent;
+
+ public override bool TryComplete(Exception? error)
+ {
+ UnboundedPrioritizedChannel parent = _parent;
+ bool completeTask;
+
+ lock (parent.SyncObj)
+ {
+ parent.AssertInvariants();
+
+ // If we've already marked the channel as completed, bail.
+ if (parent._doneWriting != null)
+ {
+ return false;
+ }
+
+ // Mark that we're done writing.
+ parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
+ completeTask = parent._items.Count == 0;
+ }
+
+ // If there are no items in the queue, complete the channel's task,
+ // as no more data can possibly arrive at this point. We do this outside
+ // of the lock in case we'll be running synchronous completions, and we
+ // do it before completing blocked/waiting readers, so that when they
+ // wake up they'll see the task as being completed.
+ if (completeTask)
+ {
+ ChannelUtilities.Complete(parent._completion, error);
+ }
+
+ // At this point, _blockedReaders and _waitingReaders will not be mutated:
+ // they're only mutated by readers while holding the lock, and only if _doneWriting is null.
+ // freely manipulate _blockedReaders and _waitingReaders without any concurrency concerns.
+ ChannelUtilities.FailOperations, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error));
+ ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error: error);
+
+ // Successfully transitioned to completed.
+ return true;
+ }
+
+ public override bool TryWrite(T item)
+ {
+ UnboundedPrioritizedChannel parent = _parent;
+ while (true)
+ {
+ AsyncOperation? blockedReader = null;
+ AsyncOperation? waitingReadersTail = null;
+ lock (parent.SyncObj)
+ {
+ // If writing has already been marked as done, fail the write.
+ parent.AssertInvariants();
+ if (parent._doneWriting != null)
+ {
+ return false;
+ }
+
+ // If there aren't any blocked readers, just add the data to the queue,
+ // and let any waiting readers know that they should try to read it.
+ // We can only complete such waiters here under the lock if they run
+ // continuations asynchronously (otherwise the synchronous continuations
+ // could be invoked under the lock). If we don't complete them here, we
+ // need to do so outside of the lock.
+ if (parent._blockedReaders.IsEmpty)
+ {
+ parent._items.Enqueue(true, item);
+ waitingReadersTail = parent._waitingReadersTail;
+ if (waitingReadersTail == null)
+ {
+ return true;
+ }
+ parent._waitingReadersTail = null;
+ }
+ else
+ {
+ // There were blocked readers. Grab one, and then complete it outside of the lock.
+ blockedReader = parent._blockedReaders.DequeueHead();
+ }
+ }
+
+ if (blockedReader != null)
+ {
+ // Complete the reader. It's possible the reader was canceled, in which
+ // case we loop around to try everything again.
+ if (blockedReader.TrySetResult(item))
+ {
+ return true;
+ }
+ }
+ else
+ {
+ // Wake up all of the waiters. Since we've released the lock, it's possible
+ // we could cause some spurious wake-ups here, if we tell a waiter there's
+ // something available but all data has already been removed. It's a benign
+ // race condition, though, as consumers already need to account for such things.
+ ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true);
+ return true;
+ }
+ }
+ }
+
+ public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken)
+ {
+ Exception? doneWriting = _parent._doneWriting;
+ return
+ cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) :
+ doneWriting == null ? new ValueTask(true) : // unbounded writing can always be done if we haven't completed
+ doneWriting != ChannelUtilities.s_doneWritingSentinel ? ValueTask.FromException(doneWriting) :
+ default;
+ }
+
+ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) =>
+ cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) :
+ TryWrite(item) ? default :
+ ValueTask.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting));
+
+ /// Gets the number of items in the channel. This should only be used by the debugger.
+ private int ItemsCountForDebugger => _parent._items.Count;
+
+ /// Gets an enumerator the debugger can use to show the contents of the channel.
+ IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator();
+ }
+
+ /// Gets the object used to synchronize access to all state on this instance.
+ private object SyncObj => _items;
+
+ [Conditional("DEBUG")]
+ private void AssertInvariants()
+ {
+ Debug.Assert(SyncObj != null, "The sync obj must not be null.");
+ Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock.");
+
+ if (_items.Count != 0)
+ {
+ if (_runContinuationsAsynchronously)
+ {
+ Debug.Assert(_blockedReaders.IsEmpty, "There's data available, so there shouldn't be any blocked readers.");
+ Debug.Assert(_waitingReadersTail == null, "There's data available, so there shouldn't be any waiting readers.");
+ }
+ Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed.");
+ }
+ if ((!_blockedReaders.IsEmpty || _waitingReadersTail != null) && _runContinuationsAsynchronously)
+ {
+ Debug.Assert(_items.Count == 0, "There are blocked/waiting readers, so there shouldn't be any data available.");
+ }
+ if (_completion.Task.IsCompleted)
+ {
+ Debug.Assert(_doneWriting != null, "We're completed, so we must be done writing.");
+ }
+ }
+
+ /// Gets the number of items in the channel. This should only be used by the debugger.
+ private int ItemsCountForDebugger => _items.Count;
+
+ /// Report if the channel is closed or not. This should only be used by the debugger.
+ private bool ChannelIsClosedForDebugger => _doneWriting != null;
+
+ /// Gets an enumerator the debugger can use to show the contents of the channel.
+ public IEnumerator GetEnumerator()
+ {
+ List list = [];
+ foreach ((bool _, T Priority) item in _items.UnorderedItems)
+ {
+ list.Add(item.Priority);
+ }
+
+ list.Sort(_items.Comparer);
+
+ return list.GetEnumerator();
+ }
+ }
+}