From 7d5dc7154cc198b489e09b539d5d5ae2e7c280d3 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 1 Aug 2022 04:40:16 +0200 Subject: [PATCH 01/10] fix: always flush data to apm before shutting down and rework agent done signal Add a defer statement to make sure that we always flush data to the apm server before shutting down. Remove agent done signal channel and avoid leaking implementation details. The channel was being recreated and closed on each event, racing with the intake handler that was sending to the channel. The channel is now used internally by the apm client and external packages can call 'Done()' to check whether the agent has sent the final intake request. See https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing --- apm-lambda-extension/apmproxy/apmserver.go | 15 +++++++++++++++ apm-lambda-extension/apmproxy/client.go | 17 ++++++++++++++++- apm-lambda-extension/apmproxy/option.go | 7 +++++++ apm-lambda-extension/apmproxy/receiver.go | 2 +- apm-lambda-extension/apmproxy/receiver_test.go | 7 ++----- apm-lambda-extension/app/app.go | 16 ++++++++++++++++ apm-lambda-extension/app/run.go | 14 ++++++++++---- 7 files changed, 67 insertions(+), 11 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index 6b394a05..51003ace 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -212,3 +212,18 @@ func (c *Client) EnqueueAPMData(agentData AgentData) { extension.Log.Warn("Channel full: dropping a subset of agent data") } } + +// ShouldFlush returns true if the client should flush APM data after processing the event. +func (c *Client) ShouldFlush() bool { + select { + case <-c.done: + return true + default: + return c.sendStrategy == SyncFlush + } +} + +// Done returns a channel that's closed when work done on behalf of the agent is completed. +func (c *Client) Done() <-chan struct{} { + return c.done +} diff --git a/apm-lambda-extension/apmproxy/client.go b/apm-lambda-extension/apmproxy/client.go index d9309354..fb03fea4 100644 --- a/apm-lambda-extension/apmproxy/client.go +++ b/apm-lambda-extension/apmproxy/client.go @@ -26,7 +26,19 @@ import ( "time" ) +// SendStrategy represents the type of sending strategy the extension uses +type SendStrategy string + const ( + // Background send strategy allows the extension to send remaining buffered + // agent data on the next function invocation + Background SendStrategy = "background" + + // SyncFlush send strategy indicates that the extension will synchronously + // flush remaining buffered agent data when it receives a signal that the + // function is complete + SyncFlush SendStrategy = "syncflush" + defaultDataReceiverTimeout time.Duration = 15 * time.Second defaultDataForwarderTimeout time.Duration = 3 * time.Second defaultReceiverAddr = ":8200" @@ -36,7 +48,6 @@ const ( type Client struct { mu sync.Mutex bufferPool sync.Pool - AgentDoneSignal chan struct{} DataChannel chan AgentData client *http.Client Status Status @@ -46,6 +57,8 @@ type Client struct { serverURL string dataForwarderTimeout time.Duration receiver *http.Server + sendStrategy SendStrategy + done chan struct{} } func NewClient(opts ...Option) (*Client, error) { @@ -66,6 +79,8 @@ func NewClient(opts ...Option) (*Client, error) { WriteTimeout: defaultDataReceiverTimeout, MaxHeaderBytes: 1 << 20, }, + sendStrategy: SyncFlush, + done: make(chan struct{}), } for _, opt := range opts { diff --git a/apm-lambda-extension/apmproxy/option.go b/apm-lambda-extension/apmproxy/option.go index 08ef47ed..32eeec6c 100644 --- a/apm-lambda-extension/apmproxy/option.go +++ b/apm-lambda-extension/apmproxy/option.go @@ -65,3 +65,10 @@ func WithReceiverAddress(addr string) Option { c.receiver.Addr = addr } } + +// WithSendStrategy sets the sendstrategy. +func WithSendStrategy(strategy SendStrategy) Option { + return func(c *Client) { + c.sendStrategy = strategy + } +} diff --git a/apm-lambda-extension/apmproxy/receiver.go b/apm-lambda-extension/apmproxy/receiver.go index 60fb1faa..33a073ac 100644 --- a/apm-lambda-extension/apmproxy/receiver.go +++ b/apm-lambda-extension/apmproxy/receiver.go @@ -127,7 +127,7 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ } if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { - c.AgentDoneSignal <- struct{}{} + close(c.done) } w.WriteHeader(http.StatusAccepted) diff --git a/apm-lambda-extension/apmproxy/receiver_test.go b/apm-lambda-extension/apmproxy/receiver_test.go index 793c0df5..3fe51a46 100644 --- a/apm-lambda-extension/apmproxy/receiver_test.go +++ b/apm-lambda-extension/apmproxy/receiver_test.go @@ -177,7 +177,6 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { apmproxy.WithReceiverTimeout(15*time.Second), ) require.NoError(t, err) - apmClient.AgentDoneSignal = make(chan struct{}, 1) require.NoError(t, apmClient.StartReceiver()) defer func() { require.NoError(t, apmClient.Shutdown()) @@ -198,7 +197,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { }() select { - case <-apmClient.AgentDoneSignal: + case <-apmClient.Done(): <-apmClient.DataChannel case <-time.After(1 * time.Second): t.Log("Timed out waiting for server to send FuncDone signal") @@ -221,7 +220,6 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { apmproxy.WithReceiverTimeout(15*time.Second), ) require.NoError(t, err) - apmClient.AgentDoneSignal = make(chan struct{}, 1) require.NoError(t, apmClient.StartReceiver()) defer func() { require.NoError(t, apmClient.Shutdown()) @@ -264,7 +262,6 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { apmproxy.WithReceiverTimeout(15*time.Second), ) require.NoError(t, err) - apmClient.AgentDoneSignal = make(chan struct{}, 1) require.NoError(t, apmClient.StartReceiver()) defer func() { require.NoError(t, apmClient.Shutdown()) @@ -285,7 +282,7 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { }() select { - case <-apmClient.AgentDoneSignal: + case <-apmClient.Done(): case <-time.After(1 * time.Second): t.Log("Timed out waiting for server to send FuncDone signal") t.Fail() diff --git a/apm-lambda-extension/app/app.go b/apm-lambda-extension/app/app.go index d1cf74d4..9c1e1e85 100644 --- a/apm-lambda-extension/app/app.go +++ b/apm-lambda-extension/app/app.go @@ -24,6 +24,7 @@ import ( "fmt" "os" "strconv" + "strings" "time" ) @@ -84,6 +85,10 @@ func New(opts ...configOption) (*App, error) { apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(fmt.Sprintf(":%s", port))) } + if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok { + apmOpts = append(apmOpts, apmproxy.WithSendStrategy(strategy)) + } + apmOpts = append(apmOpts, apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER"))) ac, err := apmproxy.NewClient(apmOpts...) @@ -109,3 +114,14 @@ func getIntFromEnvIfAvailable(name string) (int, error) { } return value, nil } + +func parseStrategy(value string) (apmproxy.SendStrategy, bool) { + switch strings.ToLower(value) { + case "background": + return apmproxy.Background, true + case "syncflush": + return apmproxy.SyncFlush, true + } + + return "", false +} diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index 7c14a331..a0e08d7a 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -88,6 +88,14 @@ func (app *App) Run(ctx context.Context) error { } } + // Flush all data before shutting down. + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + app.apmClient.FlushAPMData(ctx) + }() + // The previous event id is used to validate the received Lambda metrics var prevEvent *extension.NextEventResponse // This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each @@ -115,7 +123,7 @@ func (app *App) Run(ctx context.Context) error { } extension.Log.Debug("Waiting for background data send to end") backgroundDataSendWg.Wait() - if config.SendStrategy == extension.SyncFlush { + if app.apmClient.ShouldFlush() { // Flush APM data now that the function invocation has completed app.apmClient.FlushAPMData(ctx) } @@ -162,8 +170,6 @@ func (app *App) processEvent( } // APM Data Processing - app.apmClient.AgentDoneSignal = make(chan struct{}) - defer close(app.apmClient.AgentDoneSignal) backgroundDataSendWg.Add(1) go func() { defer backgroundDataSendWg.Done() @@ -203,7 +209,7 @@ func (app *App) processEvent( // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. select { - case <-app.apmClient.AgentDoneSignal: + case <-app.apmClient.Done(): extension.Log.Debug("Received agent done signal") case <-runtimeDone: extension.Log.Debug("Received runtimeDone signal") From 0c358354bbe15bbc2420f56584276561c62e6deb Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Thu, 4 Aug 2022 06:58:55 +0200 Subject: [PATCH 02/10] sigh --- apm-lambda-extension/apmproxy/option.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apm-lambda-extension/apmproxy/option.go b/apm-lambda-extension/apmproxy/option.go index 1364eba5..6238f6b3 100644 --- a/apm-lambda-extension/apmproxy/option.go +++ b/apm-lambda-extension/apmproxy/option.go @@ -70,6 +70,8 @@ func WithReceiverAddress(addr string) Option { func WithSendStrategy(strategy SendStrategy) Option { return func(c *Client) { c.sendStrategy = strategy + } +} // WithAgentDataBufferSize sets the agent data buffer size. func WithAgentDataBufferSize(size int) Option { From 617c2b356045eae94fe1ed25f26617b186890411 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Sat, 13 Aug 2022 00:49:44 +0200 Subject: [PATCH 03/10] fix: avoid panic from closing the channel twice on warm starts During multiple invocations the lambda can reuse the environment if a warm start is taking place. We cannot assume a request with 'flushed=true' will be the last one for the lifetime of the application. Replace the channel with a counter that is increased when we receive a request with 'flushed=true' and it is decreased if we meet such request in the buffered data while sending to the APM server. --- apm-lambda-extension/apmproxy/apmserver.go | 31 +++++++++++++------ apm-lambda-extension/apmproxy/client.go | 4 +-- apm-lambda-extension/apmproxy/receiver.go | 20 +++++++----- .../apmproxy/receiver_test.go | 15 ++------- apm-lambda-extension/app/run.go | 7 +++-- 5 files changed, 42 insertions(+), 35 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index 8bcb67e9..298ab481 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -43,6 +43,12 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata c.logger.Debug("Invocation context cancelled, not processing any more agent data") return nil case agentData := <-c.DataChannel: + if agentData.Flushed { + c.flushMutex.Lock() + c.flushCount-- + c.flushMutex.Unlock() + continue + } if metadataContainer.Metadata == nil { metadata, err := ProcessMetadata(agentData) if err != nil { @@ -67,6 +73,12 @@ func (c *Client) FlushAPMData(ctx context.Context) { for { select { case agentData := <-c.DataChannel: + if agentData.Flushed { + c.flushMutex.Lock() + c.flushCount-- + c.flushMutex.Unlock() + continue + } c.logger.Debug("Flush in progress - Processing agent data") if err := c.PostToApmServer(ctx, agentData); err != nil { c.logger.Errorf("Error sending to APM server, skipping: %v", err) @@ -210,26 +222,25 @@ func (c *Client) ComputeGracePeriod() time.Duration { // EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send // to the APM server. -func (c *Client) EnqueueAPMData(agentData AgentData) { +func (c *Client) EnqueueAPMData(agentData AgentData) bool { select { case c.DataChannel <- agentData: c.logger.Debug("Adding agent data to buffer to be sent to apm server") + return true default: c.logger.Warn("Channel full: dropping a subset of agent data") + return false } } // ShouldFlush returns true if the client should flush APM data after processing the event. func (c *Client) ShouldFlush() bool { - select { - case <-c.done: - return true - default: - return c.sendStrategy == SyncFlush - } + return c.sendStrategy == SyncFlush || c.HasPendingFlush() } -// Done returns a channel that's closed when work done on behalf of the agent is completed. -func (c *Client) Done() <-chan struct{} { - return c.done +// HasPendingFlush returns true if the client has received a signal to flush the buffered APM data. +func (c *Client) HasPendingFlush() bool { + c.flushMutex.Lock() + defer c.flushMutex.Unlock() + return c.flushCount != 0 } diff --git a/apm-lambda-extension/apmproxy/client.go b/apm-lambda-extension/apmproxy/client.go index 410e8c40..2611add0 100644 --- a/apm-lambda-extension/apmproxy/client.go +++ b/apm-lambda-extension/apmproxy/client.go @@ -62,8 +62,9 @@ type Client struct { dataForwarderTimeout time.Duration receiver *http.Server sendStrategy SendStrategy - done chan struct{} logger *zap.SugaredLogger + flushCount int + flushMutex sync.Mutex } func NewClient(opts ...Option) (*Client, error) { @@ -85,7 +86,6 @@ func NewClient(opts ...Option) (*Client, error) { MaxHeaderBytes: 1 << 20, }, sendStrategy: SyncFlush, - done: make(chan struct{}), } for _, opt := range opts { diff --git a/apm-lambda-extension/apmproxy/receiver.go b/apm-lambda-extension/apmproxy/receiver.go index 7d07ed9f..e2d94a28 100644 --- a/apm-lambda-extension/apmproxy/receiver.go +++ b/apm-lambda-extension/apmproxy/receiver.go @@ -32,6 +32,7 @@ import ( type AgentData struct { Data []byte ContentEncoding string + Flushed bool } // StartHttpServer starts the server listening for APM agent data. @@ -121,17 +122,20 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ return } - if len(rawBytes) > 0 { - agentData := AgentData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), - } + flushed := r.URL.Query().Get("flushed") == "true" - c.EnqueueAPMData(agentData) + agentData := AgentData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + Flushed: flushed, } - if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { - close(c.done) + enqueued := c.EnqueueAPMData(agentData) + + if enqueued && flushed { + c.flushMutex.Lock() + c.flushCount++ + c.flushMutex.Unlock() } w.WriteHeader(http.StatusAccepted) diff --git a/apm-lambda-extension/apmproxy/receiver_test.go b/apm-lambda-extension/apmproxy/receiver_test.go index 3523f928..094a6843 100644 --- a/apm-lambda-extension/apmproxy/receiver_test.go +++ b/apm-lambda-extension/apmproxy/receiver_test.go @@ -201,13 +201,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { require.NoError(t, err) }() - select { - case <-apmClient.Done(): - <-apmClient.DataChannel - case <-time.After(1 * time.Second): - t.Log("Timed out waiting for server to send FuncDone signal") - t.Fail() - } + require.Eventually(t, apmClient.HasPendingFlush, 1*time.Second, 50*time.Millisecond) } func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { @@ -288,10 +282,5 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { require.NoError(t, err) }() - select { - case <-apmClient.Done(): - case <-time.After(1 * time.Second): - t.Log("Timed out waiting for server to send FuncDone signal") - t.Fail() - } + require.Eventually(t, apmClient.HasPendingFlush, 1*time.Second, 50*time.Millisecond) } diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index ff708f30..c25bdcbd 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -207,9 +207,12 @@ func (app *App) processEvent( // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. + + if app.apmClient.HasPendingFlush() { + return event, nil + } + select { - case <-app.apmClient.Done(): - app.logger.Debug("Received agent done signal") case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: From 82eebf04f12c0f279a56d4a8a0980f5a8232d32a Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Wed, 17 Aug 2022 00:23:53 +0200 Subject: [PATCH 04/10] fix: try to return as soon as possible on flush signal The flush signal is received on a separate goroutine (http handler) so we cannot assume anything on its relationship with the event processed by other goroutines. If we just check once we might miss the signal and hang until the runtimeDone or timeout event is received. To prevent this, create a channel and periodically check the flush counter to minimize latency. --- apm-lambda-extension/apmproxy/apmserver.go | 27 ++++++++++++++++--- .../apmproxy/receiver_test.go | 13 +++++++-- apm-lambda-extension/app/run.go | 6 ++--- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index 298ab481..2279e790 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -235,11 +235,32 @@ func (c *Client) EnqueueAPMData(agentData AgentData) bool { // ShouldFlush returns true if the client should flush APM data after processing the event. func (c *Client) ShouldFlush() bool { - return c.sendStrategy == SyncFlush || c.HasPendingFlush() + return c.sendStrategy == SyncFlush || c.hasPendingFlush() } -// HasPendingFlush returns true if the client has received a signal to flush the buffered APM data. -func (c *Client) HasPendingFlush() bool { +// WaitForFlush returns a channel that is closed if the client has received a signal to flush +// the buffered APM data. +func (c *Client) WaitForFlush(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + go func() { + t := time.NewTimer(10 * time.Millisecond) + defer t.Stop() + for { + select { + case <-ctx.Done(): + case <-t.C: + if c.hasPendingFlush() { + close(ch) + return + } + } + } + }() + + return ch +} + +func (c *Client) hasPendingFlush() bool { c.flushMutex.Lock() defer c.flushMutex.Unlock() return c.flushCount != 0 diff --git a/apm-lambda-extension/apmproxy/receiver_test.go b/apm-lambda-extension/apmproxy/receiver_test.go index 094a6843..0d857f20 100644 --- a/apm-lambda-extension/apmproxy/receiver_test.go +++ b/apm-lambda-extension/apmproxy/receiver_test.go @@ -19,6 +19,7 @@ package apmproxy_test import ( "bytes" + "context" "elastic/apm-lambda-extension/apmproxy" "io" "net" @@ -201,7 +202,11 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { require.NoError(t, err) }() - require.Eventually(t, apmClient.HasPendingFlush, 1*time.Second, 50*time.Millisecond) + select { + case <-apmClient.WaitForFlush(context.Background()): + case <-time.After(1 * time.Second): + t.Fatal("Timed out waiting for server to send flush signal") + } } func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { @@ -282,5 +287,9 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { require.NoError(t, err) }() - require.Eventually(t, apmClient.HasPendingFlush, 1*time.Second, 50*time.Millisecond) + select { + case <-apmClient.WaitForFlush(context.Background()): + case <-time.After(1 * time.Second): + t.Fatal("Timed out waiting for server to send flush signal") + } } diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index c25bdcbd..79f4e38d 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -208,11 +208,9 @@ func (app *App) processEvent( // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. - if app.apmClient.HasPendingFlush() { - return event, nil - } - select { + case <-app.apmClient.WaitForFlush(invocationCtx): + app.logger.Debug("APM client has pending flush signals") case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: From 5e16d0f022b48d528ce90e3696ed230edbe6982d Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Wed, 17 Aug 2022 14:49:54 +0200 Subject: [PATCH 05/10] refactor: remove busy loop and rely on channels to signal flush requests --- apm-lambda-extension/apmproxy/apmserver.go | 55 ++++++++++--------- apm-lambda-extension/apmproxy/client.go | 2 + apm-lambda-extension/apmproxy/receiver.go | 16 ++++++ .../apmproxy/receiver_test.go | 5 +- apm-lambda-extension/app/run.go | 2 +- 5 files changed, 50 insertions(+), 30 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index 2279e790..db96df2e 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -44,10 +44,11 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata return nil case agentData := <-c.DataChannel: if agentData.Flushed { - c.flushMutex.Lock() - c.flushCount-- - c.flushMutex.Unlock() - continue + c.updateFlushCount() + + if len(agentData.Data) == 0 { + continue + } } if metadataContainer.Metadata == nil { metadata, err := ProcessMetadata(agentData) @@ -74,10 +75,11 @@ func (c *Client) FlushAPMData(ctx context.Context) { select { case agentData := <-c.DataChannel: if agentData.Flushed { - c.flushMutex.Lock() - c.flushCount-- - c.flushMutex.Unlock() - continue + c.updateFlushCount() + + if len(agentData.Data) == 0 { + continue + } } c.logger.Debug("Flush in progress - Processing agent data") if err := c.PostToApmServer(ctx, agentData); err != nil { @@ -90,6 +92,21 @@ func (c *Client) FlushAPMData(ctx context.Context) { } } +func (c *Client) updateFlushCount() { + c.flushMutex.Lock() + defer c.flushMutex.Unlock() + + // A flush request is beng forwarded. + // Decrement the counter + c.flushCount-- + + // Reset the flush channel if there are no + // more flush requests. + if c.flushCount == 0 { + c.flushCh = make(chan struct{}) + } +} + // PostToApmServer takes a chunk of APM agent data and posts it to the APM server. // // The function compresses the APM agent data, if it's not already compressed. @@ -240,24 +257,10 @@ func (c *Client) ShouldFlush() bool { // WaitForFlush returns a channel that is closed if the client has received a signal to flush // the buffered APM data. -func (c *Client) WaitForFlush(ctx context.Context) <-chan struct{} { - ch := make(chan struct{}) - go func() { - t := time.NewTimer(10 * time.Millisecond) - defer t.Stop() - for { - select { - case <-ctx.Done(): - case <-t.C: - if c.hasPendingFlush() { - close(ch) - return - } - } - } - }() - - return ch +func (c *Client) WaitForFlush() <-chan struct{} { + c.flushMutex.Lock() + defer c.flushMutex.Unlock() + return c.flushCh } func (c *Client) hasPendingFlush() bool { diff --git a/apm-lambda-extension/apmproxy/client.go b/apm-lambda-extension/apmproxy/client.go index 2611add0..6fdae539 100644 --- a/apm-lambda-extension/apmproxy/client.go +++ b/apm-lambda-extension/apmproxy/client.go @@ -64,6 +64,7 @@ type Client struct { sendStrategy SendStrategy logger *zap.SugaredLogger flushCount int + flushCh chan struct{} flushMutex sync.Mutex } @@ -86,6 +87,7 @@ func NewClient(opts ...Option) (*Client, error) { MaxHeaderBytes: 1 << 20, }, sendStrategy: SyncFlush, + flushCh: make(chan struct{}), } for _, opt := range opts { diff --git a/apm-lambda-extension/apmproxy/receiver.go b/apm-lambda-extension/apmproxy/receiver.go index e2d94a28..61e01278 100644 --- a/apm-lambda-extension/apmproxy/receiver.go +++ b/apm-lambda-extension/apmproxy/receiver.go @@ -134,7 +134,23 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ if enqueued && flushed { c.flushMutex.Lock() + + // increment flush request count c.flushCount++ + + select { + case <-c.flushCh: + // the channel is closed. + // the extension received at least a flush request already but the + // data have not been flushed yet. + // We can reuse the closed channel. + default: + // no pending flush requests + // close the channel to signal a flush request has + // been received. + close(c.flushCh) + } + c.flushMutex.Unlock() } diff --git a/apm-lambda-extension/apmproxy/receiver_test.go b/apm-lambda-extension/apmproxy/receiver_test.go index 0d857f20..f4e681ac 100644 --- a/apm-lambda-extension/apmproxy/receiver_test.go +++ b/apm-lambda-extension/apmproxy/receiver_test.go @@ -19,7 +19,6 @@ package apmproxy_test import ( "bytes" - "context" "elastic/apm-lambda-extension/apmproxy" "io" "net" @@ -203,7 +202,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { }() select { - case <-apmClient.WaitForFlush(context.Background()): + case <-apmClient.WaitForFlush(): case <-time.After(1 * time.Second): t.Fatal("Timed out waiting for server to send flush signal") } @@ -288,7 +287,7 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { }() select { - case <-apmClient.WaitForFlush(context.Background()): + case <-apmClient.WaitForFlush(): case <-time.After(1 * time.Second): t.Fatal("Timed out waiting for server to send flush signal") } diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index 79f4e38d..7dc390c0 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -209,7 +209,7 @@ func (app *App) processEvent( // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. select { - case <-app.apmClient.WaitForFlush(invocationCtx): + case <-app.apmClient.WaitForFlush(): app.logger.Debug("APM client has pending flush signals") case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") From 5e1f58bdd80a540a9bf33b2c452302756099f383 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Thu, 18 Aug 2022 09:09:45 +0200 Subject: [PATCH 06/10] fix: update behaviour based on flush strategy only synchronously flush on sendstrategy == syncflush. Do not flush just because there are unhandled flushed=true requests. --- apm-lambda-extension/apmproxy/apmserver.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index db96df2e..ecc6b73c 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -252,7 +252,7 @@ func (c *Client) EnqueueAPMData(agentData AgentData) bool { // ShouldFlush returns true if the client should flush APM data after processing the event. func (c *Client) ShouldFlush() bool { - return c.sendStrategy == SyncFlush || c.hasPendingFlush() + return c.sendStrategy == SyncFlush } // WaitForFlush returns a channel that is closed if the client has received a signal to flush @@ -262,9 +262,3 @@ func (c *Client) WaitForFlush() <-chan struct{} { defer c.flushMutex.Unlock() return c.flushCh } - -func (c *Client) hasPendingFlush() bool { - c.flushMutex.Lock() - defer c.flushMutex.Unlock() - return c.flushCount != 0 -} From acbb700ecee9e41332df341e3c466d2bb7db78b8 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Sat, 20 Aug 2022 23:49:29 +0200 Subject: [PATCH 07/10] fix: update flush logic and remove flush count Go back to a less disruptive change. Remove flush count, don't keep track of multiple flushed requests but reset the channel before processing the event. --- apm-lambda-extension/apmproxy/apmserver.go | 44 ++++++++-------------- apm-lambda-extension/apmproxy/client.go | 6 +-- apm-lambda-extension/apmproxy/receiver.go | 5 --- apm-lambda-extension/app/run.go | 1 + 4 files changed, 19 insertions(+), 37 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index ecc6b73c..39ee0aa8 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -43,12 +43,8 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata c.logger.Debug("Invocation context cancelled, not processing any more agent data") return nil case agentData := <-c.DataChannel: - if agentData.Flushed { - c.updateFlushCount() - - if len(agentData.Data) == 0 { - continue - } + if len(agentData.Data) == 0 { + continue } if metadataContainer.Metadata == nil { metadata, err := ProcessMetadata(agentData) @@ -74,12 +70,8 @@ func (c *Client) FlushAPMData(ctx context.Context) { for { select { case agentData := <-c.DataChannel: - if agentData.Flushed { - c.updateFlushCount() - - if len(agentData.Data) == 0 { - continue - } + if len(agentData.Data) == 0 { + continue } c.logger.Debug("Flush in progress - Processing agent data") if err := c.PostToApmServer(ctx, agentData); err != nil { @@ -92,21 +84,6 @@ func (c *Client) FlushAPMData(ctx context.Context) { } } -func (c *Client) updateFlushCount() { - c.flushMutex.Lock() - defer c.flushMutex.Unlock() - - // A flush request is beng forwarded. - // Decrement the counter - c.flushCount-- - - // Reset the flush channel if there are no - // more flush requests. - if c.flushCount == 0 { - c.flushCh = make(chan struct{}) - } -} - // PostToApmServer takes a chunk of APM agent data and posts it to the APM server. // // The function compresses the APM agent data, if it's not already compressed. @@ -255,8 +232,17 @@ func (c *Client) ShouldFlush() bool { return c.sendStrategy == SyncFlush } -// WaitForFlush returns a channel that is closed if the client has received a signal to flush -// the buffered APM data. +// ResetFlush resets the client's "agent flushed" state, such that +// subsequent calls to WaitForFlush will block until another request +// is received from the agent indicating it has flushed. +func (c *Client) ResetFlush() { + c.flushMutex.Lock() + defer c.flushMutex.Unlock() + c.flushCh = make(chan struct{}) +} + +// WaitForFlush returns a channel that is closed when the agent has signalled that +// the Lambda invocation has completed, and there is no more APM data coming. func (c *Client) WaitForFlush() <-chan struct{} { c.flushMutex.Lock() defer c.flushMutex.Unlock() diff --git a/apm-lambda-extension/apmproxy/client.go b/apm-lambda-extension/apmproxy/client.go index 6fdae539..3f161b52 100644 --- a/apm-lambda-extension/apmproxy/client.go +++ b/apm-lambda-extension/apmproxy/client.go @@ -63,9 +63,9 @@ type Client struct { receiver *http.Server sendStrategy SendStrategy logger *zap.SugaredLogger - flushCount int - flushCh chan struct{} - flushMutex sync.Mutex + + flushMutex sync.Mutex + flushCh chan struct{} } func NewClient(opts ...Option) (*Client, error) { diff --git a/apm-lambda-extension/apmproxy/receiver.go b/apm-lambda-extension/apmproxy/receiver.go index 61e01278..407ea42a 100644 --- a/apm-lambda-extension/apmproxy/receiver.go +++ b/apm-lambda-extension/apmproxy/receiver.go @@ -32,7 +32,6 @@ import ( type AgentData struct { Data []byte ContentEncoding string - Flushed bool } // StartHttpServer starts the server listening for APM agent data. @@ -127,7 +126,6 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ agentData := AgentData{ Data: rawBytes, ContentEncoding: r.Header.Get("Content-Encoding"), - Flushed: flushed, } enqueued := c.EnqueueAPMData(agentData) @@ -135,9 +133,6 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ if enqueued && flushed { c.flushMutex.Lock() - // increment flush request count - c.flushCount++ - select { case <-c.flushCh: // the channel is closed. diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index 7dc390c0..9175de9f 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -111,6 +111,7 @@ func (app *App) Run(ctx context.Context) error { // Use a wait group to ensure the background go routine sending to the APM server // completes before signaling that the extension is ready for the next invocation. var backgroundDataSendWg sync.WaitGroup + app.apmClient.ResetFlush() event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent, &metadataContainer) if err != nil { return err From 987d08a9fe0fe12fddf948af7f70b16150762df6 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Sun, 21 Aug 2022 21:23:27 +0200 Subject: [PATCH 08/10] refactor: move flush reset inside process event --- apm-lambda-extension/app/run.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index 9175de9f..281d6a4b 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -111,7 +111,6 @@ func (app *App) Run(ctx context.Context) error { // Use a wait group to ensure the background go routine sending to the APM server // completes before signaling that the extension is ready for the next invocation. var backgroundDataSendWg sync.WaitGroup - app.apmClient.ResetFlush() event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent, &metadataContainer) if err != nil { return err @@ -212,6 +211,9 @@ func (app *App) processEvent( select { case <-app.apmClient.WaitForFlush(): app.logger.Debug("APM client has pending flush signals") + + // Reset flush state for future events. + app.apmClient.ResetFlush() case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: From e096ec752bea092fe185e0550e188e83ddc666e6 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Mon, 22 Aug 2022 03:32:13 +0200 Subject: [PATCH 09/10] refactor: revert queue changes Co-authored-by: Andrew Wilkins --- apm-lambda-extension/apmproxy/apmserver.go | 10 +--------- apm-lambda-extension/apmproxy/receiver.go | 8 +++++--- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/apm-lambda-extension/apmproxy/apmserver.go b/apm-lambda-extension/apmproxy/apmserver.go index 39ee0aa8..08a06ec4 100644 --- a/apm-lambda-extension/apmproxy/apmserver.go +++ b/apm-lambda-extension/apmproxy/apmserver.go @@ -43,9 +43,6 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata c.logger.Debug("Invocation context cancelled, not processing any more agent data") return nil case agentData := <-c.DataChannel: - if len(agentData.Data) == 0 { - continue - } if metadataContainer.Metadata == nil { metadata, err := ProcessMetadata(agentData) if err != nil { @@ -70,9 +67,6 @@ func (c *Client) FlushAPMData(ctx context.Context) { for { select { case agentData := <-c.DataChannel: - if len(agentData.Data) == 0 { - continue - } c.logger.Debug("Flush in progress - Processing agent data") if err := c.PostToApmServer(ctx, agentData); err != nil { c.logger.Errorf("Error sending to APM server, skipping: %v", err) @@ -216,14 +210,12 @@ func (c *Client) ComputeGracePeriod() time.Duration { // EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send // to the APM server. -func (c *Client) EnqueueAPMData(agentData AgentData) bool { +func (c *Client) EnqueueAPMData(agentData AgentData) { select { case c.DataChannel <- agentData: c.logger.Debug("Adding agent data to buffer to be sent to apm server") - return true default: c.logger.Warn("Channel full: dropping a subset of agent data") - return false } } diff --git a/apm-lambda-extension/apmproxy/receiver.go b/apm-lambda-extension/apmproxy/receiver.go index 407ea42a..41c6d6c6 100644 --- a/apm-lambda-extension/apmproxy/receiver.go +++ b/apm-lambda-extension/apmproxy/receiver.go @@ -121,16 +121,18 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ return } - flushed := r.URL.Query().Get("flushed") == "true" + agentFlushed := r.URL.Query().Get("flushed") == "true" agentData := AgentData{ Data: rawBytes, ContentEncoding: r.Header.Get("Content-Encoding"), } - enqueued := c.EnqueueAPMData(agentData) + if len(agentData.Data) != 0 { + c.EnqueueAPMData(agentData) + } - if enqueued && flushed { + if agentFlushed { c.flushMutex.Lock() select { From 76974939c82b86397315f75867cd5b9de1b54bdc Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Mon, 22 Aug 2022 03:38:52 +0200 Subject: [PATCH 10/10] fix: move reset flush to a defer call --- apm-lambda-extension/app/run.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go index 281d6a4b..85766f5c 100644 --- a/apm-lambda-extension/app/run.go +++ b/apm-lambda-extension/app/run.go @@ -137,6 +137,8 @@ func (app *App) processEvent( prevEvent *extension.NextEventResponse, metadataContainer *apmproxy.MetadataContainer, ) (*extension.NextEventResponse, error) { + // Reset flush state for future events. + defer app.apmClient.ResetFlush() // Invocation context invocationCtx, invocationCancel := context.WithCancel(ctx) @@ -211,9 +213,6 @@ func (app *App) processEvent( select { case <-app.apmClient.WaitForFlush(): app.logger.Debug("APM client has pending flush signals") - - // Reset flush state for future events. - app.apmClient.ResetFlush() case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: