Skip to content

Feature/event loop interop #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions application.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,13 @@ func (a *Application) Run() error {
return glfw.GetProcAddress(procName)
}

eventLoop := newEventLoop(
glfw.PostEmptyEvent, // Wakeup GLFW
a.engine.RunTask, // Flush tasks
)
a.engine.TaskRunnerRunOnCurrentThread = eventLoop.RunOnCurrentThread
a.engine.TaskRunnerPostTask = eventLoop.PostTask

a.engine.PlatfromMessage = messenger.handlePlatformMessage

texturer := newRegistry(a.engine, a.window)
Expand Down Expand Up @@ -290,8 +297,9 @@ func (a *Application) Run() error {
defer a.engine.Shutdown()

for !a.window.ShouldClose() {
glfw.WaitEventsTimeout(0.016) // timeout to get 60fps-ish iterations
embedder.FlutterEngineFlushPendingTasksNow()
eventLoop.WaitForEvents(func(duration float64) {
glfw.WaitEventsTimeout(duration)
})
defaultPlatformPlugin.glfwTasker.ExecuteTasks()
messenger.engineTasker.ExecuteTasks()
}
Expand Down
25 changes: 18 additions & 7 deletions embedder/embedder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type FlutterOpenGLTexture struct {
Format uint32
}

// FlutterTask is a type alias to C.FlutterTask
type FlutterTask = C.FlutterTask

// FlutterEngine corresponds to the C.FlutterEngine with his associated callback's method.
type FlutterEngine struct {
// Flutter Engine.
Expand All @@ -83,6 +86,10 @@ type FlutterEngine struct {
GLProcResolver func(procName string) unsafe.Pointer
GLExternalTextureFrameCallback func(textureID int64, width int, height int) *FlutterOpenGLTexture

// task runner interop
TaskRunnerRunOnCurrentThread func() bool
TaskRunnerPostTask func(trask FlutterTask, targetTimeNanos uint64)

// platform message callback function
PlatfromMessage func(message *PlatformMessage)

Expand Down Expand Up @@ -310,6 +317,12 @@ func (flu *FlutterEngine) SendPlatformMessageResponse(
return (Result)(res)
}

// RunTask inform the engine to run the specified task.
func (flu *FlutterEngine) RunTask(task *FlutterTask) Result {
res := C.FlutterEngineRunTask(flu.Engine, task)
return (Result)(res)
}

// RegisterExternalTexture registers an external texture with a unique identifier.
func (flu *FlutterEngine) RegisterExternalTexture(textureID int64) Result {
flu.sync.Lock()
Expand Down Expand Up @@ -344,10 +357,8 @@ func (flu *FlutterEngine) MarkExternalTextureFrameAvailable(textureID int64) Res
return (Result)(res)
}

// FlutterEngineFlushPendingTasksNow flush tasks on a message loop not
// controlled by the Flutter engine.
//
// deprecated soon.
func FlutterEngineFlushPendingTasksNow() {
C.__FlutterEngineFlushPendingTasksNow()
}
// Get the current time in nanoseconds from the clock used by the flutter
// engine.
func FlutterEngineGetCurrentTime() uint64 {
return uint64(C.FlutterEngineGetCurrentTime())
}
17 changes: 17 additions & 0 deletions embedder/embedder_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ bool proxy_gl_external_texture_frame_callback(void *user_data,
size_t height,
FlutterOpenGLTexture *texture);

bool proxy_runs_task_on_current_thread_callback(void *user_data);
void proxy_post_task_callback(FlutterTask task, uint64_t target_time_nanos,
void *user_data);

// C helper
FlutterEngineResult runFlutter(void *user_data, FlutterEngine *engine,
FlutterProjectArgs *Args,
Expand All @@ -38,6 +42,19 @@ FlutterEngineResult runFlutter(void *user_data, FlutterEngine *engine,
Args->command_line_argv = vmArgs;
Args->platform_message_callback = proxy_platform_message_callback;

// Configure task runner interop
FlutterTaskRunnerDescription platform_task_runner = {};
platform_task_runner.struct_size = sizeof(FlutterTaskRunnerDescription);
platform_task_runner.user_data = user_data;
platform_task_runner.runs_task_on_current_thread_callback =
proxy_runs_task_on_current_thread_callback;
platform_task_runner.post_task_callback = proxy_post_task_callback;

FlutterCustomTaskRunners custom_task_runners = {};
custom_task_runners.struct_size = sizeof(FlutterCustomTaskRunners);
custom_task_runners.platform_task_runner = &platform_task_runner;
Args->custom_task_runners = &custom_task_runners;

return FlutterEngineRun(FLUTTER_ENGINE_VERSION, &config, Args, user_data,
engine);
}
Expand Down
14 changes: 14 additions & 0 deletions embedder/embedder_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,17 @@ func proxy_gl_external_texture_frame_callback(userData unsafe.Pointer,
texture.format = C.uint32_t(embedderGLTexture.Format)
return C.bool(true)
}

//export proxy_runs_task_on_current_thread_callback
func proxy_runs_task_on_current_thread_callback(userData unsafe.Pointer) C.bool {
flutterEnginePointer := *(*uintptr)(userData)
flutterEngine := (*FlutterEngine)(unsafe.Pointer(flutterEnginePointer))
return C.bool(flutterEngine.TaskRunnerRunOnCurrentThread())
}

//export proxy_post_task_callback
func proxy_post_task_callback(task C.FlutterTask, targetTimeNanos C.uint64_t, userData unsafe.Pointer) {
flutterEnginePointer := *(*uintptr)(userData)
flutterEngine := (*FlutterEngine)(unsafe.Pointer(flutterEnginePointer))
flutterEngine.TaskRunnerPostTask(task, uint64(targetTimeNanos))
}
120 changes: 120 additions & 0 deletions event-loop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package flutter

import (
"container/heap"
"fmt"
"math"
"runtime"
"time"

"github.com/go-flutter-desktop/go-flutter/embedder"
"github.com/go-flutter-desktop/go-flutter/internal/priorityqueue"
)

// EventLoop is a event loop for the main thread that allows for delayed task
// execution.()
type EventLoop struct {
// store the task (event) by their priorities
priorityqueue *priorityqueue.PriorityQueue
// called when a task has been received, used to Wakeup the rendering event loop
postEmptyEvent func()

onExpiredTask func(*embedder.FlutterTask) embedder.Result

// timeout for non-Rendering events that needs to be processed in a polling manner
maxWaitDuration time.Duration
}

func newEventLoop(postEmptyEvent func(), onExpiredTask func(*embedder.FlutterTask) embedder.Result) *EventLoop {
pq := priorityqueue.NewPriorityQueue()
heap.Init(pq)
return &EventLoop{
priorityqueue: pq,
postEmptyEvent: postEmptyEvent,
onExpiredTask: onExpiredTask,

// 25 Millisecond is arbitrary value, not too high (adds too much delay to
// platform messages) and not too low (heavy CPU consumption).
// This value isn't related to FPS, as rendering events are process in a
// waiting manner.
maxWaitDuration: time.Duration(25) * time.Millisecond,
}
}

// RunOnCurrentThread returns if the current thread is the thread used
// by the GLFW event loop.
func (t *EventLoop) RunOnCurrentThread() bool {
// We call runtime.LockOSThread, this ensure the current the
// goroutine will always execute in that thread.
return true
}

// PostTask posts a Flutter engine tasks to the event loop for delayed execution.
func (t *EventLoop) PostTask(task embedder.FlutterTask, targetTimeNanos uint64) {
runtime.LockOSThread()

taskDuration := time.Duration(targetTimeNanos) * time.Nanosecond
engineDuration := time.Duration(embedder.FlutterEngineGetCurrentTime())

t.priorityqueue.Lock()
item := &priorityqueue.Item{
Value: task,
FireTime: time.Now().Add(taskDuration - engineDuration),
}
heap.Push(t.priorityqueue, item)
t.priorityqueue.Unlock()

t.postEmptyEvent()
}

// WaitForEvents waits for an any Rendering or pending Flutter Engine events
// and returns when either is encountered.
// Expired engine events are processed
func (t *EventLoop) WaitForEvents(rendererWaitEvents func(float64)) {
now := time.Now()

expiredTasks := make([]*priorityqueue.Item, 0)
var top *priorityqueue.Item

t.priorityqueue.Lock()
for t.priorityqueue.Len() > 0 {

// Remove the item from the delayed tasks queue.
top = heap.Pop(t.priorityqueue).(*priorityqueue.Item)

// If this task (and all tasks after this) has not yet expired, there is
// nothing more to do. Quit iterating.
if top.FireTime.After(now) {
heap.Push(t.priorityqueue, top) // push the item back into the queue
break
}

// Make a record of the expired task. Do NOT service the task here
// because we are still holding onto the task queue mutex. We don't want
// other threads to block on posting tasks onto this thread till we are
// done processing expired tasks.
expiredTasks = append(expiredTasks, top)

}
t.priorityqueue.Unlock()

// Fire expired tasks.
for _, item := range expiredTasks {
task := item.Value.(embedder.FlutterTask)
if t.onExpiredTask(&task) != embedder.ResultSuccess {
fmt.Printf("go-flutter: couldn't process task %v\n", task)
}
}

// Sleep till the next task needs to be processed. If a new task comes
// along, the rendererWaitEvents will be resolved early because PostTask
// posts an empty event.
if t.priorityqueue.Len() == 0 {
rendererWaitEvents(t.maxWaitDuration.Seconds())
} else {
if top.FireTime.After(now) {
durationWait := math.Min(top.FireTime.Sub(now).Seconds(), t.maxWaitDuration.Seconds())
rendererWaitEvents(durationWait)
}
}
}
60 changes: 60 additions & 0 deletions internal/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package priorityqueue

import (
"sync"
"time"
)

// An Item is something we manage in a priority queue.
type Item struct {
Value interface{} // The value of the item; arbitrary.
FireTime time.Time // The priority of the item in the queue.

// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}

// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue struct {
queue []*Item
sync.Mutex
}

// NewPriorityQueue create a new PriorityQueue
func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{}
pq.queue = make([]*Item, 0)
return pq
}

func (pq *PriorityQueue) Len() int { return len(pq.queue) }

func (pq *PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the lowest, not highest, priority so we use lower
// than here.
return pq.queue[i].FireTime.Before(pq.queue[j].FireTime)
}

func (pq *PriorityQueue) Swap(i, j int) {
pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
pq.queue[i].index = i
pq.queue[j].index = j
}

// Push add a new priority/value pair in the queue. 0 priority = max.
func (pq *PriorityQueue) Push(x interface{}) {
n := len(pq.queue)
item := x.(*Item)
item.index = n
pq.queue = append(pq.queue, item)
}

// Pop Remove and return the highest item (lowest priority)
func (pq *PriorityQueue) Pop() interface{} {
old := pq.queue
n := len(old)
item := old[n-1]
item.index = -1 // for safety
pq.queue = old[0 : n-1]
return item
}