Skip to content
Closed
77 changes: 52 additions & 25 deletions com.unity.multiplayer.mlapi/Runtime/Core/NetworkManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,16 @@ private void HandleRawTransportPoll(NetworkEvent networkEvent, ulong clientId, N
}

private readonly NetworkBuffer m_InputBufferWrapper = new NetworkBuffer(new byte[0]);
bool m_InputBufferWrapperUsed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool m_InputBufferWrapperUsed;
private bool m_InputBufferWrapperUsed;


// The fallback wrapper is used in case we have to handle incoming data but the InputStreamWrapper is already being used.
// This change is needed because MLAPI calls HandleIncomingData nested when it is applying buffered messages to an object spawned in HandleIncomingData.
// This fallback wrapper solution works because HandleIncomingData will never get nested more then once because:
// - Messages we buffer and execute in nest level 1 can never end up in another HandleIncomingData call. This is true because HandleIncomingData is only called in two cases:
// 1. When a new message arrives (nest level 0)
// 2. When that new message spawns an object and applies buffered messages (nest level 1)
//Nest level 1 can never trigger case 1. or 2. again because case 1. can only be triggered by the server sending a spawn packet down and not locally and case 2. can only be triggered by case 1..
private readonly NetworkBuffer m_FallbackInputBufferWrapper = new NetworkBuffer(new byte[0]);
private readonly RpcBatcher m_RpcBatcher = new RpcBatcher();

internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel, ArraySegment<byte> data, float receiveTime, bool allowBuffer)
Expand All @@ -927,13 +937,24 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
#endif
if (NetworkLog.CurrentLogLevel <= LogLevel.Developer) NetworkLog.LogInfo("Unwrapping Data Header");

m_InputBufferWrapper.SetTarget(data.Array);
m_InputBufferWrapper.SetLength(data.Count + data.Offset);
m_InputBufferWrapper.Position = data.Offset;
NetworkBuffer inputBufferWrapper;
if (m_InputBufferWrapperUsed)
{
inputBufferWrapper = m_FallbackInputBufferWrapper;
}
else
{
inputBufferWrapper = m_InputBufferWrapper;
m_InputBufferWrapperUsed = true;
}

using (var messageStream = MessagePacker.UnwrapMessage(m_InputBufferWrapper, out byte messageType))
inputBufferWrapper.SetTarget(data.Array);
inputBufferWrapper.SetLength(data.Count + data.Offset);
inputBufferWrapper.Position = data.Offset;

