Skip to content

fix: always flush data to apm before shutting down and rework agent done signal #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions apm-lambda-extension/apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,25 @@ func (c *Client) EnqueueAPMData(agentData AgentData) {
c.logger.Warn("Channel full: dropping a subset of agent data")
}
}

// ShouldFlush returns true if the client should flush APM data after processing the event.
func (c *Client) ShouldFlush() bool {
return c.sendStrategy == SyncFlush
}

// ResetFlush resets the client's "agent flushed" state, such that
// subsequent calls to WaitForFlush will block until another request
// is received from the agent indicating it has flushed.
func (c *Client) ResetFlush() {
c.flushMutex.Lock()
defer c.flushMutex.Unlock()
c.flushCh = make(chan struct{})
}

// WaitForFlush returns a channel that is closed when the agent has signalled that
// the Lambda invocation has completed, and there is no more APM data coming.
func (c *Client) WaitForFlush() <-chan struct{} {
c.flushMutex.Lock()
defer c.flushMutex.Unlock()
return c.flushCh
}
19 changes: 18 additions & 1 deletion apm-lambda-extension/apmproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,19 @@ import (
"go.uber.org/zap"
)

// SendStrategy represents the type of sending strategy the extension uses
type SendStrategy string

const (
// Background send strategy allows the extension to send remaining buffered
// agent data on the next function invocation
Background SendStrategy = "background"

// SyncFlush send strategy indicates that the extension will synchronously
// flush remaining buffered agent data when it receives a signal that the
// function is complete
SyncFlush SendStrategy = "syncflush"

defaultDataReceiverTimeout time.Duration = 15 * time.Second
defaultDataForwarderTimeout time.Duration = 3 * time.Second
defaultReceiverAddr = ":8200"
Expand All @@ -40,7 +52,6 @@ const (
type Client struct {
mu sync.Mutex
bufferPool sync.Pool
AgentDoneSignal chan struct{}
DataChannel chan AgentData
client *http.Client
Status Status
Expand All @@ -49,7 +60,11 @@ type Client struct {
ServerSecretToken string
serverURL string
receiver *http.Server
sendStrategy SendStrategy
logger *zap.SugaredLogger

flushMutex sync.Mutex
flushCh chan struct{}
}

func NewClient(opts ...Option) (*Client, error) {
Expand All @@ -69,6 +84,8 @@ func NewClient(opts ...Option) (*Client, error) {
WriteTimeout: defaultDataReceiverTimeout,
MaxHeaderBytes: 1 << 20,
},
sendStrategy: SyncFlush,
flushCh: make(chan struct{}),
}

c.client.Timeout = defaultDataForwarderTimeout
Expand Down
7 changes: 7 additions & 0 deletions apm-lambda-extension/apmproxy/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ func WithReceiverAddress(addr string) Option {
}
}

// WithSendStrategy sets the sendstrategy.
func WithSendStrategy(strategy SendStrategy) Option {
return func(c *Client) {
c.sendStrategy = strategy
}
}

// WithAgentDataBufferSize sets the agent data buffer size.
func WithAgentDataBufferSize(size int) Option {
return func(c *Client) {
Expand Down
31 changes: 24 additions & 7 deletions apm-lambda-extension/apmproxy/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,34 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ
return
}

if len(rawBytes) > 0 {
agentData := AgentData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
}
agentFlushed := r.URL.Query().Get("flushed") == "true"

agentData := AgentData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
}

if len(agentData.Data) != 0 {
c.EnqueueAPMData(agentData)
}

if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" {
c.AgentDoneSignal <- struct{}{}
if agentFlushed {
c.flushMutex.Lock()

select {
case <-c.flushCh:
// the channel is closed.
// the extension received at least a flush request already but the
// data have not been flushed yet.
// We can reuse the closed channel.
default:
// no pending flush requests
// close the channel to signal a flush request has
// been received.
close(c.flushCh)
}

c.flushMutex.Unlock()
}

w.WriteHeader(http.StatusAccepted)
Expand Down
14 changes: 4 additions & 10 deletions apm-lambda-extension/apmproxy/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.AgentDoneSignal = make(chan struct{}, 1)
require.NoError(t, apmClient.StartReceiver())
defer func() {
require.NoError(t, apmClient.Shutdown())
Expand All @@ -203,11 +202,9 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
}()

select {
case <-apmClient.AgentDoneSignal:
<-apmClient.DataChannel
case <-apmClient.WaitForFlush():
case <-time.After(1 * time.Second):
t.Log("Timed out waiting for server to send FuncDone signal")
t.Fail()
t.Fatal("Timed out waiting for server to send flush signal")
}
}

Expand All @@ -227,7 +224,6 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.AgentDoneSignal = make(chan struct{}, 1)
require.NoError(t, apmClient.StartReceiver())
defer func() {
require.NoError(t, apmClient.Shutdown())
Expand Down Expand Up @@ -271,7 +267,6 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.AgentDoneSignal = make(chan struct{}, 1)
require.NoError(t, apmClient.StartReceiver())
defer func() {
require.NoError(t, apmClient.Shutdown())
Expand All @@ -292,9 +287,8 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
}()

select {
case <-apmClient.AgentDoneSignal:
case <-apmClient.WaitForFlush():
case <-time.After(1 * time.Second):
t.Log("Timed out waiting for server to send FuncDone signal")
t.Fail()
t.Fatal("Timed out waiting for server to send flush signal")
}
}
16 changes: 16 additions & 0 deletions apm-lambda-extension/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"go.elastic.co/ecszap"
Expand Down Expand Up @@ -97,6 +98,10 @@ func New(opts ...configOption) (*App, error) {
apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(fmt.Sprintf(":%s", port)))
}

