
feat: Dynamically size the UnityTransport send queues [MTT-2816] #2212


Merged 10 commits on Sep 27, 2022
2 changes: 2 additions & 0 deletions com.unity.netcode.gameobjects/CHANGELOG.md
@@ -15,6 +15,8 @@ Additional documentation and release notes are available at [Multiplayer Documen

### Changed

- The send queues of `UnityTransport` are now dynamically sized. This means there should no longer be any need to tweak the 'Max Send Queue Size' value. In fact, this field is now removed from the inspector and will no longer be serialized. It is still possible to set it manually using the `MaxSendQueueSize` property, but doing so is not recommended outside of specific needs (e.g. limiting the amount of memory used by the send queues in very constrained environments). (#2212)
- As a consequence of the above change, the `UnityTransport.InitialMaxSendQueueSize` field is now deprecated. There is no default value anymore since send queues are dynamically sized. (#2212)
- The debug simulator in `UnityTransport` is now non-deterministic. Its random number generator used to be seeded with a constant value, leading to the same pattern of packet drops, delays, and jitter in every run. (#2196)

### Fixed
@@ -8,22 +8,30 @@ namespace Unity.Netcode.Transports.UTP
/// <summary>Queue for batched messages meant to be sent through UTP.</summary>
/// <remarks>
/// Messages should be pushed on the queue with <see cref="PushMessage"/>. To send batched
/// messages, call <see cref="FillWriter"> with the <see cref="DataStreamWriter"/> obtained from
/// <see cref="NetworkDriver.BeginSend"/>. This will fill the writer with as many messages as
/// possible. If the send is successful, call <see cref="Consume"/> to remove the data from the
/// queue.
/// messages, call <see cref="FillWriterWithMessages"/> or <see cref="FillWriterWithBytes"/>
/// with the <see cref="DataStreamWriter"/> obtained from <see cref="NetworkDriver.BeginSend"/>.
/// This will fill the writer with as many messages/bytes as possible. If the send is
/// successful, call <see cref="Consume"/> to remove the data from the queue.
///
/// This is meant as a companion to <see cref="BatchedReceiveQueue"/>, which should be used to
/// read messages sent with this queue.
/// </remarks>
internal struct BatchedSendQueue : IDisposable
{
private NativeArray<byte> m_Data;
// Note that we're using NativeList basically like a growable NativeArray, where the length
// of the list is the capacity of our array. (We can't use the capacity of the list as our
// queue capacity because NativeList may elect to set it higher than what we'd set it to
// with SetCapacity, which breaks the logic of our code.)
private NativeList<byte> m_Data;
private NativeArray<int> m_HeadTailIndices;
private int m_MaximumCapacity;
private int m_MinimumCapacity;

/// <summary>Overhead that is added to each message in the queue.</summary>
public const int PerMessageOverhead = sizeof(int);

internal const int MinimumMinimumCapacity = 4096;

// Indices into m_HeadTailIndices.
private const int k_HeadInternalIndex = 0;
private const int k_TailInternalIndex = 1;
@@ -43,18 +51,33 @@ private int TailIndex
}

public int Length => TailIndex - HeadIndex;

public int Capacity => m_Data.Length;
public bool IsEmpty => HeadIndex == TailIndex;

public bool IsCreated => m_Data.IsCreated;

/// <summary>Construct a new empty send queue.</summary>
/// <param name="capacity">Maximum capacity of the send queue.</param>
public BatchedSendQueue(int capacity)
{
m_Data = new NativeArray<byte>(capacity, Allocator.Persistent);
// Make sure the maximum capacity will be even.
m_MaximumCapacity = capacity + (capacity & 1);

// We pick the minimum capacity such that if we keep doubling it, we'll eventually hit
// the maximum capacity exactly. The alternative would be to use capacities that are
// powers of 2, but this can lead to over-allocating quite a bit of memory (especially
// since we expect maximum capacities to be in the megabytes range). The approach taken
// here avoids this issue, at the cost of not having allocations of nice round sizes.
m_MinimumCapacity = m_MaximumCapacity;
while (m_MinimumCapacity / 2 >= MinimumMinimumCapacity)
{
m_MinimumCapacity /= 2;
}

m_Data = new NativeList<byte>(m_MinimumCapacity, Allocator.Persistent);
m_HeadTailIndices = new NativeArray<int>(2, Allocator.Persistent);

m_Data.ResizeUninitialized(m_MinimumCapacity);

HeadIndex = 0;
TailIndex = 0;
}
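The constructor's capacity derivation above can be sketched as a small Python model (an illustration of the arithmetic, not the shipped C# code): starting from the even-adjusted maximum, the minimum capacity is halved while one more halving would still stay at or above the 4096-byte floor, so that repeated doubling from the minimum lands back on (or, with odd intermediate values, very near) the maximum.

```python
MINIMUM_MINIMUM_CAPACITY = 4096  # mirrors BatchedSendQueue.MinimumMinimumCapacity

def derive_capacities(capacity):
    # Make sure the maximum capacity is even, as the constructor does.
    maximum = capacity + (capacity & 1)
    # Halve while one more halving would still be >= the floor.
    minimum = maximum
    while minimum // 2 >= MINIMUM_MINIMUM_CAPACITY:
        minimum //= 2
    return minimum, maximum
```

For example, a 16384-byte maximum yields a 4096-byte minimum (exactly two doublings away). When an intermediate value is odd, integer division truncates and the doublings land slightly short of the maximum, which is why the code avoids power-of-two capacities rather than rounding up to them.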
@@ -68,22 +91,28 @@ public void Dispose()
}
}

/// <summary>Write a raw buffer to a DataStreamWriter.</summary>
private unsafe void WriteBytes(ref DataStreamWriter writer, byte* data, int length)
{
#if UTP_TRANSPORT_2_0_ABOVE
writer.WriteBytesUnsafe(data, length);
#else
writer.WriteBytes(data, length);
#endif
}

/// <summary>Append data at the tail of the queue. No safety checks.</summary>
private void AppendDataAtTail(ArraySegment<byte> data)
{
unsafe
{
var writer = new DataStreamWriter((byte*)m_Data.GetUnsafePtr() + TailIndex, m_Data.Length - TailIndex);
var writer = new DataStreamWriter((byte*)m_Data.GetUnsafePtr() + TailIndex, Capacity - TailIndex);

writer.WriteInt(data.Count);

fixed (byte* dataPtr = data.Array)
{
#if UTP_TRANSPORT_2_0_ABOVE
writer.WriteBytesUnsafe(dataPtr + data.Offset, data.Count);
#else
writer.WriteBytes(dataPtr + data.Offset, data.Count);
#endif
WriteBytes(ref writer, dataPtr + data.Offset, data.Count);
}
}

@@ -104,29 +133,55 @@ public bool PushMessage(ArraySegment<byte> message)
}

// Check if there's enough room after the current tail index.
if (m_Data.Length - TailIndex >= sizeof(int) + message.Count)
if (Capacity - TailIndex >= sizeof(int) + message.Count)
{
AppendDataAtTail(message);
return true;
}

// Check if there would be enough room if we moved data at the beginning of m_Data.
if (m_Data.Length - TailIndex + HeadIndex >= sizeof(int) + message.Count)
// Move the data to the beginning of m_Data. Either it will leave enough space for
// the message, or we'll grow m_Data and will want the data at the beginning anyway.
if (HeadIndex > 0 && Length > 0)
{
// Move the data back at the beginning of m_Data.
unsafe
{
UnsafeUtility.MemMove(m_Data.GetUnsafePtr(), (byte*)m_Data.GetUnsafePtr() + HeadIndex, Length);
}

TailIndex = Length;
HeadIndex = 0;
}

// If there's enough space left at the end for the message, now is a good time to trim
// the capacity of m_Data if it got very large. We define "very large" here as having
// more than 75% of m_Data unused after adding the new message.
if (Capacity - TailIndex >= sizeof(int) + message.Count)
{
AppendDataAtTail(message);

while (TailIndex < Capacity / 4 && Capacity > m_MinimumCapacity)
{
m_Data.ResizeUninitialized(Capacity / 2);
}

return true;
}

return false;
// If we get here we need to grow m_Data until the data fits (or it's too large).
while (Capacity - TailIndex < sizeof(int) + message.Count)
{
// Can't grow m_Data anymore. Message simply won't fit.
if (Capacity * 2 > m_MaximumCapacity)
{
return false;
}

m_Data.ResizeUninitialized(Capacity * 2);
}

// If we get here we know there's now enough room for the message.
AppendDataAtTail(message);
return true;
}
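The compact/trim/grow flow of `PushMessage` can be modeled with a short Python sketch. This is a simplification under assumed details (a 4-byte length prefix per message, a 4096-byte minimum floor, class and method names chosen for illustration), not the actual C# implementation:

```python
import struct

HEADER = struct.calcsize("<i")  # 4-byte length prefix stored before each message

class SendQueueModel:
    def __init__(self, capacity, floor=4096):
        self.max_capacity = capacity + (capacity & 1)  # keep the maximum even
        self.min_capacity = self.max_capacity
        while self.min_capacity // 2 >= floor:
            self.min_capacity //= 2
        self.data = bytearray(self.min_capacity)
        self.head = 0
        self.tail = 0

    @property
    def capacity(self):
        return len(self.data)

    def _append(self, message):
        start = self.tail
        self.data[start:start + HEADER] = struct.pack("<i", len(message))
        self.data[start + HEADER:start + HEADER + len(message)] = message
        self.tail += HEADER + len(message)

    def push(self, message):
        needed = HEADER + len(message)
        # Compact: move queued bytes to the front of the buffer. Either this
        # frees enough room, or we grow and want the data at the front anyway.
        if self.head > 0:
            self.data[0:self.tail - self.head] = self.data[self.head:self.tail]
            self.tail -= self.head
            self.head = 0
        if self.capacity - self.tail >= needed:
            self._append(message)
            # Trim when more than 75% of the buffer would sit unused.
            while self.tail < self.capacity // 4 and self.capacity > self.min_capacity:
                del self.data[self.capacity // 2:]
            return True
        # Grow by doubling until the message fits or the maximum is exceeded.
        while self.capacity - self.tail < needed:
            if self.capacity * 2 > self.max_capacity:
                return False  # message simply won't fit
            self.data.extend(bytes(self.capacity))
        self._append(message)
        return True
```

Note the ordering mirrors the diff: compaction happens unconditionally before the fit check, since moving the data to the front is useful whether or not the buffer subsequently grows.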

/// <summary>
@@ -153,11 +208,13 @@ public int FillWriterWithMessages(ref DataStreamWriter writer)

unsafe
{
var dataPtr = (byte*)m_Data.GetUnsafePtr() + HeadIndex;

#if UTP_TRANSPORT_2_0_ABOVE
var slice = m_Data.GetSubArray(HeadIndex, Length);
var slice = NativeArray.ConvertExistingDataToNativeArray<byte>(dataPtr, Length, Allocator.None);
var reader = new DataStreamReader(slice);
#else
var reader = new DataStreamReader((byte*)m_Data.GetUnsafePtr() + HeadIndex, Length);
var reader = new DataStreamReader(dataPtr, Length);
#endif

var writerAvailable = writer.Capacity;
@@ -177,11 +234,7 @@ public int FillWriterWithMessages(ref DataStreamWriter writer)
writer.WriteInt(messageLength);

var messageOffset = HeadIndex + reader.GetBytesRead();
#if UTP_TRANSPORT_2_0_ABOVE
writer.WriteBytesUnsafe((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);
#else
writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);
#endif
WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);

writerAvailable -= sizeof(int) + messageLength;
readerOffset += sizeof(int) + messageLength;
@@ -218,11 +271,7 @@ public int FillWriterWithBytes(ref DataStreamWriter writer)

unsafe
{
#if UTP_TRANSPORT_2_0_ABOVE
writer.WriteBytesUnsafe((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
#else
writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
#endif
WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
}

return copyLength;
@@ -236,10 +285,14 @@ public int FillWriterWithBytes(ref DataStreamWriter writer)
/// <param name="size">Number of bytes to consume from the queue.</param>
public void Consume(int size)
{
// Adjust the head/tail indices such that we consume the given size.
if (size >= Length)
{
HeadIndex = 0;
TailIndex = 0;

// This is a no-op if m_Data is already at minimum capacity.
m_Data.ResizeUninitialized(m_MinimumCapacity);
}
else
{
@@ -137,8 +137,13 @@ private enum State
/// <summary>
/// The default maximum send queue size
/// </summary>
[Obsolete("MaxSendQueueSize is now determined dynamically (can still be set programmatically using the MaxSendQueueSize property). This initial value is not used anymore.", false)]
public const int InitialMaxSendQueueSize = 16 * InitialMaxPayloadSize;

// Maximum reliable throughput, assuming the full reliable window can be sent on every
// frame at 60 FPS. This will be a large over-estimation in any realistic scenario.
private const int k_MaxReliableThroughput = (NetworkParameterConstants.MTU * 32 * 60) / 1000; // bytes per millisecond
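The constant works out as follows (a worked check: the 1400-byte MTU and 32-packet reliable window are implied by the 2.688 MB/s figure; the 30-second disconnect timeout below is a hypothetical default used only for illustration):

```python
MTU = 1400            # implied by the ~2.688 MB/s figure in the comment above
RELIABLE_WINDOW = 32  # packets that can be in flight on the reliable pipeline
FPS = 60

max_reliable_throughput = (MTU * RELIABLE_WINDOW * FPS) // 1000  # bytes per millisecond
print(max_reliable_throughput)  # 2688 bytes/ms, i.e. ~2.688 MB/s

# With a hypothetical 30 s disconnect timeout, the dynamically derived
# send queue ceiling would be:
queue_ceiling = 30_000 * max_reliable_throughput
print(queue_ceiling)  # 80_640_000 bytes, roughly 80 MB
```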

private static ConnectionAddressData s_DefaultConnectionAddressData = new ConnectionAddressData { Address = "127.0.0.1", Port = 7777, ServerListenAddress = string.Empty };

#pragma warning disable IDE1006 // Naming Styles
@@ -193,15 +198,17 @@ public int MaxPayloadSize
set => m_MaxPayloadSize = value;
}

[Tooltip("The maximum size in bytes of the transport send queue. The send queue accumulates messages for batching and stores messages when other internal send queues are full. If you routinely observe an error about too many in-flight packets, try increasing this.")]
[SerializeField]
private int m_MaxSendQueueSize = InitialMaxSendQueueSize;
private int m_MaxSendQueueSize = 0;

/// <summary>The maximum size in bytes of the transport send queue.</summary>
/// <remarks>
/// The send queue accumulates messages for batching and stores messages when other internal
/// send queues are full. If you routinely observe an error about too many in-flight packets,
/// try increasing this.
/// send queues are full. Note that there should not be any need to set this value manually
/// since the send queue size is dynamically sized based on need.
///
/// This value should only be set if you have particular requirements (e.g. if you want to
/// limit the memory usage of the send queues). Note however that setting this value too low
/// can easily lead to disconnections under heavy traffic.
/// </remarks>
public int MaxSendQueueSize
{
@@ -551,11 +558,6 @@ private static RelayConnectionData ConvertConnectionData(byte[] connectionData)
}
}

internal void SetMaxPayloadSize(int maxPayloadSize)
{
m_MaxPayloadSize = maxPayloadSize;
}

private void SetProtocol(ProtocolType inProtocol)
{
m_ProtocolType = inProtocol;
@@ -1211,7 +1213,23 @@ public override void Send(ulong clientId, ArraySegment<byte> payload, NetworkDel
var sendTarget = new SendTarget(clientId, pipeline);
if (!m_SendQueue.TryGetValue(sendTarget, out var queue))
{
queue = new BatchedSendQueue(Math.Max(m_MaxSendQueueSize, m_MaxPayloadSize));
// The maximum size of a send queue is determined according to the disconnection
// timeout. The idea is that if the send queue contains enough reliable data that
// sending it all out would take longer than the disconnection timeout, then there's
// no point storing even more in the queue (it would be like having a ping higher
// than the disconnection timeout, which is far into the realm of unplayability).
//
// The throughput used to determine the maximum send queue size is the maximum
// theoretical throughput of the reliable pipeline, assuming we only send on each
// update at 60 FPS, which turns out to be around 2.688 MB/s.
//
// Note that we only care about reliable throughput for send queues because that's
// the only case where a full send queue causes a connection loss. Full unreliable
// send queues are dealt with by flushing them out to the network or simply dropping
// new messages if that fails.
var maxCapacity = m_MaxSendQueueSize > 0 ? m_MaxSendQueueSize : m_DisconnectTimeoutMS * k_MaxReliableThroughput;

queue = new BatchedSendQueue(Math.Max(maxCapacity, m_MaxPayloadSize));
m_SendQueue.Add(sendTarget, queue);
}

@@ -1225,8 +1243,7 @@ public override void Send(ulong clientId, ArraySegment<byte> payload, NetworkDel

var ngoClientId = NetworkManager?.TransportIdToClientId(clientId) ?? clientId;
Debug.LogError($"Couldn't add payload of size {payload.Count} to reliable send queue. " +
$"Closing connection {ngoClientId} as reliability guarantees can't be maintained. " +
$"Perhaps 'Max Send Queue Size' ({m_MaxSendQueueSize}) is too small for workload.");
$"Closing connection {ngoClientId} as reliability guarantees can't be maintained.");

if (clientId == m_ServerClientId)
{