using (var messageBuffer = MessagePacker.UnwrapMessage(inputBufferWrapper, out byte messageType))
{
if (messageStream == null)
if (messageBuffer == null)
{
if (NetworkLog.CurrentLogLevel <= LogLevel.Error) NetworkLog.LogError("Message unwrap could not be completed. Was the header corrupt? Crypto error?");
return;
Expand Down Expand Up @@ -965,34 +986,34 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
switch (messageType)
{
case NetworkConstants.CONNECTION_REQUEST:
if (IsServer) InternalMessageHandler.HandleConnectionRequest(clientId, messageStream);
if (IsServer) InternalMessageHandler.HandleConnectionRequest(clientId, messageBuffer);
break;
case NetworkConstants.CONNECTION_APPROVED:
if (IsClient) InternalMessageHandler.HandleConnectionApproved(clientId, messageStream, receiveTime);
if (IsClient) InternalMessageHandler.HandleConnectionApproved(clientId, messageBuffer, receiveTime);
break;
case NetworkConstants.ADD_OBJECT:
if (IsClient) InternalMessageHandler.HandleAddObject(clientId, messageStream);
if (IsClient) InternalMessageHandler.HandleAddObject(clientId, messageBuffer);
break;
case NetworkConstants.DESTROY_OBJECT:
if (IsClient) InternalMessageHandler.HandleDestroyObject(clientId, messageStream);
if (IsClient) InternalMessageHandler.HandleDestroyObject(clientId, messageBuffer);
break;
case NetworkConstants.SWITCH_SCENE:
if (IsClient) InternalMessageHandler.HandleSwitchScene(clientId, messageStream);
if (IsClient) InternalMessageHandler.HandleSwitchScene(clientId, messageBuffer);
break;
case NetworkConstants.CHANGE_OWNER:
if (IsClient) InternalMessageHandler.HandleChangeOwner(clientId, messageStream);
if (IsClient) InternalMessageHandler.HandleChangeOwner(clientId, messageBuffer);
break;
case NetworkConstants.ADD_OBJECTS:
if (IsClient) InternalMessageHandler.HandleAddObjects(clientId, messageStream);
if (IsClient) InternalMessageHandler.HandleAddObjects(clientId, messageBuffer);
break;
case NetworkConstants.DESTROY_OBJECTS:
if (IsClient) InternalMessageHandler.HandleDestroyObjects(clientId, messageStream);
if (IsClient) InternalMessageHandler.HandleDestroyObjects(clientId, messageBuffer);
break;
case NetworkConstants.TIME_SYNC:
if (IsClient) InternalMessageHandler.HandleTimeSync(clientId, messageStream, receiveTime);
if (IsClient) InternalMessageHandler.HandleTimeSync(clientId, messageBuffer, receiveTime);
break;
case NetworkConstants.NETWORK_VARIABLE_DELTA:
InternalMessageHandler.HandleNetworkVariableDelta(clientId, messageStream, BufferCallback, new PreBufferPreset()
InternalMessageHandler.HandleNetworkVariableDelta(clientId, messageBuffer, BufferCallback, new PreBufferPreset()
{
AllowBuffer = allowBuffer,
NetworkChannel = networkChannel,
Expand All @@ -1003,7 +1024,7 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
});
break;
case NetworkConstants.NETWORK_VARIABLE_UPDATE:
InternalMessageHandler.HandleNetworkVariableUpdate(clientId, messageStream, BufferCallback, new PreBufferPreset()
InternalMessageHandler.HandleNetworkVariableUpdate(clientId, messageBuffer, BufferCallback, new PreBufferPreset()
{
AllowBuffer = allowBuffer,
NetworkChannel = networkChannel,
Expand All @@ -1014,24 +1035,24 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
});
break;
case NetworkConstants.UNNAMED_MESSAGE:
InternalMessageHandler.HandleUnnamedMessage(clientId, messageStream);
InternalMessageHandler.HandleUnnamedMessage(clientId, messageBuffer);
break;
case NetworkConstants.NAMED_MESSAGE:
InternalMessageHandler.HandleNamedMessage(clientId, messageStream);
InternalMessageHandler.HandleNamedMessage(clientId, messageBuffer);
break;
case NetworkConstants.CLIENT_SWITCH_SCENE_COMPLETED:
if (IsServer && NetworkConfig.EnableSceneManagement) InternalMessageHandler.HandleClientSwitchSceneCompleted(clientId, messageStream);
if (IsServer && NetworkConfig.EnableSceneManagement) InternalMessageHandler.HandleClientSwitchSceneCompleted(clientId, messageBuffer);
break;
case NetworkConstants.SERVER_LOG:
if (IsServer && NetworkConfig.EnableNetworkLogs) InternalMessageHandler.HandleNetworkLog(clientId, messageStream);
if (IsServer && NetworkConfig.EnableNetworkLogs) InternalMessageHandler.HandleNetworkLog(clientId, messageBuffer);
break;
case NetworkConstants.SERVER_RPC:
{
if (IsServer)
{
if (rpcQueueContainer.IsUsingBatching())
{
m_RpcBatcher.ReceiveItems(messageStream, ReceiveCallback, RpcQueueContainer.QueueItemType.ServerRpc, clientId, receiveTime);
m_RpcBatcher.ReceiveItems(messageBuffer, ReceiveCallback, RpcQueueContainer.QueueItemType.ServerRpc, clientId, receiveTime);
ProfilerStatManager.rpcBatchesRcvd.Record();
PerformanceDataManager.Increment(ProfilerConstants.NumberOfRPCBatchesReceived);
}
Expand All @@ -1040,7 +1061,7 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
#if DEVELOPMENT_BUILD || UNITY_EDITOR
s_ServerRpcQueued.Begin();
#endif
InternalMessageHandler.RpcReceiveQueueItem(clientId, messageStream, receiveTime, RpcQueueContainer.QueueItemType.ServerRpc);
InternalMessageHandler.RpcReceiveQueueItem(clientId, messageBuffer, receiveTime, RpcQueueContainer.QueueItemType.ServerRpc);
#if DEVELOPMENT_BUILD || UNITY_EDITOR
s_ServerRpcQueued.End();
#endif
Expand All @@ -1055,7 +1076,7 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
{
if (rpcQueueContainer.IsUsingBatching())
{
m_RpcBatcher.ReceiveItems(messageStream, ReceiveCallback, RpcQueueContainer.QueueItemType.ClientRpc, clientId, receiveTime);
m_RpcBatcher.ReceiveItems(messageBuffer, ReceiveCallback, RpcQueueContainer.QueueItemType.ClientRpc, clientId, receiveTime);
ProfilerStatManager.rpcBatchesRcvd.Record();
PerformanceDataManager.Increment(ProfilerConstants.NumberOfRPCBatchesReceived);
}
Expand All @@ -1064,7 +1085,7 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
#if DEVELOPMENT_BUILD || UNITY_EDITOR
s_ClientRpcQueued.Begin();
#endif
InternalMessageHandler.RpcReceiveQueueItem(clientId, messageStream, receiveTime, RpcQueueContainer.QueueItemType.ClientRpc);
InternalMessageHandler.RpcReceiveQueueItem(clientId, messageBuffer, receiveTime, RpcQueueContainer.QueueItemType.ClientRpc);
#if DEVELOPMENT_BUILD || UNITY_EDITOR
s_ClientRpcQueued.End();
#endif
Expand All @@ -1084,6 +1105,12 @@ internal void HandleIncomingData(ulong clientId, NetworkChannel networkChannel,
NetworkProfiler.EndEvent();
#endif
}

if (inputBufferWrapper == m_InputBufferWrapper)
{
m_InputBufferWrapperUsed = false;
}

#if DEVELOPMENT_BUILD || UNITY_EDITOR
s_HandleIncomingData.End();
#endif
Expand Down Expand Up @@ -1509,4 +1536,4 @@ internal void HandleApproval(ulong clientId, bool createPlayerObject, ulong? pla
}
}
}
}
}