diff --git a/application.go b/application.go index faed1a4f..16b1f753 100644 --- a/application.go +++ b/application.go @@ -220,6 +220,13 @@ func (a *Application) Run() error { return glfw.GetProcAddress(procName) } + eventLoop := newEventLoop( + glfw.PostEmptyEvent, // Wakeup GLFW + a.engine.RunTask, // Flush tasks + ) + a.engine.TaskRunnerRunOnCurrentThread = eventLoop.RunOnCurrentThread + a.engine.TaskRunnerPostTask = eventLoop.PostTask + a.engine.PlatfromMessage = messenger.handlePlatformMessage texturer := newRegistry(a.engine, a.window) @@ -290,8 +297,9 @@ func (a *Application) Run() error { defer a.engine.Shutdown() for !a.window.ShouldClose() { - glfw.WaitEventsTimeout(0.016) // timeout to get 60fps-ish iterations - embedder.FlutterEngineFlushPendingTasksNow() + eventLoop.WaitForEvents(func(duration float64) { + glfw.WaitEventsTimeout(duration) + }) defaultPlatformPlugin.glfwTasker.ExecuteTasks() messenger.engineTasker.ExecuteTasks() } diff --git a/embedder/embedder.go b/embedder/embedder.go index c4e15fec..a249c8df 100644 --- a/embedder/embedder.go +++ b/embedder/embedder.go @@ -63,6 +63,9 @@ type FlutterOpenGLTexture struct { Format uint32 } +// FlutterTask is a type alias to C.FlutterTask +type FlutterTask = C.FlutterTask + // FlutterEngine corresponds to the C.FlutterEngine with his associated callback's method. type FlutterEngine struct { // Flutter Engine. @@ -83,6 +86,10 @@ type FlutterEngine struct { GLProcResolver func(procName string) unsafe.Pointer GLExternalTextureFrameCallback func(textureID int64, width int, height int) *FlutterOpenGLTexture + // task runner interop + TaskRunnerRunOnCurrentThread func() bool + TaskRunnerPostTask func(trask FlutterTask, targetTimeNanos uint64) + // platform message callback function PlatfromMessage func(message *PlatformMessage) @@ -310,6 +317,12 @@ func (flu *FlutterEngine) SendPlatformMessageResponse( return (Result)(res) } +// RunTask inform the engine to run the specified task. +func (flu *FlutterEngine) RunTask(task *FlutterTask) Result { + res := C.FlutterEngineRunTask(flu.Engine, task) + return (Result)(res) +} + // RegisterExternalTexture registers an external texture with a unique identifier. func (flu *FlutterEngine) RegisterExternalTexture(textureID int64) Result { flu.sync.Lock() @@ -344,10 +357,8 @@ func (flu *FlutterEngine) MarkExternalTextureFrameAvailable(textureID int64) Res return (Result)(res) } -// FlutterEngineFlushPendingTasksNow flush tasks on a message loop not -// controlled by the Flutter engine. -// -// deprecated soon. -func FlutterEngineFlushPendingTasksNow() { - C.__FlutterEngineFlushPendingTasksNow() -} +// Get the current time in nanoseconds from the clock used by the flutter +// engine. +func FlutterEngineGetCurrentTime() uint64 { + return uint64(C.FlutterEngineGetCurrentTime()) +} \ No newline at end of file diff --git a/embedder/embedder_helper.c b/embedder/embedder_helper.c index c177fa15..763af3b3 100644 --- a/embedder/embedder_helper.c +++ b/embedder/embedder_helper.c @@ -17,6 +17,10 @@ bool proxy_gl_external_texture_frame_callback(void *user_data, size_t height, FlutterOpenGLTexture *texture); +bool proxy_runs_task_on_current_thread_callback(void *user_data); +void proxy_post_task_callback(FlutterTask task, uint64_t target_time_nanos, + void *user_data); + // C helper FlutterEngineResult runFlutter(void *user_data, FlutterEngine *engine, FlutterProjectArgs *Args, @@ -38,6 +42,19 @@ FlutterEngineResult runFlutter(void *user_data, FlutterEngine *engine, Args->command_line_argv = vmArgs; Args->platform_message_callback = proxy_platform_message_callback; + // Configure task runner interop + FlutterTaskRunnerDescription platform_task_runner = {}; + platform_task_runner.struct_size = sizeof(FlutterTaskRunnerDescription); + platform_task_runner.user_data = user_data; + platform_task_runner.runs_task_on_current_thread_callback = + proxy_runs_task_on_current_thread_callback; + platform_task_runner.post_task_callback = proxy_post_task_callback; + + FlutterCustomTaskRunners custom_task_runners = {}; + custom_task_runners.struct_size = sizeof(FlutterCustomTaskRunners); + custom_task_runners.platform_task_runner = &platform_task_runner; + Args->custom_task_runners = &custom_task_runners; + return FlutterEngineRun(FLUTTER_ENGINE_VERSION, &config, Args, user_data, engine); } diff --git a/embedder/embedder_proxy.go b/embedder/embedder_proxy.go index 53dd8c9b..2da4f5ab 100644 --- a/embedder/embedder_proxy.go +++ b/embedder/embedder_proxy.go @@ -77,3 +77,17 @@ func proxy_gl_external_texture_frame_callback(userData unsafe.Pointer, texture.format = C.uint32_t(embedderGLTexture.Format) return C.bool(true) } + +//export proxy_runs_task_on_current_thread_callback +func proxy_runs_task_on_current_thread_callback(userData unsafe.Pointer) C.bool { + flutterEnginePointer := *(*uintptr)(userData) + flutterEngine := (*FlutterEngine)(unsafe.Pointer(flutterEnginePointer)) + return C.bool(flutterEngine.TaskRunnerRunOnCurrentThread()) +} + +//export proxy_post_task_callback +func proxy_post_task_callback(task C.FlutterTask, targetTimeNanos C.uint64_t, userData unsafe.Pointer) { + flutterEnginePointer := *(*uintptr)(userData) + flutterEngine := (*FlutterEngine)(unsafe.Pointer(flutterEnginePointer)) + flutterEngine.TaskRunnerPostTask(task, uint64(targetTimeNanos)) +} \ No newline at end of file diff --git a/event-loop.go b/event-loop.go new file mode 100644 index 00000000..f6cd2f15 --- /dev/null +++ b/event-loop.go @@ -0,0 +1,120 @@ +package flutter + +import ( + "container/heap" + "fmt" + "math" + "runtime" + "time" + + "github.com/go-flutter-desktop/go-flutter/embedder" + "github.com/go-flutter-desktop/go-flutter/internal/priorityqueue" +) + +// EventLoop is a event loop for the main thread that allows for delayed task +// execution.() +type EventLoop struct { + // store the task (event) by their priorities + priorityqueue *priorityqueue.PriorityQueue + // called when a task has been received, used to Wakeup the rendering event loop + postEmptyEvent func() + + onExpiredTask func(*embedder.FlutterTask) embedder.Result + + // timeout for non-Rendering events that needs to be processed in a polling manner + maxWaitDuration time.Duration +} + +func newEventLoop(postEmptyEvent func(), onExpiredTask func(*embedder.FlutterTask) embedder.Result) *EventLoop { + pq := priorityqueue.NewPriorityQueue() + heap.Init(pq) + return &EventLoop{ + priorityqueue: pq, + postEmptyEvent: postEmptyEvent, + onExpiredTask: onExpiredTask, + + // 25 Millisecond is arbitrary value, not too high (adds too much delay to + // platform messages) and not too low (heavy CPU consumption). + // This value isn't related to FPS, as rendering events are process in a + // waiting manner. + maxWaitDuration: time.Duration(25) * time.Millisecond, + } +} + +// RunOnCurrentThread returns if the current thread is the thread used +// by the GLFW event loop. +func (t *EventLoop) RunOnCurrentThread() bool { + // We call runtime.LockOSThread, this ensure the current the + // goroutine will always execute in that thread. + return true +} + +// PostTask posts a Flutter engine tasks to the event loop for delayed execution. +func (t *EventLoop) PostTask(task embedder.FlutterTask, targetTimeNanos uint64) { + runtime.LockOSThread() + + taskDuration := time.Duration(targetTimeNanos) * time.Nanosecond + engineDuration := time.Duration(embedder.FlutterEngineGetCurrentTime()) + + t.priorityqueue.Lock() + item := &priorityqueue.Item{ + Value: task, + FireTime: time.Now().Add(taskDuration - engineDuration), + } + heap.Push(t.priorityqueue, item) + t.priorityqueue.Unlock() + + t.postEmptyEvent() +} + +// WaitForEvents waits for an any Rendering or pending Flutter Engine events +// and returns when either is encountered. +// Expired engine events are processed +func (t *EventLoop) WaitForEvents(rendererWaitEvents func(float64)) { + now := time.Now() + + expiredTasks := make([]*priorityqueue.Item, 0) + var top *priorityqueue.Item + + t.priorityqueue.Lock() + for t.priorityqueue.Len() > 0 { + + // Remove the item from the delayed tasks queue. + top = heap.Pop(t.priorityqueue).(*priorityqueue.Item) + + // If this task (and all tasks after this) has not yet expired, there is + // nothing more to do. Quit iterating. + if top.FireTime.After(now) { + heap.Push(t.priorityqueue, top) // push the item back into the queue + break + } + + // Make a record of the expired task. Do NOT service the task here + // because we are still holding onto the task queue mutex. We don't want + // other threads to block on posting tasks onto this thread till we are + // done processing expired tasks. + expiredTasks = append(expiredTasks, top) + + } + t.priorityqueue.Unlock() + + // Fire expired tasks. + for _, item := range expiredTasks { + task := item.Value.(embedder.FlutterTask) + if t.onExpiredTask(&task) != embedder.ResultSuccess { + fmt.Printf("go-flutter: couldn't process task %v\n", task) + } + } + + // Sleep till the next task needs to be processed. If a new task comes + // along, the rendererWaitEvents will be resolved early because PostTask + // posts an empty event. + if t.priorityqueue.Len() == 0 { + rendererWaitEvents(t.maxWaitDuration.Seconds()) + } else { + if top.FireTime.After(now) { + durationWait := math.Min(top.FireTime.Sub(now).Seconds(), t.maxWaitDuration.Seconds()) + rendererWaitEvents(durationWait) + } + } +} diff --git a/internal/priorityqueue/priorityqueue.go b/internal/priorityqueue/priorityqueue.go new file mode 100644 index 00000000..918ffa5a --- /dev/null +++ b/internal/priorityqueue/priorityqueue.go @@ -0,0 +1,60 @@ +package priorityqueue + +import ( + "sync" + "time" +) + +// An Item is something we manage in a priority queue. +type Item struct { + Value interface{} // The value of the item; arbitrary. + FireTime time.Time // The priority of the item in the queue. + + // The index is needed by update and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// A PriorityQueue implements heap.Interface and holds Items. +type PriorityQueue struct { + queue []*Item + sync.Mutex +} + +// NewPriorityQueue create a new PriorityQueue +func NewPriorityQueue() *PriorityQueue { + pq := &PriorityQueue{} + pq.queue = make([]*Item, 0) + return pq +} + +func (pq *PriorityQueue) Len() int { return len(pq.queue) } + +func (pq *PriorityQueue) Less(i, j int) bool { + // We want Pop to give us the lowest, not highest, priority so we use lower + // than here. + return pq.queue[i].FireTime.Before(pq.queue[j].FireTime) +} + +func (pq *PriorityQueue) Swap(i, j int) { + pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i] + pq.queue[i].index = i + pq.queue[j].index = j +} + +// Push add a new priority/value pair in the queue. 0 priority = max. +func (pq *PriorityQueue) Push(x interface{}) { + n := len(pq.queue) + item := x.(*Item) + item.index = n + pq.queue = append(pq.queue, item) +} + +// Pop Remove and return the highest item (lowest priority) +func (pq *PriorityQueue) Pop() interface{} { + old := pq.queue + n := len(old) + item := old[n-1] + item.index = -1 // for safety + pq.queue = old[0 : n-1] + return item +}