Skip to content

Commit f7b8240

Browse files
committed
Feature/event loop interop (#218)
1 parent 782d85a commit f7b8240

File tree

6 files changed

+238
-8
lines changed

6 files changed

+238
-8
lines changed

application.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,13 @@ func (a *Application) Run() error {
220220
return glfw.GetProcAddress(procName)
221221
}
222222

223+
eventLoop := newEventLoop(
224+
glfw.PostEmptyEvent, // Wakeup GLFW
225+
a.engine.RunTask, // Flush tasks
226+
)
227+
a.engine.TaskRunnerRunOnCurrentThread = eventLoop.RunOnCurrentThread
228+
a.engine.TaskRunnerPostTask = eventLoop.PostTask
229+
223230
a.engine.PlatfromMessage = messenger.handlePlatformMessage
224231

225232
texturer := newRegistry(a.engine, a.window)
@@ -290,8 +297,9 @@ func (a *Application) Run() error {
290297
defer a.engine.Shutdown()
291298

292299
for !a.window.ShouldClose() {
293-
glfw.WaitEventsTimeout(0.016) // timeout to get 60fps-ish iterations
294-
embedder.FlutterEngineFlushPendingTasksNow()
300+
eventLoop.WaitForEvents(func(duration float64) {
301+
glfw.WaitEventsTimeout(duration)
302+
})
295303
defaultPlatformPlugin.glfwTasker.ExecuteTasks()
296304
messenger.engineTasker.ExecuteTasks()
297305
}

embedder/embedder.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ type FlutterOpenGLTexture struct {
6363
Format uint32
6464
}
6565

66+
// FlutterTask is a type alias to C.FlutterTask
67+
type FlutterTask = C.FlutterTask
68+
6669
// FlutterEngine corresponds to the C.FlutterEngine with his associated callback's method.
6770
type FlutterEngine struct {
6871
// Flutter Engine.
@@ -83,6 +86,10 @@ type FlutterEngine struct {
8386
GLProcResolver func(procName string) unsafe.Pointer
8487
GLExternalTextureFrameCallback func(textureID int64, width int, height int) *FlutterOpenGLTexture
8588

89+
// task runner interop
90+
TaskRunnerRunOnCurrentThread func() bool
91+
TaskRunnerPostTask func(trask FlutterTask, targetTimeNanos uint64)
92+
8693
// platform message callback function
8794
PlatfromMessage func(message *PlatformMessage)
8895

@@ -310,6 +317,12 @@ func (flu *FlutterEngine) SendPlatformMessageResponse(
310317
return (Result)(res)
311318
}
312319

320+
// RunTask inform the engine to run the specified task.
321+
func (flu *FlutterEngine) RunTask(task *FlutterTask) Result {
322+
res := C.FlutterEngineRunTask(flu.Engine, task)
323+
return (Result)(res)
324+
}
325+
313326
// RegisterExternalTexture registers an external texture with a unique identifier.
314327
func (flu *FlutterEngine) RegisterExternalTexture(textureID int64) Result {
315328
flu.sync.Lock()
@@ -344,10 +357,8 @@ func (flu *FlutterEngine) MarkExternalTextureFrameAvailable(textureID int64) Res
344357
return (Result)(res)
345358
}
346359

347-
// FlutterEngineFlushPendingTasksNow flush tasks on a message loop not
348-
// controlled by the Flutter engine.
349-
//
350-
// deprecated soon.
351-
func FlutterEngineFlushPendingTasksNow() {
352-
C.__FlutterEngineFlushPendingTasksNow()
360+
// FlutterEngineGetCurrentTime gets the current time in nanoseconds from the
361+
// clock used by the flutter engine.
362+
func FlutterEngineGetCurrentTime() uint64 {
363+
return uint64(C.FlutterEngineGetCurrentTime())
353364
}

embedder/embedder_helper.c

+17
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ bool proxy_gl_external_texture_frame_callback(void *user_data,
1717
size_t height,
1818
FlutterOpenGLTexture *texture);
1919

20+
bool proxy_runs_task_on_current_thread_callback(void *user_data);
21+
void proxy_post_task_callback(FlutterTask task, uint64_t target_time_nanos,
22+
void *user_data);
23+
2024
// C helper
2125
FlutterEngineResult runFlutter(void *user_data, FlutterEngine *engine,
2226
FlutterProjectArgs *Args,
@@ -38,6 +42,19 @@ FlutterEngineResult runFlutter(void *user_data, FlutterEngine *engine,
3842
Args->command_line_argv = vmArgs;
3943
Args->platform_message_callback = proxy_platform_message_callback;
4044

45+
// Configure task runner interop
46+
FlutterTaskRunnerDescription platform_task_runner = {};
47+
platform_task_runner.struct_size = sizeof(FlutterTaskRunnerDescription);
48+
platform_task_runner.user_data = user_data;
49+
platform_task_runner.runs_task_on_current_thread_callback =
50+
proxy_runs_task_on_current_thread_callback;
51+
platform_task_runner.post_task_callback = proxy_post_task_callback;
52+
53+
FlutterCustomTaskRunners custom_task_runners = {};
54+
custom_task_runners.struct_size = sizeof(FlutterCustomTaskRunners);
55+
custom_task_runners.platform_task_runner = &platform_task_runner;
56+
Args->custom_task_runners = &custom_task_runners;
57+
4158
return FlutterEngineRun(FLUTTER_ENGINE_VERSION, &config, Args, user_data,
4259
engine);
4360
}

embedder/embedder_proxy.go

+14
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,17 @@ func proxy_gl_external_texture_frame_callback(userData unsafe.Pointer,
7777
texture.format = C.uint32_t(embedderGLTexture.Format)
7878
return C.bool(true)
7979
}
80+
81+
//export proxy_runs_task_on_current_thread_callback
82+
func proxy_runs_task_on_current_thread_callback(userData unsafe.Pointer) C.bool {
83+
flutterEnginePointer := *(*uintptr)(userData)
84+
flutterEngine := (*FlutterEngine)(unsafe.Pointer(flutterEnginePointer))
85+
return C.bool(flutterEngine.TaskRunnerRunOnCurrentThread())
86+
}
87+
88+
//export proxy_post_task_callback
89+
func proxy_post_task_callback(task C.FlutterTask, targetTimeNanos C.uint64_t, userData unsafe.Pointer) {
90+
flutterEnginePointer := *(*uintptr)(userData)
91+
flutterEngine := (*FlutterEngine)(unsafe.Pointer(flutterEnginePointer))
92+
flutterEngine.TaskRunnerPostTask(task, uint64(targetTimeNanos))
93+
}

event-loop.go

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package flutter
2+
3+
import (
4+
"container/heap"
5+
"fmt"
6+
"math"
7+
"runtime"
8+
"time"
9+
10+
"github.com/go-flutter-desktop/go-flutter/embedder"
11+
"github.com/go-flutter-desktop/go-flutter/internal/priorityqueue"
12+
)
13+
14+
// EventLoop is a event loop for the main thread that allows for delayed task
15+
// execution.()
16+
type EventLoop struct {
17+
// store the task (event) by their priorities
18+
priorityqueue *priorityqueue.PriorityQueue
19+
// called when a task has been received, used to Wakeup the rendering event loop
20+
postEmptyEvent func()
21+
22+
onExpiredTask func(*embedder.FlutterTask) embedder.Result
23+
24+
// timeout for non-Rendering events that needs to be processed in a polling manner
25+
maxWaitDuration time.Duration
26+
}
27+
28+
func newEventLoop(postEmptyEvent func(), onExpiredTask func(*embedder.FlutterTask) embedder.Result) *EventLoop {
29+
pq := priorityqueue.NewPriorityQueue()
30+
heap.Init(pq)
31+
return &EventLoop{
32+
priorityqueue: pq,
33+
postEmptyEvent: postEmptyEvent,
34+
onExpiredTask: onExpiredTask,
35+
36+
// 25 Millisecond is arbitrary value, not too high (adds too much delay to
37+
// platform messages) and not too low (heavy CPU consumption).
38+
// This value isn't related to FPS, as rendering events are process in a
39+
// waiting manner.
40+
maxWaitDuration: time.Duration(25) * time.Millisecond,
41+
}
42+
}
43+
44+
// RunOnCurrentThread returns if the current thread is the thread used
45+
// by the GLFW event loop.
46+
func (t *EventLoop) RunOnCurrentThread() bool {
47+
// We call runtime.LockOSThread, this ensure the current the
48+
// goroutine will always execute in that thread.
49+
return true
50+
}
51+
52+
// PostTask posts a Flutter engine tasks to the event loop for delayed execution.
53+
func (t *EventLoop) PostTask(task embedder.FlutterTask, targetTimeNanos uint64) {
54+
runtime.LockOSThread()
55+
56+
taskDuration := time.Duration(targetTimeNanos) * time.Nanosecond
57+
engineDuration := time.Duration(embedder.FlutterEngineGetCurrentTime())
58+
59+
t.priorityqueue.Lock()
60+
item := &priorityqueue.Item{
61+
Value: task,
62+
FireTime: time.Now().Add(taskDuration - engineDuration),
63+
}
64+
heap.Push(t.priorityqueue, item)
65+
t.priorityqueue.Unlock()
66+
67+
t.postEmptyEvent()
68+
}
69+
70+
// WaitForEvents waits for an any Rendering or pending Flutter Engine events
71+
// and returns when either is encountered.
72+
// Expired engine events are processed
73+
func (t *EventLoop) WaitForEvents(rendererWaitEvents func(float64)) {
74+
now := time.Now()
75+
76+
expiredTasks := make([]*priorityqueue.Item, 0)
77+
var top *priorityqueue.Item
78+
79+
t.priorityqueue.Lock()
80+
for t.priorityqueue.Len() > 0 {
81+
82+
// Remove the item from the delayed tasks queue.
83+
top = heap.Pop(t.priorityqueue).(*priorityqueue.Item)
84+
85+
// If this task (and all tasks after this) has not yet expired, there is
86+
// nothing more to do. Quit iterating.
87+
if top.FireTime.After(now) {
88+
heap.Push(t.priorityqueue, top) // push the item back into the queue
89+
break
90+
}
91+
92+
// Make a record of the expired task. Do NOT service the task here
93+
// because we are still holding onto the task queue mutex. We don't want
94+
// other threads to block on posting tasks onto this thread till we are
95+
// done processing expired tasks.
96+
expiredTasks = append(expiredTasks, top)
97+
98+
}
99+
t.priorityqueue.Unlock()
100+
101+
// Fire expired tasks.
102+
for _, item := range expiredTasks {
103+
task := item.Value.(embedder.FlutterTask)
104+
if t.onExpiredTask(&task) != embedder.ResultSuccess {
105+
fmt.Printf("go-flutter: couldn't process task %v\n", task)
106+
}
107+
}
108+
109+
// Sleep till the next task needs to be processed. If a new task comes
110+
// along, the rendererWaitEvents will be resolved early because PostTask
111+
// posts an empty event.
112+
if t.priorityqueue.Len() == 0 {
113+
rendererWaitEvents(t.maxWaitDuration.Seconds())
114+
} else {
115+
if top.FireTime.After(now) {
116+
durationWait := math.Min(top.FireTime.Sub(now).Seconds(), t.maxWaitDuration.Seconds())
117+
rendererWaitEvents(durationWait)
118+
}
119+
}
120+
}
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package priorityqueue
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
// An Item is something we manage in a priority queue.
9+
type Item struct {
10+
Value interface{} // The value of the item; arbitrary.
11+
FireTime time.Time // The priority of the item in the queue.
12+
13+
// The index is needed by update and is maintained by the heap.Interface methods.
14+
index int // The index of the item in the heap.
15+
}
16+
17+
// A PriorityQueue implements heap.Interface and holds Items.
18+
type PriorityQueue struct {
19+
queue []*Item
20+
sync.Mutex
21+
}
22+
23+
// NewPriorityQueue create a new PriorityQueue
24+
func NewPriorityQueue() *PriorityQueue {
25+
pq := &PriorityQueue{}
26+
pq.queue = make([]*Item, 0)
27+
return pq
28+
}
29+
30+
func (pq *PriorityQueue) Len() int { return len(pq.queue) }
31+
32+
func (pq *PriorityQueue) Less(i, j int) bool {
33+
// We want Pop to give us the lowest, not highest, priority so we use lower
34+
// than here.
35+
return pq.queue[i].FireTime.Before(pq.queue[j].FireTime)
36+
}
37+
38+
func (pq *PriorityQueue) Swap(i, j int) {
39+
pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
40+
pq.queue[i].index = i
41+
pq.queue[j].index = j
42+
}
43+
44+
// Push add a new priority/value pair in the queue. 0 priority = max.
45+
func (pq *PriorityQueue) Push(x interface{}) {
46+
n := len(pq.queue)
47+
item := x.(*Item)
48+
item.index = n
49+
pq.queue = append(pq.queue, item)
50+
}
51+
52+
// Pop Remove and return the highest item (lowest priority)
53+
func (pq *PriorityQueue) Pop() interface{} {
54+
old := pq.queue
55+
n := len(old)
56+
item := old[n-1]
57+
item.index = -1 // for safety
58+
pq.queue = old[0 : n-1]
59+
return item
60+
}

0 commit comments

Comments
 (0)