if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok {
apmOpts = append(apmOpts, apmproxy.WithSendStrategy(strategy))
}

if bufferSize := os.Getenv("ELASTIC_APM_LAMBDA_AGENT_DATA_BUFFER_SIZE"); bufferSize != "" {
size, err := strconv.Atoi(bufferSize)
if err != nil {
Expand Down Expand Up @@ -132,6 +137,17 @@ func getIntFromEnvIfAvailable(name string) (int, error) {
return value, nil
}

func parseStrategy(value string) (apmproxy.SendStrategy, bool) {
switch strings.ToLower(value) {
case "background":
return apmproxy.Background, true
case "syncflush":
return apmproxy.SyncFlush, true
}

return "", false
}

func buildLogger(level string) (*zap.SugaredLogger, error) {
if level == "" {
level = "info"
Expand Down
19 changes: 14 additions & 5 deletions apm-lambda-extension/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ func (app *App) Run(ctx context.Context) error {
}
}

// Flush all data before shutting down.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

app.apmClient.FlushAPMData(ctx)
}()

// The previous event id is used to validate the received Lambda metrics
var prevEvent *extension.NextEventResponse
// This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each
Expand Down Expand Up @@ -114,7 +122,7 @@ func (app *App) Run(ctx context.Context) error {
}
app.logger.Debug("Waiting for background data send to end")
backgroundDataSendWg.Wait()
if config.SendStrategy == extension.SyncFlush {
if app.apmClient.ShouldFlush() {
// Flush APM data now that the function invocation has completed
app.apmClient.FlushAPMData(ctx)
}
Expand All @@ -129,6 +137,8 @@ func (app *App) processEvent(
prevEvent *extension.NextEventResponse,
metadataContainer *apmproxy.MetadataContainer,
) (*extension.NextEventResponse, error) {
// Reset flush state for future events.
defer app.apmClient.ResetFlush()

// Invocation context
invocationCtx, invocationCancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -161,8 +171,6 @@ func (app *App) processEvent(
}

// APM Data Processing
app.apmClient.AgentDoneSignal = make(chan struct{})
defer close(app.apmClient.AgentDoneSignal)
backgroundDataSendWg.Add(1)
go func() {
defer backgroundDataSendWg.Done()
Expand Down Expand Up @@ -201,9 +209,10 @@ func (app *App) processEvent(
// 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline.
// This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down.

select {
case <-app.apmClient.AgentDoneSignal:
app.logger.Debug("Received agent done signal")
case <-app.apmClient.WaitForFlush():
app.logger.Debug("APM client has pending flush signals")
case <-runtimeDone:
app.logger.Debug("Received runtimeDone signal")
case <-timer.C:
Expand Down