Skip to content

Commit e4a8d23

Browse files
authored
Implementation of two or more threads merging for multiple platform views (flutter#27662)
Implementation of two or more threads merging for both lightweight multiple engines and standalone multiple engines.
1 parent 4fef55d commit e4a8d23

21 files changed

+969
-472
lines changed

ci/licenses_golden/licenses_flutter

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ FILE: ../../../flutter/fml/posix_wrappers.h
259259
FILE: ../../../flutter/fml/raster_thread_merger.cc
260260
FILE: ../../../flutter/fml/raster_thread_merger.h
261261
FILE: ../../../flutter/fml/raster_thread_merger_unittests.cc
262+
FILE: ../../../flutter/fml/shared_thread_merger.cc
263+
FILE: ../../../flutter/fml/shared_thread_merger.h
262264
FILE: ../../../flutter/fml/size.h
263265
FILE: ../../../flutter/fml/status.h
264266
FILE: ../../../flutter/fml/synchronization/atomic_object.h

fml/BUILD.gn

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ source_set("fml") {
5757
"posix_wrappers.h",
5858
"raster_thread_merger.cc",
5959
"raster_thread_merger.h",
60+
"shared_thread_merger.cc",
61+
"shared_thread_merger.h",
6062
"size.h",
6163
"synchronization/atomic_object.h",
6264
"synchronization/count_down_latch.cc",

fml/memory/task_runner_checker.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace fml {
88

99
TaskRunnerChecker::TaskRunnerChecker()
1010
: initialized_queue_id_(InitTaskQueueId()),
11-
subsumed_queue_id_(
11+
subsumed_queue_ids_(
1212
MessageLoopTaskQueues::GetInstance()->GetSubsumedTaskQueueId(
1313
initialized_queue_id_)){};
1414

@@ -17,8 +17,15 @@ TaskRunnerChecker::~TaskRunnerChecker() = default;
1717
bool TaskRunnerChecker::RunsOnCreationTaskRunner() const {
1818
FML_CHECK(fml::MessageLoop::IsInitializedForCurrentThread());
1919
const auto current_queue_id = MessageLoop::GetCurrentTaskQueueId();
20-
return RunsOnTheSameThread(current_queue_id, initialized_queue_id_) ||
21-
RunsOnTheSameThread(current_queue_id, subsumed_queue_id_);
20+
if (RunsOnTheSameThread(current_queue_id, initialized_queue_id_)) {
21+
return true;
22+
}
23+
for (auto& subsumed : subsumed_queue_ids_) {
24+
if (RunsOnTheSameThread(current_queue_id, subsumed)) {
25+
return true;
26+
}
27+
}
28+
return false;
2229
};
2330

2431
bool TaskRunnerChecker::RunsOnTheSameThread(TaskQueueId queue_a,

fml/memory/task_runner_checker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class TaskRunnerChecker final {
2222

2323
private:
2424
TaskQueueId initialized_queue_id_;
25-
TaskQueueId subsumed_queue_id_;
25+
std::set<TaskQueueId> subsumed_queue_ids_;
2626

2727
TaskQueueId InitTaskQueueId();
2828
};

fml/memory/task_runner_checker_unittest.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,14 @@ TEST(TaskRunnerCheckerTests, MergedTaskRunnersRunsOnTheSameThread) {
9494
fml::TaskQueueId qid2 = loop2->GetTaskRunner()->GetTaskQueueId();
9595
const auto raster_thread_merger_ =
9696
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
97-
const int kNumFramesMerged = 5;
97+
const size_t kNumFramesMerged = 5;
9898

9999
raster_thread_merger_->MergeWithLease(kNumFramesMerged);
100100

101101
// merged, running on the same thread
102102
EXPECT_EQ(TaskRunnerChecker::RunsOnTheSameThread(qid1, qid2), true);
103103

104-
for (int i = 0; i < kNumFramesMerged; i++) {
104+
for (size_t i = 0; i < kNumFramesMerged; i++) {
105105
ASSERT_TRUE(raster_thread_merger_->IsMerged());
106106
raster_thread_merger_->DecrementLease();
107107
}
@@ -154,7 +154,7 @@ TEST(TaskRunnerCheckerTests,
154154
});
155155
latch3.Wait();
156156

157-
fml::MessageLoopTaskQueues::GetInstance()->Unmerge(qid1);
157+
fml::MessageLoopTaskQueues::GetInstance()->Unmerge(qid1, qid2);
158158

159159
fml::AutoResetWaitableEvent latch4;
160160
loop2->GetTaskRunner()->PostTask([&]() {

fml/memory/weak_ptr_unittest.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,14 @@ TEST(TaskRunnerAffineWeakPtrTest, ShouldNotCrashIfRunningOnTheSameTaskRunner) {
212212
fml::TaskQueueId qid2 = loop2->GetTaskRunner()->GetTaskQueueId();
213213
const auto raster_thread_merger_ =
214214
fml::MakeRefCounted<fml::RasterThreadMerger>(qid1, qid2);
215-
const int kNumFramesMerged = 5;
215+
const size_t kNumFramesMerged = 5;
216216

217217
raster_thread_merger_->MergeWithLease(kNumFramesMerged);
218218

219219
loop2_task_start_latch.Signal();
220220
loop2_task_finish_latch.Wait();
221221

222-
for (int i = 0; i < kNumFramesMerged; i++) {
222+
for (size_t i = 0; i < kNumFramesMerged; i++) {
223223
ASSERT_TRUE(raster_thread_merger_->IsMerged());
224224
raster_thread_merger_->DecrementLease();
225225
}

fml/message_loop_task_queues.cc

Lines changed: 103 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
#include <iostream>
1010
#include <memory>
11+
#include <optional>
1112

1213
#include "flutter/fml/make_copyable.h"
13-
#include "flutter/fml/message_loop_impl.h"
1414
#include "flutter/fml/task_source.h"
1515
#include "flutter/fml/thread_local.h"
1616

@@ -25,7 +25,7 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
2525

2626
namespace {
2727

28-
// iOS prior to version 9 prevents c++11 thread_local and __thread specefier,
28+
// iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
2929
// having us resort to boxed enum containers.
3030
class TaskSourceGradeHolder {
3131
public:
@@ -41,9 +41,7 @@ FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
4141
tls_task_source_grade;
4242

4343
TaskQueueEntry::TaskQueueEntry(TaskQueueId created_for_arg)
44-
: owner_of(_kUnmerged),
45-
subsumed_by(_kUnmerged),
46-
created_for(created_for_arg) {
44+
: subsumed_by(_kUnmerged), created_for(created_for_arg) {
4745
wakeable = NULL;
4846
task_observers = TaskObservers();
4947
task_source = std::make_unique<TaskSource>(created_for);
@@ -76,20 +74,21 @@ void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
7674
std::lock_guard guard(queue_mutex_);
7775
const auto& queue_entry = queue_entries_.at(queue_id);
7876
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
79-
TaskQueueId subsumed = queue_entry->owner_of;
80-
queue_entries_.erase(queue_id);
81-
if (subsumed != _kUnmerged) {
77+
auto& subsumed_set = queue_entry->owner_of;
78+
for (auto& subsumed : subsumed_set) {
8279
queue_entries_.erase(subsumed);
8380
}
81+
// Erase owner queue_id at last to avoid &subsumed_set from being invalid
82+
queue_entries_.erase(queue_id);
8483
}
8584

8685
void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
8786
std::lock_guard guard(queue_mutex_);
8887
const auto& queue_entry = queue_entries_.at(queue_id);
8988
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
90-
TaskQueueId subsumed = queue_entry->owner_of;
89+
auto& subsumed_set = queue_entry->owner_of;
9190
queue_entry->task_source->ShutDown();
92-
if (subsumed != _kUnmerged) {
91+
for (auto& subsumed : subsumed_set) {
9392
queue_entries_.at(subsumed)->task_source->ShutDown();
9493
}
9594
}
@@ -170,8 +169,8 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
170169
size_t total_tasks = 0;
171170
total_tasks += queue_entry->task_source->GetNumPendingTasks();
172171

173-
TaskQueueId subsumed = queue_entry->owner_of;
174-
if (subsumed != _kUnmerged) {
172+
auto& subsumed_set = queue_entry->owner_of;
173+
for (auto& subsumed : subsumed_set) {
175174
const auto& subsumed_entry = queue_entries_.at(subsumed);
176175
total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
177176
}
@@ -205,8 +204,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
205204
observers.push_back(observer.second);
206205
}
207206

208-
TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
209-
if (subsumed != _kUnmerged) {
207+
auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
208+
for (auto& subsumed : subsumed_set) {
210209
for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
211210
observers.push_back(observer.second);
212211
}
@@ -230,22 +229,41 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
230229
std::lock_guard guard(queue_mutex_);
231230
auto& owner_entry = queue_entries_.at(owner);
232231
auto& subsumed_entry = queue_entries_.at(subsumed);
233-
234-
if (owner_entry->owner_of == subsumed) {
232+
auto& subsumed_set = owner_entry->owner_of;
233+
if (subsumed_set.find(subsumed) != subsumed_set.end()) {
235234
return true;
236235
}
237236

238-
std::vector<TaskQueueId> owner_subsumed_keys = {
239-
owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
240-
subsumed_entry->subsumed_by};
237+
// Won't check owner_entry->owner_of, because it may contains items when
238+
// merged with other different queues.
241239

242-
for (auto key : owner_subsumed_keys) {
243-
if (key != _kUnmerged) {
244-
return false;
245-
}
240+
// Ensure owner_entry->subsumed_by being _kUnmerged
241+
if (owner_entry->subsumed_by != _kUnmerged) {
242+
FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
243+
"subsumed by others, owner="
244+
<< owner << ", subsumed=" << subsumed
245+
<< ", owner->subsumed_by=" << owner_entry->subsumed_by;
246+
return false;
246247
}
247-
248-
owner_entry->owner_of = subsumed;
248+
// Ensure subsumed_entry->owner_of being empty
249+
if (!subsumed_entry->owner_of.empty()) {
250+
FML_LOG(WARNING)
251+
<< "Thread merging failed: subsumed_entry already owns others, owner="
252+
<< owner << ", subsumed=" << subsumed
253+
<< ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
254+
return false;
255+
}
256+
// Ensure subsumed_entry->subsumed_by being _kUnmerged
257+
if (subsumed_entry->subsumed_by != _kUnmerged) {
258+
FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
259+
"subsumed by others, owner="
260+
<< owner << ", subsumed=" << subsumed
261+
<< ", subsumed->subsumed_by="
262+
<< subsumed_entry->subsumed_by;
263+
return false;
264+
}
265+
// All checking is OK, set merged state.
266+
owner_entry->owner_of.insert(subsumed);
249267
subsumed_entry->subsumed_by = owner;
250268

251269
if (HasPendingTasksUnlocked(owner)) {
@@ -255,16 +273,37 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
255273
return true;
256274
}
257275

258-
bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
276+
bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner, TaskQueueId subsumed) {
259277
std::lock_guard guard(queue_mutex_);
260278
const auto& owner_entry = queue_entries_.at(owner);
261-
const TaskQueueId subsumed = owner_entry->owner_of;
262-
if (subsumed == _kUnmerged) {
279+
if (owner_entry->owner_of.empty()) {
280+
FML_LOG(WARNING)
281+
<< "Thread unmerging failed: owner_entry doesn't own anyone, owner="
282+
<< owner << ", subsumed=" << subsumed;
283+
return false;
284+
}
285+
if (owner_entry->subsumed_by != _kUnmerged) {
286+
FML_LOG(WARNING)
287+
<< "Thread unmerging failed: owner_entry was subsumed by others, owner="
288+
<< owner << ", subsumed=" << subsumed
289+
<< ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
290+
return false;
291+
}
292+
if (queue_entries_.at(subsumed)->subsumed_by == _kUnmerged) {
293+
FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
294+
"subsumed by others, owner="
295+
<< owner << ", subsumed=" << subsumed;
296+
return false;
297+
}
298+
if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
299+
FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
300+
"given subsumed queue id, owner="
301+
<< owner << ", subsumed=" << subsumed;
263302
return false;
264303
}
265304

266305
queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
267-
owner_entry->owner_of = _kUnmerged;
306+
owner_entry->owner_of.erase(subsumed);
268307

269308
if (HasPendingTasksUnlocked(owner)) {
270309
WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
@@ -280,11 +319,14 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
280319
bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
281320
TaskQueueId subsumed) const {
282321
std::lock_guard guard(queue_mutex_);
283-
return owner != _kUnmerged && subsumed != _kUnmerged &&
284-
subsumed == queue_entries_.at(owner)->owner_of;
322+
if (owner == _kUnmerged || subsumed == _kUnmerged) {
323+
return false;
324+
}
325+
auto& subsumed_set = queue_entries_.at(owner)->owner_of;
326+
return subsumed_set.find(subsumed) != subsumed_set.end();
285327
}
286328

287-
TaskQueueId MessageLoopTaskQueues::GetSubsumedTaskQueueId(
329+
std::set<TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId(
288330
TaskQueueId owner) const {
289331
std::lock_guard guard(queue_mutex_);
290332
return queue_entries_.at(owner)->owner_of;
@@ -318,13 +360,11 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
318360
return true;
319361
}
320362

321-
const TaskQueueId subsumed = entry->owner_of;
322-
if (subsumed == _kUnmerged) {
323-
// this is not an owner and queue is empty.
324-
return false;
325-
} else {
326-
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
327-
}
363+
auto& subsumed_set = entry->owner_of;
364+
return std::any_of(
365+
subsumed_set.begin(), subsumed_set.end(), [&](const auto& subsumed) {
366+
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
367+
});
328368
}
329369

330370
fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
@@ -336,32 +376,35 @@ TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
336376
TaskQueueId owner) const {
337377
FML_DCHECK(HasPendingTasksUnlocked(owner));
338378
const auto& entry = queue_entries_.at(owner);
339-
const TaskQueueId subsumed = entry->owner_of;
340-
if (subsumed == _kUnmerged) {
379+
if (entry->owner_of.empty()) {
380+
FML_CHECK(!entry->task_source->IsEmpty());
341381
return entry->task_source->Top();
342382
}
343383

384+
// Use optional for the memory of TopTask object.
385+
std::optional<TaskSource::TopTask> top_task;
386+
387+
std::function<void(const TaskSource*)> top_task_updater =
388+
[&top_task](const TaskSource* source) {
389+
if (source && !source->IsEmpty()) {
390+
TaskSource::TopTask other_task = source->Top();
391+
if (!top_task.has_value() || top_task->task > other_task.task) {
392+
top_task.emplace(other_task);
393+
}
394+
}
395+
};
396+
344397
TaskSource* owner_tasks = entry->task_source.get();
345-
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
346-
347-
// we are owning another task queue
348-
const bool subsumed_has_task = !subsumed_tasks->IsEmpty();
349-
const bool owner_has_task = !owner_tasks->IsEmpty();
350-
fml::TaskQueueId top_queue_id = owner;
351-
if (owner_has_task && subsumed_has_task) {
352-
const auto owner_task = owner_tasks->Top();
353-
const auto subsumed_task = subsumed_tasks->Top();
354-
if (owner_task.task > subsumed_task.task) {
355-
top_queue_id = subsumed;
356-
} else {
357-
top_queue_id = owner;
358-
}
359-
} else if (owner_has_task) {
360-
top_queue_id = owner;
361-
} else {
362-
top_queue_id = subsumed;
398+
top_task_updater(owner_tasks);
399+
400+
for (TaskQueueId subsumed : entry->owner_of) {
401+
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
402+
top_task_updater(subsumed_tasks);
363403
}
364-
return queue_entries_.at(top_queue_id)->task_source->Top();
404+
// At least one task at the top because PeekNextTaskUnlocked() is called after
405+
// HasPendingTasksUnlocked()
406+
FML_CHECK(top_task.has_value());
407+
return top_task.value();
365408
}
366409

367410
} // namespace fml

0 commit comments

Comments
 (0)