Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion apm-lambda-extension/apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata
c.logger.Debug("Invocation context cancelled, not processing any more agent data")
return nil
case agentData := <-c.DataChannel:
if agentData.Flushed {
c.flushMutex.Lock()
c.flushCount--
c.flushMutex.Unlock()
continue
}
if metadataContainer.Metadata == nil {
metadata, err := ProcessMetadata(agentData)
if err != nil {
Expand All @@ -67,6 +73,12 @@ func (c *Client) FlushAPMData(ctx context.Context) {
for {
select {
case agentData := <-c.DataChannel:
if agentData.Flushed {
c.flushMutex.Lock()
c.flushCount--
c.flushMutex.Unlock()
continue
}
c.logger.Debug("Flush in progress - Processing agent data")
if err := c.PostToApmServer(ctx, agentData); err != nil {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
Expand Down Expand Up @@ -210,11 +222,25 @@ func (c *Client) ComputeGracePeriod() time.Duration {

// EnqueueAPMData adds an AgentData struct to the agent data channel, effectively queueing it for a send
// to the APM server.
func (c *Client) EnqueueAPMData(agentData AgentData) {
func (c *Client) EnqueueAPMData(agentData AgentData) bool {
	// Non-blocking send: when the channel cannot accept the value right now,
	// the data is dropped and the caller is told so through the false result.
	select {
	case c.DataChannel <- agentData:
	default:
		c.logger.Warn("Channel full: dropping a subset of agent data")
		return false
	}

	// Reaching this point means the send case was taken.
	c.logger.Debug("Adding agent data to buffer to be sent to apm server")
	return true
}

// ShouldFlush reports whether buffered APM data should be flushed once the
// current event has been processed. That is always the case under the
// SyncFlush send strategy; under any other strategy it is the case only
// while a flush is pending.
func (c *Client) ShouldFlush() bool {
	if c.sendStrategy == SyncFlush {
		return true
	}

	return c.HasPendingFlush()
}

// HasPendingFlush reports whether the flush counter is non-zero, meaning the
// client has been signalled to flush the buffered APM data. The counter is
// read while holding flushMutex.
func (c *Client) HasPendingFlush() bool {
	c.flushMutex.Lock()
	pending := c.flushCount
	c.flushMutex.Unlock()

	return pending != 0
}
17 changes: 16 additions & 1 deletion apm-lambda-extension/apmproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,19 @@ import (
"go.uber.org/zap"
)

// SendStrategy represents the type of sending strategy the extension uses.
// The strategies declared in this package are Background and SyncFlush.
type SendStrategy string

const (
// Background send strategy allows the extension to send remaining buffered
// agent data on the next function invocation
Background SendStrategy = "background"

// SyncFlush send strategy indicates that the extension will synchronously
// flush remaining buffered agent data when it receives a signal that the
// function is complete
SyncFlush SendStrategy = "syncflush"

defaultDataReceiverTimeout time.Duration = 15 * time.Second
defaultDataForwarderTimeout time.Duration = 3 * time.Second
defaultReceiverAddr = ":8200"
Expand All @@ -40,7 +52,6 @@ const (
type Client struct {
mu sync.Mutex
bufferPool sync.Pool
AgentDoneSignal chan struct{}
DataChannel chan AgentData
client *http.Client
Status Status
Expand All @@ -50,7 +61,10 @@ type Client struct {
serverURL string
dataForwarderTimeout time.Duration
receiver *http.Server
sendStrategy SendStrategy
logger *zap.SugaredLogger
flushCount int
flushMutex sync.Mutex
}

func NewClient(opts ...Option) (*Client, error) {
Expand All @@ -71,6 +85,7 @@ func NewClient(opts ...Option) (*Client, error) {
WriteTimeout: defaultDataReceiverTimeout,
MaxHeaderBytes: 1 << 20,
},
sendStrategy: SyncFlush,
}

for _, opt := range opts {
Expand Down
7 changes: 7 additions & 0 deletions apm-lambda-extension/apmproxy/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func WithReceiverAddress(addr string) Option {
}
}

// WithSendStrategy returns an Option that stores the given strategy in the
// client's sendStrategy field when the option is applied.
func WithSendStrategy(strategy SendStrategy) Option {
	apply := func(client *Client) {
		client.sendStrategy = strategy
	}

	return apply
}

// WithAgentDataBufferSize sets the agent data buffer size.
func WithAgentDataBufferSize(size int) Option {
return func(c *Client) {
Expand Down
20 changes: 12 additions & 8 deletions apm-lambda-extension/apmproxy/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
type AgentData struct {
// Data holds the raw payload bytes received from the agent.
Data []byte
// ContentEncoding is the value of the request's "Content-Encoding" header.
ContentEncoding string
// Flushed is true when the request carried the query parameter
// flushed=true. Consumers of the data channel treat such an item as a
// flush marker: they decrement the client's flush counter and skip it.
Flushed bool
}

// StartHttpServer starts the server listening for APM agent data.
Expand Down Expand Up @@ -121,17 +122,20 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ
return
}

if len(rawBytes) > 0 {
agentData := AgentData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
}
flushed := r.URL.Query().Get("flushed") == "true"

c.EnqueueAPMData(agentData)
agentData := AgentData{
Data: rawBytes,
ContentEncoding: r.Header.Get("Content-Encoding"),
Flushed: flushed,
}

if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" {
c.AgentDoneSignal <- struct{}{}
enqueued := c.EnqueueAPMData(agentData)

if enqueued && flushed {
c.flushMutex.Lock()
c.flushCount++
c.flushMutex.Unlock()
}

w.WriteHeader(http.StatusAccepted)
Expand Down
18 changes: 2 additions & 16 deletions apm-lambda-extension/apmproxy/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.AgentDoneSignal = make(chan struct{}, 1)
require.NoError(t, apmClient.StartReceiver())
defer func() {
require.NoError(t, apmClient.Shutdown())
Expand All @@ -202,13 +201,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
require.NoError(t, err)
}()

select {
case <-apmClient.AgentDoneSignal:
<-apmClient.DataChannel
case <-time.After(1 * time.Second):
t.Log("Timed out waiting for server to send FuncDone signal")
t.Fail()
}
require.Eventually(t, apmClient.HasPendingFlush, 1*time.Second, 50*time.Millisecond)
}

func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
Expand All @@ -227,7 +220,6 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.AgentDoneSignal = make(chan struct{}, 1)
require.NoError(t, apmClient.StartReceiver())
defer func() {
require.NoError(t, apmClient.Shutdown())
Expand Down Expand Up @@ -271,7 +263,6 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.AgentDoneSignal = make(chan struct{}, 1)
require.NoError(t, apmClient.StartReceiver())
defer func() {
require.NoError(t, apmClient.Shutdown())
Expand All @@ -291,10 +282,5 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
require.NoError(t, err)
}()

select {
case <-apmClient.AgentDoneSignal:
case <-time.After(1 * time.Second):
t.Log("Timed out waiting for server to send FuncDone signal")
t.Fail()
}
require.Eventually(t, apmClient.HasPendingFlush, 1*time.Second, 50*time.Millisecond)
}
16 changes: 16 additions & 0 deletions apm-lambda-extension/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"go.elastic.co/ecszap"
Expand Down Expand Up @@ -97,6 +98,10 @@ func New(opts ...configOption) (*App, error) {
apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(fmt.Sprintf(":%s", port)))
}

if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok {
apmOpts = append(apmOpts, apmproxy.WithSendStrategy(strategy))
}

if bufferSize := os.Getenv("ELASTIC_APM_LAMBDA_AGENT_DATA_BUFFER_SIZE"); bufferSize != "" {
size, err := strconv.Atoi(bufferSize)
if err != nil {
Expand Down Expand Up @@ -132,6 +137,17 @@ func getIntFromEnvIfAvailable(name string) (int, error) {
return value, nil
}

// parseStrategy maps a strategy name, compared case-insensitively, to the
// matching apmproxy.SendStrategy. The boolean result is false, and the
// returned strategy empty, when value is not a recognised name (which
// includes the empty string).
func parseStrategy(value string) (apmproxy.SendStrategy, bool) {
	name := strings.ToLower(value)

	if name == "background" {
		return apmproxy.Background, true
	}
	if name == "syncflush" {
		return apmproxy.SyncFlush, true
	}

	return "", false
}

func buildLogger(level string) (*zap.SugaredLogger, error) {
if level == "" {
level = "info"
Expand Down
19 changes: 14 additions & 5 deletions apm-lambda-extension/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ func (app *App) Run(ctx context.Context) error {
}
}

// Flush all data before shutting down.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

app.apmClient.FlushAPMData(ctx)
}()

// The previous event id is used to validate the received Lambda metrics
var prevEvent *extension.NextEventResponse
// This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each
Expand Down Expand Up @@ -114,7 +122,7 @@ func (app *App) Run(ctx context.Context) error {
}
app.logger.Debug("Waiting for background data send to end")
backgroundDataSendWg.Wait()
if config.SendStrategy == extension.SyncFlush {
if app.apmClient.ShouldFlush() {
// Flush APM data now that the function invocation has completed
app.apmClient.FlushAPMData(ctx)
}
Expand Down Expand Up @@ -161,8 +169,6 @@ func (app *App) processEvent(
}

// APM Data Processing
app.apmClient.AgentDoneSignal = make(chan struct{})
defer close(app.apmClient.AgentDoneSignal)
backgroundDataSendWg.Add(1)
go func() {
defer backgroundDataSendWg.Done()
Expand Down Expand Up @@ -201,9 +207,12 @@ func (app *App) processEvent(
// 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function
// 3) [Backup 2] If all else fails, the extension relies on the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline.
// This time interval is large enough for a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down.

if app.apmClient.HasPendingFlush() {
return event, nil
}

select {
case <-app.apmClient.AgentDoneSignal:
app.logger.Debug("Received agent done signal")
case <-runtimeDone:
app.logger.Debug("Received runtimeDone signal")
case <-timer.C:
Expand Down