This repository was archived by the owner on Apr 11, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathevent.go
103 lines (86 loc) · 2.43 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main
import (
"fmt"
"github.com/Kong/go-pdk"
"time"
)
// Incoming data for a new event.
// TODO: add some relevant data to reduce number of callbacks.
type StartEventData struct {
InstanceId int // Instance ID to start the event
EventName string // event name (not handler method name)
// ....
}
type eventData struct {
id int // event id
instance *instanceData // plugin instance
ipc chan interface{} // communication channel (TODO: use decoded structs)
pdk *pdk.PDK // go-pdk instance
}
// HandleEvent starts the call/{callback/response}*/finish cycle.
// More than one event can be run concurrenty for a single plugin instance,
// they all receive the same object instance, so should be careful if it's
// mutated or holds references to mutable data.
//
// RPC exported method
func (s *PluginServer) HandleEvent(in StartEventData, out *StepData) error {
s.lock.RLock()
instance, ok := s.instances[in.InstanceId]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No plugin instance %d", in.InstanceId)
}
h, ok := instance.handlers[in.EventName]
if !ok {
return fmt.Errorf("undefined method %s on plugin %s",
in.EventName, instance.plugin.name)
}
ipc := make(chan interface{})
event := eventData{
instance: instance,
ipc: ipc,
pdk: pdk.Init(ipc),
}
s.lock.Lock()
event.id = s.nextEventId
s.nextEventId++
s.events[event.id] = &event
s.lock.Unlock()
//log.Printf("Will launch goroutine for key %d / operation %s\n", key, op)
go func() {
_ = <-ipc
h(event.pdk)
func() {
defer func() { recover() }()
ipc <- "ret"
}()
s.lock.Lock()
defer s.lock.Unlock()
event.instance.lastEvent = time.Now()
delete(s.events, event.id)
}()
ipc <- "run" // kickstart the handler
*out = StepData{EventId: event.id, Data: <-ipc}
return nil
}
// A callback's response/request.
type StepData struct {
EventId int // event cycle to which this belongs
Data interface{} // carried data
}
// Step carries a callback's anser back from Kong to the plugin,
// the return value is either a new callback request or a finish signal.
//
// RPC exported method
func (s *PluginServer) Step(in StepData, out *StepData) error {
s.lock.RLock()
event, ok := s.events[in.EventId]
s.lock.RUnlock()
if !ok {
return fmt.Errorf("No running event %d", in.EventId)
}
event.ipc <- in.Data
outStr := <-event.ipc
*out = StepData{EventId: in.EventId, Data: outStr}
return nil
}