-
Notifications
You must be signed in to change notification settings - Fork 6k
Fix race condition introduced by background platform channels #29377
Changes from all commits
45cc4b5
0cab9a1
ff024ca
6b02fb0
a480997
c59266f
4bae8ac
5f0cbf8
fcc5ae1
c90f6ea
983943a
db476b4
ea25323
0b794ea
4f193f2
d36def9
291e88e
d40a3b7
9aad025
b14cdef
7d6dd55
4120dbd
7fce615
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,9 +14,10 @@ | |
import io.flutter.plugin.common.BinaryMessenger; | ||
import java.nio.ByteBuffer; | ||
import java.util.HashMap; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.WeakHashMap; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
@@ -33,22 +34,37 @@ class DartMessenger implements BinaryMessenger, PlatformMessageHandler { | |
|
||
@NonNull private final FlutterJNI flutterJNI; | ||
|
||
@NonNull private final ConcurrentHashMap<String, HandlerInfo> messageHandlers; | ||
/** | ||
* Maps a channel name to an object that contains the task queue and the handler associated with | ||
* the channel. | ||
* | ||
* <p>Reads and writes to this map must lock {@code handlersLock}. | ||
*/ | ||
@NonNull private final Map<String, HandlerInfo> messageHandlers = new HashMap<>(); | ||
|
||
/** | ||
* Maps a channel name to an object that holds information about the incoming Dart message. | ||
* | ||
* <p>Reads and writes to this map must lock {@code handlersLock}. | ||
*/ | ||
@NonNull private Map<String, List<BufferedMessageInfo>> bufferedMessages = new HashMap<>(); | ||
|
||
@NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies; | ||
@NonNull private final Object handlersLock = new Object(); | ||
@NonNull private final AtomicBoolean enableBufferingIncomingMessages = new AtomicBoolean(false); | ||
|
||
@NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies = new HashMap<>(); | ||
private int nextReplyId = 1; | ||
|
||
@NonNull private final DartMessengerTaskQueue platformTaskQueue = new PlatformTaskQueue(); | ||
|
||
@NonNull private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues; | ||
@NonNull | ||
private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues = | ||
new WeakHashMap<TaskQueue, DartMessengerTaskQueue>(); | ||
|
||
@NonNull private TaskQueueFactory taskQueueFactory; | ||
|
||
DartMessenger(@NonNull FlutterJNI flutterJNI, @NonNull TaskQueueFactory taskQueueFactory) { | ||
this.flutterJNI = flutterJNI; | ||
this.messageHandlers = new ConcurrentHashMap<>(); | ||
this.pendingReplies = new HashMap<>(); | ||
this.createdTaskQueues = new WeakHashMap<TaskQueue, DartMessengerTaskQueue>(); | ||
this.taskQueueFactory = taskQueueFactory; | ||
} | ||
|
||
|
@@ -78,6 +94,10 @@ public DartMessengerTaskQueue makeBackgroundTaskQueue() { | |
} | ||
} | ||
|
||
/** | ||
* Holds information about a platform handler, such as the task queue that processes messages from | ||
* Dart. | ||
*/ | ||
private static class HandlerInfo { | ||
@NonNull public final BinaryMessenger.BinaryMessageHandler handler; | ||
@Nullable public final DartMessengerTaskQueue taskQueue; | ||
|
@@ -90,7 +110,22 @@ private static class HandlerInfo { | |
} | ||
} | ||
|
||
/** A serial task queue that can run on a concurrent ExecutorService. */ | ||
/** | ||
* Holds information that allows to dispatch a Dart message to a platform handler when it becomes | ||
* available. | ||
*/ | ||
private static class BufferedMessageInfo { | ||
@NonNull public final ByteBuffer message; | ||
int replyId; | ||
long messageData; | ||
|
||
BufferedMessageInfo(@NonNull ByteBuffer message, int replyId, long messageData) { | ||
this.message = message; | ||
this.replyId = replyId; | ||
this.messageData = messageData; | ||
} | ||
} | ||
|
||
static class DefaultTaskQueue implements DartMessengerTaskQueue { | ||
@NonNull private final ExecutorService executor; | ||
@NonNull private final ConcurrentLinkedQueue<Runnable> queue; | ||
|
@@ -154,18 +189,53 @@ public void setMessageHandler( | |
@Nullable TaskQueue taskQueue) { | ||
if (handler == null) { | ||
Log.v(TAG, "Removing handler for channel '" + channel + "'"); | ||
messageHandlers.remove(channel); | ||
} else { | ||
DartMessengerTaskQueue dartMessengerTaskQueue = null; | ||
if (taskQueue != null) { | ||
dartMessengerTaskQueue = createdTaskQueues.get(taskQueue); | ||
if (dartMessengerTaskQueue == null) { | ||
throw new IllegalArgumentException( | ||
"Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue)."); | ||
} | ||
synchronized (handlersLock) { | ||
messageHandlers.remove(channel); | ||
} | ||
Log.v(TAG, "Setting handler for channel '" + channel + "'"); | ||
return; | ||
} | ||
DartMessengerTaskQueue dartMessengerTaskQueue = null; | ||
if (taskQueue != null) { | ||
dartMessengerTaskQueue = createdTaskQueues.get(taskQueue); | ||
if (dartMessengerTaskQueue == null) { | ||
throw new IllegalArgumentException( | ||
"Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue)."); | ||
} | ||
} | ||
Log.v(TAG, "Setting handler for channel '" + channel + "'"); | ||
|
||
List<BufferedMessageInfo> list; | ||
synchronized (handlersLock) { | ||
messageHandlers.put(channel, new HandlerInfo(handler, dartMessengerTaskQueue)); | ||
list = bufferedMessages.remove(channel); | ||
if (list == null) { | ||
return; | ||
} | ||
} | ||
for (BufferedMessageInfo info : list) { | ||
gaaclarke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
dispatchMessageToQueue( | ||
channel, messageHandlers.get(channel), info.message, info.replyId, info.messageData); | ||
} | ||
} | ||
|
||
@Override | ||
public void enableBufferingIncomingMessages() { | ||
blasten marked this conversation as resolved.
Show resolved
Hide resolved
|
||
enableBufferingIncomingMessages.set(true); | ||
} | ||
|
||
@Override | ||
public void disableBufferingIncomingMessages() { | ||
Map<String, List<BufferedMessageInfo>> pendingMessages; | ||
synchronized (handlersLock) { | ||
enableBufferingIncomingMessages.set(false); | ||
pendingMessages = bufferedMessages; | ||
bufferedMessages = new HashMap<>(); | ||
} | ||
for (Map.Entry<String, List<BufferedMessageInfo>> channel : pendingMessages.entrySet()) { | ||
for (BufferedMessageInfo info : channel.getValue()) { | ||
dispatchMessageToQueue( | ||
channel.getKey(), null, info.message, info.replyId, info.messageData); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -218,25 +288,21 @@ private void invokeHandler( | |
} | ||
} | ||
|
||
@Override | ||
public void handleMessageFromDart( | ||
@NonNull final String channel, | ||
private void dispatchMessageToQueue( | ||
@NonNull String channel, | ||
@Nullable HandlerInfo handlerInfo, | ||
@Nullable ByteBuffer message, | ||
final int replyId, | ||
int replyId, | ||
long messageData) { | ||
// Called from the ui thread. | ||
Log.v(TAG, "Received message from Dart over channel '" + channel + "'"); | ||
@Nullable final HandlerInfo handlerInfo = messageHandlers.get(channel); | ||
@Nullable | ||
final DartMessengerTaskQueue taskQueue = (handlerInfo != null) ? handlerInfo.taskQueue : null; | ||
Runnable myRunnable = | ||
() -> { | ||
Trace.beginSection("DartMessenger#handleMessageFromDart on " + channel); | ||
try { | ||
invokeHandler(handlerInfo, message, replyId); | ||
if (message != null && message.isDirect()) { | ||
// This ensures that if a user retains an instance to the ByteBuffer and it happens to | ||
// be direct they will get a deterministic error. | ||
// This ensures that if a user retains an instance to the ByteBuffer and it | ||
// happens to be direct they will get a deterministic error. | ||
message.limit(0); | ||
} | ||
} finally { | ||
|
@@ -245,12 +311,43 @@ public void handleMessageFromDart( | |
Trace.endSection(); | ||
} | ||
}; | ||
@NonNull | ||
final DartMessengerTaskQueue nonnullTaskQueue = | ||
taskQueue == null ? platformTaskQueue : taskQueue; | ||
nonnullTaskQueue.dispatch(myRunnable); | ||
} | ||
|
||
@Override | ||
public void handleMessageFromDart( | ||
@NonNull String channel, @Nullable ByteBuffer message, int replyId, long messageData) { | ||
// Called from the ui thread. | ||
Log.v(TAG, "Received message from Dart over channel '" + channel + "'"); | ||
|
||
HandlerInfo handlerInfo; | ||
boolean messageDeferred; | ||
synchronized (handlersLock) { | ||
handlerInfo = messageHandlers.get(channel); | ||
messageDeferred = (enableBufferingIncomingMessages.get() && handlerInfo == null); | ||
if (messageDeferred) { | ||
// The channel is not defined when the Dart VM sends a message before the channels are | ||
// registered. | ||
// | ||
// This is possible if the Dart VM starts before channel registration, and if the thread | ||
// that registers the channels is busy or slow at registering the channel handlers. | ||
// | ||
// In such cases, the task dispatchers are queued, and processed when the channel is | ||
// defined. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't this also happen if the handler gets unregistered? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is right. In this case, |
||
if (!bufferedMessages.containsKey(channel)) { | ||
bufferedMessages.put(channel, new LinkedList<>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I implemented Channel Buffers I got a lot of flak about having unbounded buffering of platform channels. The resolution we ultimately decided upon was a RingBuffer of size 1 with control messages that can be used to adjust the size. The concern was that incorrect usage could blow up memory. I thought it was overly cautious. Maybe we should have a RingBuffer at least so it isn't completely unbounded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the suggestion to override messages? How is that desired in an app that runs into this situation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A RingBuffer drops messages after its capacity is exhausted, dropping messages isn't a desired behavior but also queueing up infinite messages isn't desired. Messages that write data can have 2 different semantics, So, in summary, a RingBuffer doesn't always drop messages and when it does it isn't necessarily a logical error. If I were king of Flutter I'd say just chose some reasonable upper bounds for the RingBuffer, like 1024. I can tell you though that we've had this discussion before as a team and the resolution was that the RingBuffer size should be 1 (with an option to change it). attn @Hixie who was involved in deciding how Channel Buffers should work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a RingBuffer is preferred, I'm happy to change it. I wasn't aware that it's ok to drop some messages. e.g. could that yield to invalid state in a way that is really hard to debug? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the plan that buffering messages only happens for a limited amount of time via What is the plan for messages that are sent, buffered, but no handler is ever registered for them? Are you going to hold onto them indefinitely? In the past they would have gotten an exception that maybe was ignored but now will be held onto. Maybe once we turn off the time frame where messages are buffered all of those messages without handlers could be cleared out to match the previous behavior. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
right on
Added some logic to address this in disableBufferingIncomingMessages |
||
} | ||
List<BufferedMessageInfo> buffer = bufferedMessages.get(channel); | ||
buffer.add(new BufferedMessageInfo(message, replyId, messageData)); | ||
} | ||
} | ||
if (!messageDeferred) { | ||
dispatchMessageToQueue(channel, handlerInfo, message, replyId, messageData); | ||
} | ||
} | ||
|
||
@Override | ||
public void handlePlatformMessageResponse(int replyId, @Nullable ByteBuffer reply) { | ||
Log.v(TAG, "Received message reply from Dart."); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,6 +116,22 @@ default void setMessageHandler( | |
setMessageHandler(channel, handler); | ||
} | ||
|
||
/** | ||
* Enables the ability to queue messages received from Dart. | ||
* | ||
* <p>This is useful when there are pending channel handler registrations. For example, Dart may | ||
* be initialized concurrently, and prior to the registration of the channel handlers. This | ||
* implies that Dart may start sending messages while plugins are being registered. | ||
*/ | ||
void enableBufferingIncomingMessages(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is going to be another PR that will enable and disable the buffering during the timeframe of plugin registration I imagine? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's right |
||
|
||
/** | ||
* Disables the ability to queue messages received from Dart. | ||
* | ||
* <p>This can be used after all pending channel handlers have been registered. | ||
*/ | ||
void disableBufferingIncomingMessages(); | ||
|
||
/** Handler for incoming binary messages from Flutter. */ | ||
interface BinaryMessageHandler { | ||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of two
Map
objects, could we jus tkeep oneConcurrentHashMap
and either create a wrapper object for the buffered messages or add them toHandlerInfo
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wrapper object would still need to represent two different states. One with a Handler, and one without a handler, but with buffered messages. FWIW, having two separate maps provides a more transparent separation of concerns.