-
Notifications
You must be signed in to change notification settings - Fork 45
fix: refactor coder logger to allow flush without deadlock #375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
eb45adc
cf30d8b
1a33ece
e0f0caf
5243969
2235879
89183b3
e5e74c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import ( | |
"fmt" | ||
"net/url" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"cdr.dev/slog" | ||
|
@@ -27,13 +28,14 @@ var ( | |
minAgentAPIV2 = "v2.9" | ||
) | ||
|
||
// Coder establishes a connection to the Coder instance located at | ||
// coderURL and authenticates using token. It then establishes a | ||
// dRPC connection to the Agent API and begins sending logs. | ||
// If the version of Coder does not support the Agent API, it will | ||
// fall back to using the PatchLogs endpoint. | ||
// The returned function is used to block until all logs are sent. | ||
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) { | ||
// Coder establishes a connection to the Coder instance located at coderURL and | ||
// authenticates using token. It then establishes a dRPC connection to the Agent | ||
// API and begins sending logs. If the version of Coder does not support the | ||
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is | ||
// used to close the logger and to wait at most logSendGracePeriod for logs to | ||
// be sent. Cancelling the context will close the logs immediately without | ||
// waiting for logs to be sent. | ||
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) { | ||
// To troubleshoot issues, we need some way of logging. | ||
metaLogger := slog.Make(sloghuman.Sink(os.Stderr)) | ||
defer metaLogger.Sync() | ||
|
@@ -44,18 +46,20 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), | |
} | ||
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 { | ||
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version)) | ||
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1")) | ||
return sendLogs, flushLogs, nil | ||
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1")) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice 👍 I found those previous names hard to reason about. |
||
return logger, closer, nil | ||
} | ||
// Note that ctx passed to initRPC will be inherited by the | ||
// underlying connection, nothing we can do about that here. | ||
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc")) | ||
if err != nil { | ||
// Logged externally | ||
return nil, nil, fmt.Errorf("init coder rpc client: %w", err) | ||
} | ||
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender")) | ||
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version)) | ||
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2")) | ||
return sendLogs, doneFunc, nil | ||
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2")) | ||
return logger, closer, nil | ||
} | ||
|
||
type coderLogSender interface { | ||
|
@@ -74,13 +78,14 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client { | |
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) { | ||
var c proto.DRPCAgentClient20 | ||
var err error | ||
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout) | ||
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout) | ||
defer retryCancel() | ||
attempts := 0 | ||
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); { | ||
attempts++ | ||
// Maximize compatibility. | ||
c, err = client.ConnectRPC20(ctx) | ||
l.Info(ctx, "Connecting to Coder", slog.F("attempt", attempts), slog.F("error", err)) | ||
mafredri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
l.Debug(ctx, "Failed to connect to Coder", slog.F("error", err), slog.F("attempt", attempts)) | ||
continue | ||
|
@@ -95,65 +100,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto | |
|
||
// sendLogsV1 uses the PatchLogs endpoint to send logs. | ||
// This is deprecated, but required for backward compatibility with older versions of Coder. | ||
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) { | ||
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) { | ||
// nolint: staticcheck // required for backwards compatibility | ||
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{}) | ||
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{}) | ||
var mu sync.Mutex | ||
return func(lvl Level, msg string, args ...any) { | ||
log := agentsdk.Log{ | ||
CreatedAt: time.Now(), | ||
Output: fmt.Sprintf(msg, args...), | ||
Level: codersdk.LogLevel(lvl), | ||
} | ||
if err := sendLogs(ctx, log); err != nil { | ||
mu.Lock() | ||
defer mu.Unlock() | ||
if err := sendLog(ctx, log); err != nil { | ||
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err)) | ||
} | ||
}, func() { | ||
if err := flushLogs(ctx); err != nil { | ||
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod) | ||
defer cancel() | ||
if err := flushAndClose(ctx); err != nil { | ||
l.Warn(ctx, "failed to flush logs", slog.Error(err)) | ||
} | ||
} | ||
} | ||
|
||
// sendLogsV2 uses the v2 agent API to send logs. Only compatible with coder versions >= 2.9. | ||
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) { | ||
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) { | ||
sendCtx, sendCancel := context.WithCancel(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nope, we want to keep this context alive until we've flushed or hit the timeout in the […] |
||
done := make(chan struct{}) | ||
uid := uuid.New() | ||
go func() { | ||
defer close(done) | ||
if err := ls.SendLoop(ctx, dest); err != nil { | ||
if err := ls.SendLoop(sendCtx, dest); err != nil { | ||
if !errors.Is(err, context.Canceled) { | ||
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err)) | ||
} | ||
} | ||
|
||
// Wait for up to 10 seconds for logs to finish sending. | ||
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod) | ||
defer sendCancel() | ||
// Try once more to send any pending logs | ||
if err := ls.SendLoop(sendCtx, dest); err != nil { | ||
if !errors.Is(err, context.DeadlineExceeded) { | ||
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err)) | ||
} | ||
} | ||
ls.Flush(uid) | ||
if err := ls.WaitUntilEmpty(sendCtx); err != nil { | ||
if !errors.Is(err, context.DeadlineExceeded) { | ||
l.Warn(ctx, "log sender did not empty", slog.Error(err)) | ||
} | ||
} | ||
}() | ||
|
||
logFunc := func(l Level, msg string, args ...any) { | ||
ls.Enqueue(uid, agentsdk.Log{ | ||
CreatedAt: time.Now(), | ||
Output: fmt.Sprintf(msg, args...), | ||
Level: codersdk.LogLevel(l), | ||
}) | ||
} | ||
var closeOnce sync.Once | ||
return func(l Level, msg string, args ...any) { | ||
ls.Enqueue(uid, agentsdk.Log{ | ||
CreatedAt: time.Now(), | ||
Output: fmt.Sprintf(msg, args...), | ||
Level: codersdk.LogLevel(l), | ||
}) | ||
}, func() { | ||
closeOnce.Do(func() { | ||
// Trigger a flush and wait for logs to be sent. | ||
ls.Flush(uid) | ||
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: should we merge […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's doable, but I feel their purpose is clearer as-is. We could actually make sendCtx not inherit function ctx instead but it's also pointless because […] |
||
defer cancel() | ||
err := ls.WaitUntilEmpty(ctx) | ||
if err != nil { | ||
l.Warn(ctx, "log sender did not empty", slog.Error(err)) | ||
} | ||
|
||
doneFunc := func() { | ||
<-done | ||
} | ||
// Stop the send loop. | ||
sendCancel() | ||
}) | ||
|
||
return logFunc, doneFunc | ||
// Wait for the send loop to finish. | ||
<-done | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.