Skip to content

Commit c151869

Browse files
committed
add buffer mechanism & bugfix
Signed-off-by: wrongerror <[email protected]>
1 parent 81d893b commit c151869

File tree

6 files changed

+187
-62
lines changed

6 files changed

+187
-62
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/OpenFunction/dapr-proxy
33
go 1.18
44

55
require (
6-
github.com/OpenFunction/functions-framework-go v0.0.0-20220925145105-86f7bcc9cf8c
6+
github.com/OpenFunction/functions-framework-go v0.5.0
77
github.com/dapr/components-contrib v1.8.1-rc.1
88
github.com/dapr/dapr v1.8.3
99
github.com/dapr/go-sdk v1.5.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
5757
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
5858
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
5959
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
60-
github.com/OpenFunction/functions-framework-go v0.0.0-20220925145105-86f7bcc9cf8c h1:TFstl4SFFVZpd2+2yhVGjW15em/lV9NJ7NGoM6kaUHQ=
61-
github.com/OpenFunction/functions-framework-go v0.0.0-20220925145105-86f7bcc9cf8c/go.mod h1:ussM725MZuGmAH4PPmwGdHoDxIT4oSx7VK5Wiibg/No=
60+
github.com/OpenFunction/functions-framework-go v0.5.0 h1:s2L4PyazI8EoPzrfW0Es1esCSZS2SFXD6KNXyDKaJKc=
61+
github.com/OpenFunction/functions-framework-go v0.5.0/go.mod h1:+uYjTEYmn2uqIyViZtg9OF+bUNdjbkWNd7jrQWc7iEc=
6262
github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI=
6363
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
6464
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=

main.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@ import (
66
"strconv"
77
"time"
88

9-
proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime"
10-
"github.com/OpenFunction/dapr-proxy/pkg/utils"
119
ofctx "github.com/OpenFunction/functions-framework-go/context"
1210
"github.com/OpenFunction/functions-framework-go/framework"
11+
"github.com/cenkalti/backoff/v4"
1312
diag "github.com/dapr/dapr/pkg/diagnostics"
1413
"github.com/dapr/dapr/pkg/modes"
1514
"github.com/dapr/dapr/pkg/runtime"
1615
"github.com/pkg/errors"
1716
"k8s.io/klog/v2"
17+
18+
proxyruntime "github.com/OpenFunction/dapr-proxy/pkg/runtime"
19+
"github.com/OpenFunction/dapr-proxy/pkg/utils"
1820
)
1921

2022
const (
@@ -48,17 +50,20 @@ func main() {
4850
port, _ := strconv.Atoi(funcContext.GetPort())
4951
protocol := utils.GetEnvVar(protocolEnvVar, defaultAppProtocol)
5052
config := &proxyruntime.Config{
51-
Protocol: runtime.Protocol(protocol),
52-
Host: host,
53-
Port: port,
54-
Mode: modes.KubernetesMode,
53+
Protocol: runtime.Protocol(protocol),
54+
Host: host,
55+
Port: port,
56+
Mode: modes.KubernetesMode,
57+
MaxBufferSize: 1000000,
5558
}
5659

5760
FuncRuntime = proxyruntime.NewFuncRuntime(config, funcContext)
5861
if err := FuncRuntime.CreateFuncChannel(); err != nil {
5962
klog.Exit(err)
6063
}
6164

65+
go FuncRuntime.ProcessEvents()
66+
6267
if err := fwk.Register(ctx, EventHandler); err != nil {
6368
klog.Exit(err)
6469
}
@@ -75,35 +80,37 @@ func EventHandler(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
7580
klog.V(4).Infof("Input: %s - Event Forwarding Elapsed: %vms", ctx.GetInputName(), elapsed)
7681
}()
7782

78-
// Forwarding BindingEvent
83+
c := ctx.GetNativeContext()
84+
85+
// Handle BindingEvent
7986
bindingEvent := ctx.GetBindingEvent()
8087
if bindingEvent != nil {
81-
data, err := FuncRuntime.OnBindingEvent(ctx, bindingEvent)
82-
if err != nil {
83-
klog.Error(err)
84-
return ctx.ReturnOnInternalError(), err
85-
} else {
86-
out := new(ofctx.FunctionOut)
87-
out.WithData(data)
88-
out.WithCode(ofctx.Success)
89-
return out, nil
90-
}
88+
FuncRuntime.EnqueueBindingEvent(&c, bindingEvent)
9189
}
9290

93-
// Forwarding TopicEvent
91+
// Handle TopicEvent
9492
topicEvent := ctx.GetTopicEvent()
9593
if topicEvent != nil {
96-
err := FuncRuntime.OnTopicEvent(ctx, topicEvent)
97-
if err != nil {
98-
klog.Error(err)
99-
return ctx.ReturnOnInternalError(), err
100-
} else {
101-
out := new(ofctx.FunctionOut)
102-
out.WithCode(ofctx.Success)
103-
return out, nil
104-
}
94+
FuncRuntime.EnqueueTopicEvent(&c, topicEvent)
10595
}
10696

107-
err := errors.New("Only Binding and Pubsub events are supported")
108-
return ctx.ReturnOnInternalError(), err
97+
var resp *proxyruntime.EventResponse
98+
err := backoff.Retry(func() error {
99+
resp = FuncRuntime.GetEventResponse(&c)
100+
if resp == nil {
101+
return errors.New("Failed to get event response")
102+
}
103+
return nil
104+
}, utils.NewExponentialBackOff())
105+
106+
if err != nil {
107+
e := errors.New("Processing event timeout")
108+
klog.Error(e)
109+
return ctx.ReturnOnInternalError(), e
110+
} else {
111+
out := new(ofctx.FunctionOut)
112+
out.WithData(resp.Data)
113+
out.WithCode(ofctx.Success)
114+
return out, nil
115+
}
109116
}

pkg/grpc/grpc.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"sync"
1111
"time"
1212

13-
"github.com/OpenFunction/dapr-proxy/pkg/lb"
1413
"github.com/dapr/dapr/pkg/channel"
1514
diag "github.com/dapr/dapr/pkg/diagnostics"
1615
"github.com/dapr/dapr/pkg/modes"
@@ -20,6 +19,8 @@ import (
2019
"google.golang.org/grpc/connectivity"
2120
"google.golang.org/grpc/credentials"
2221
"k8s.io/klog/v2"
22+
23+
"github.com/OpenFunction/dapr-proxy/pkg/lb"
2324
)
2425

2526
type endpoint string
@@ -89,7 +90,7 @@ func (g *Manager) StartEndpointsDetection() {
8990
endpoints[ep] = true
9091
}
9192
} else {
92-
klog.Error(err)
93+
klog.V(4).Info(err)
9394
}
9495

9596
oldEndpoints := g.balancer.All()
@@ -107,24 +108,22 @@ func (g *Manager) StartEndpointsDetection() {
107108
klog.Error(err)
108109
} else if state == connectivity.Ready || state == connectivity.Idle {
109110
g.balancer.Add(ep)
110-
} else {
111-
g.balancer.Remove(ep)
112111
}
113112
}
114-
time.Sleep(time.Second)
113+
time.Sleep(200 * time.Millisecond)
115114
}
116115
}()
117116
}
118117

119118
func (g *Manager) GetGRPCConnection() (*grpc.ClientConn, func(), error) {
120119
address, _ := g.balancer.Next(lb.DummyFactor)
121120
if address == nil {
122-
return nil, nil, errors.Errorf("No available endpoints")
121+
return nil, func() {}, errors.Errorf("No available endpoints")
123122
}
124123

125124
conn, teardown, err := g.getGRPCConnection(context.TODO(), address.String(), "", "", true, false, false)
126125
if err != nil {
127-
return nil, nil, errors.Errorf("error establishing connection to app grpc on address %s: %s", address, err)
126+
return nil, teardown, errors.Errorf("error establishing connection to app grpc on address %s: %s", address, err)
128127
}
129128

130129
return conn, func() {

0 commit comments

Comments
 (0)