Skip to content

Commit e9f46cc

Browse files
committedOct 2, 2024
revert: "fix: refactor coder logger to allow flush without deadlock (#375)" (#376)
(cherry picked from commit a1e8f3c)
·
v1.0.6v1.0.0
1 parent b33c64c commit e9f46cc

File tree

6 files changed

+86
-316
lines changed

6 files changed

+86
-316
lines changed
 

‎cmd/envbuilder/main.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,6 @@ func envbuilderCmd() serpent.Command {
3737
Options: o.CLI(),
3838
Handler: func(inv *serpent.Invocation) error {
3939
o.SetDefaults()
40-
var preExecs []func()
41-
preExec := func() {
42-
for _, fn := range preExecs {
43-
fn()
44-
}
45-
preExecs = nil
46-
}
47-
defer preExec() // Ensure cleanup in case of error.
48-
4940
o.Logger = log.New(os.Stderr, o.Verbose)
5041
if o.CoderAgentURL != "" {
5142
if o.CoderAgentToken == "" {
@@ -58,10 +49,7 @@ func envbuilderCmd() serpent.Command {
5849
coderLog, closeLogs, err := log.Coder(inv.Context(), u, o.CoderAgentToken)
5950
if err == nil {
6051
o.Logger = log.Wrap(o.Logger, coderLog)
61-
preExecs = append(preExecs, func() {
62-
o.Logger(log.LevelInfo, "Closing logs")
63-
closeLogs()
64-
})
52+
defer closeLogs()
6553
// This adds the envbuilder subsystem.
6654
// If telemetry is enabled in a Coder deployment,
6755
// this will be reported and help us understand
@@ -90,7 +78,7 @@ func envbuilderCmd() serpent.Command {
9078
return nil
9179
}
9280

93-
err := envbuilder.Run(inv.Context(), o, preExec)
81+
err := envbuilder.Run(inv.Context(), o)
9482
if err != nil {
9583
o.Logger(log.LevelError, "error: %s", err)
9684
}

‎envbuilder.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ type execArgsInfo struct {
8484
// Logger is the logf to use for all operations.
8585
// Filesystem is the filesystem to use for all operations.
8686
// Defaults to the host filesystem.
87-
// preExec are any functions that should be called before exec'ing the init
88-
// command. This is useful for ensuring that defers get run.
89-
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
87+
func Run(ctx context.Context, opts options.Options) error {
9088
var args execArgsInfo
9189
// Run in a separate function to ensure all defers run before we
9290
// setuid or exec.
@@ -105,9 +103,6 @@ func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
105103
}
106104

107105
opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
108-
for _, fn := range preExec {
109-
fn()
110-
}
111106

112107
err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
113108
if err != nil {

‎go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ require (
2525
github.com/gliderlabs/ssh v0.3.7
2626
github.com/go-git/go-billy/v5 v5.5.0
2727
github.com/go-git/go-git/v5 v5.12.0
28-
github.com/google/go-cmp v0.6.0
2928
github.com/google/go-containerregistry v0.20.1
3029
github.com/google/uuid v1.6.0
3130
github.com/hashicorp/go-multierror v1.1.1
@@ -150,6 +149,7 @@ require (
150149
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
151150
github.com/golang/protobuf v1.5.4 // indirect
152151
github.com/google/btree v1.1.2 // indirect
152+
github.com/google/go-cmp v0.6.0 // indirect
153153
github.com/google/nftables v0.2.0 // indirect
154154
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
155155
github.com/gorilla/handlers v1.5.1 // indirect

‎integration/integration_test.go

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"testing"
2424
"time"
2525

26-
"github.com/coder/coder/v2/codersdk"
27-
"github.com/coder/coder/v2/codersdk/agentsdk"
2826
"github.com/coder/envbuilder"
2927
"github.com/coder/envbuilder/devcontainer/features"
3028
"github.com/coder/envbuilder/internal/magicdir"
@@ -60,71 +58,6 @@ const (
6058
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
6159
)
6260

63-
func TestLogs(t *testing.T) {
64-
t.Parallel()
65-
66-
token := uuid.NewString()
67-
logsDone := make(chan struct{})
68-
69-
logHandler := func(w http.ResponseWriter, r *http.Request) {
70-
switch r.URL.Path {
71-
case "/api/v2/buildinfo":
72-
w.Header().Set("Content-Type", "application/json")
73-
_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
74-
return
75-
case "/api/v2/workspaceagents/me/logs":
76-
w.WriteHeader(http.StatusOK)
77-
tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
78-
assert.Equal(t, token, tokHdr)
79-
var req agentsdk.PatchLogs
80-
err := json.NewDecoder(r.Body).Decode(&req)
81-
if err != nil {
82-
http.Error(w, err.Error(), http.StatusBadRequest)
83-
return
84-
}
85-
for _, log := range req.Logs {
86-
t.Logf("got log: %+v", log)
87-
if strings.Contains(log.Output, "Closing logs") {
88-
close(logsDone)
89-
return
90-
}
91-
}
92-
return
93-
default:
94-
t.Errorf("unexpected request to %s", r.URL.Path)
95-
w.WriteHeader(http.StatusNotFound)
96-
return
97-
}
98-
}
99-
logSrv := httptest.NewServer(http.HandlerFunc(logHandler))
100-
defer logSrv.Close()
101-
102-
// Ensures that a Git repository with a devcontainer.json is cloned and built.
103-
srv := gittest.CreateGitServer(t, gittest.Options{
104-
Files: map[string]string{
105-
"devcontainer.json": `{
106-
"build": {
107-
"dockerfile": "Dockerfile"
108-
},
109-
}`,
110-
"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
111-
},
112-
})
113-
_, err := runEnvbuilder(t, runOpts{env: []string{
114-
envbuilderEnv("GIT_URL", srv.URL),
115-
"CODER_AGENT_URL=" + logSrv.URL,
116-
"CODER_AGENT_TOKEN=" + token,
117-
}})
118-
require.NoError(t, err)
119-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
120-
defer cancel()
121-
select {
122-
case <-ctx.Done():
123-
t.Fatal("timed out waiting for logs")
124-
case <-logsDone:
125-
}
126-
}
127-
12861
func TestInitScriptInitCommand(t *testing.T) {
12962
t.Parallel()
13063

‎log/coder.go

Lines changed: 45 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"net/url"
88
"os"
9-
"sync"
109
"time"
1110

1211
"cdr.dev/slog"
@@ -28,14 +27,13 @@ var (
2827
minAgentAPIV2 = "v2.9"
2928
)
3029

31-
// Coder establishes a connection to the Coder instance located at coderURL and
32-
// authenticates using token. It then establishes a dRPC connection to the Agent
33-
// API and begins sending logs. If the version of Coder does not support the
34-
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
35-
// used to close the logger and to wait at most logSendGracePeriod for logs to
36-
// be sent. Cancelling the context will close the logs immediately without
37-
// waiting for logs to be sent.
38-
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
30+
// Coder establishes a connection to the Coder instance located at
31+
// coderURL and authenticates using token. It then establishes a
32+
// dRPC connection to the Agent API and begins sending logs.
33+
// If the version of Coder does not support the Agent API, it will
34+
// fall back to using the PatchLogs endpoint.
35+
// The returned function is used to block until all logs are sent.
36+
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
3937
// To troubleshoot issues, we need some way of logging.
4038
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
4139
defer metaLogger.Sync()
@@ -46,26 +44,18 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, c
4644
}
4745
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
4846
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
49-
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
50-
return logger, closer, nil
47+
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
48+
return sendLogs, flushLogs, nil
5149
}
52-
// Note that ctx passed to initRPC will be inherited by the
53-
// underlying connection, nothing we can do about that here.
5450
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
5551
if err != nil {
5652
// Logged externally
5753
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
5854
}
5955
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
6056
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
61-
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
62-
var closeOnce sync.Once
63-
return logger, func() {
64-
closer()
65-
closeOnce.Do(func() {
66-
_ = dac.DRPCConn().Close()
67-
})
68-
}, nil
57+
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
58+
return sendLogs, doneFunc, nil
6959
}
7060

7161
type coderLogSender interface {
@@ -84,7 +74,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
8474
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
8575
var c proto.DRPCAgentClient20
8676
var err error
87-
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
77+
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
8878
defer retryCancel()
8979
attempts := 0
9080
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
@@ -105,67 +95,65 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
10595

10696
// sendLogsV1 uses the PatchLogs endpoint to send logs.
10797
// This is deprecated, but required for backward compatibility with older versions of Coder.
108-
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
98+
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
10999
// nolint: staticcheck // required for backwards compatibility
110-
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
111-
var mu sync.Mutex
100+
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
112101
return func(lvl Level, msg string, args ...any) {
113102
log := agentsdk.Log{
114103
CreatedAt: time.Now(),
115104
Output: fmt.Sprintf(msg, args...),
116105
Level: codersdk.LogLevel(lvl),
117106
}
118-
mu.Lock()
119-
defer mu.Unlock()
120-
if err := sendLog(ctx, log); err != nil {
107+
if err := sendLogs(ctx, log); err != nil {
121108
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
122109
}
123110
}, func() {
124-
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
125-
defer cancel()
126-
if err := flushAndClose(ctx); err != nil {
111+
if err := flushLogs(ctx); err != nil {
127112
l.Warn(ctx, "failed to flush logs", slog.Error(err))
128113
}
129114
}
130115
}
131116

132117
// sendLogsV2 uses the v2 agent API to send logs. Only compatibile with coder versions >= 2.9.
133-
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
134-
sendCtx, sendCancel := context.WithCancel(ctx)
118+
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
135119
done := make(chan struct{})
136120
uid := uuid.New()
137121
go func() {
138122
defer close(done)
139-
if err := ls.SendLoop(sendCtx, dest); err != nil {
123+
if err := ls.SendLoop(ctx, dest); err != nil {
140124
if !errors.Is(err, context.Canceled) {
141125
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
142126
}
143127
}
128+
129+
// Wait for up to 10 seconds for logs to finish sending.
130+
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
131+
defer sendCancel()
132+
// Try once more to send any pending logs
133+
if err := ls.SendLoop(sendCtx, dest); err != nil {
134+
if !errors.Is(err, context.DeadlineExceeded) {
135+
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
136+
}
137+
}
138+
ls.Flush(uid)
139+
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
140+
if !errors.Is(err, context.DeadlineExceeded) {
141+
l.Warn(ctx, "log sender did not empty", slog.Error(err))
142+
}
143+
}
144144
}()
145145

146-
var closeOnce sync.Once
147-
return func(l Level, msg string, args ...any) {
148-
ls.Enqueue(uid, agentsdk.Log{
149-
CreatedAt: time.Now(),
150-
Output: fmt.Sprintf(msg, args...),
151-
Level: codersdk.LogLevel(l),
152-
})
153-
}, func() {
154-
closeOnce.Do(func() {
155-
// Trigger a flush and wait for logs to be sent.
156-
ls.Flush(uid)
157-
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
158-
defer cancel()
159-
err := ls.WaitUntilEmpty(ctx)
160-
if err != nil {
161-
l.Warn(ctx, "log sender did not empty", slog.Error(err))
162-
}
146+
logFunc := func(l Level, msg string, args ...any) {
147+
ls.Enqueue(uid, agentsdk.Log{
148+
CreatedAt: time.Now(),
149+
Output: fmt.Sprintf(msg, args...),
150+
Level: codersdk.LogLevel(l),
151+
})
152+
}
163153

164-
// Stop the send loop.
165-
sendCancel()
166-
})
154+
doneFunc := func() {
155+
<-done
156+
}
167157

168-
// Wait for the send loop to finish.
169-
<-done
170-
}
158+
return logFunc, doneFunc
171159
}

‎log/coder_internal_test.go

Lines changed: 37 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"math/rand"
87
"net/http"
98
"net/http/httptest"
109
"net/url"
@@ -39,8 +38,10 @@ func TestCoder(t *testing.T) {
3938
defer closeOnce.Do(func() { close(gotLogs) })
4039
tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
4140
assert.Equal(t, token, tokHdr)
42-
req, ok := decodeV1Logs(t, w, r)
43-
if !ok {
41+
var req agentsdk.PatchLogs
42+
err := json.NewDecoder(r.Body).Decode(&req)
43+
if err != nil {
44+
http.Error(w, err.Error(), http.StatusBadRequest)
4445
return
4546
}
4647
if assert.Len(t, req.Logs, 1) {
@@ -53,44 +54,15 @@ func TestCoder(t *testing.T) {
5354

5455
ctx, cancel := context.WithCancel(context.Background())
5556
defer cancel()
56-
57-
logger, _ := newCoderLogger(ctx, t, srv.URL, token)
58-
logger(LevelInfo, "hello %s", "world")
57+
u, err := url.Parse(srv.URL)
58+
require.NoError(t, err)
59+
log, closeLog, err := Coder(ctx, u, token)
60+
require.NoError(t, err)
61+
defer closeLog()
62+
log(LevelInfo, "hello %s", "world")
5963
<-gotLogs
6064
})
6165

62-
t.Run("V1/Close", func(t *testing.T) {
63-
t.Parallel()
64-
65-
var got []agentsdk.Log
66-
handler := func(w http.ResponseWriter, r *http.Request) {
67-
if r.URL.Path == "/api/v2/buildinfo" {
68-
w.Header().Set("Content-Type", "application/json")
69-
_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
70-
return
71-
}
72-
req, ok := decodeV1Logs(t, w, r)
73-
if !ok {
74-
return
75-
}
76-
got = append(got, req.Logs...)
77-
}
78-
srv := httptest.NewServer(http.HandlerFunc(handler))
79-
defer srv.Close()
80-
81-
ctx, cancel := context.WithCancel(context.Background())
82-
defer cancel()
83-
84-
logger, closer := newCoderLogger(ctx, t, srv.URL, uuid.NewString())
85-
logger(LevelInfo, "1")
86-
logger(LevelInfo, "2")
87-
closer()
88-
logger(LevelInfo, "3")
89-
require.Len(t, got, 2)
90-
assert.Equal(t, "1", got[0].Output)
91-
assert.Equal(t, "2", got[1].Output)
92-
})
93-
9466
t.Run("V1/ErrUnauthorized", func(t *testing.T) {
9567
t.Parallel()
9668

@@ -168,31 +140,42 @@ func TestCoder(t *testing.T) {
168140
require.Len(t, ld.logs, 10)
169141
})
170142

171-
// In this test, we just fake out the DRPC server.
172-
t.Run("V2/Close", func(t *testing.T) {
143+
// In this test, we just stand up an endpoint that does not
144+
// do dRPC. We'll try to connect, fail to websocket upgrade
145+
// and eventually give up.
146+
t.Run("V2/Err", func(t *testing.T) {
173147
t.Parallel()
174148

175-
ctx, cancel := context.WithCancel(context.Background())
176-
defer cancel()
177-
178-
ld := &fakeLogDest{t: t}
179-
ls := agentsdk.NewLogSender(slogtest.Make(t, nil))
180-
logger, closer := sendLogsV2(ctx, ld, ls, slogtest.Make(t, nil))
181-
defer closer()
182-
183-
logger(LevelInfo, "1")
184-
logger(LevelInfo, "2")
185-
closer()
186-
logger(LevelInfo, "3")
149+
token := uuid.NewString()
150+
handlerDone := make(chan struct{})
151+
var closeOnce sync.Once
152+
handler := func(w http.ResponseWriter, r *http.Request) {
153+
if r.URL.Path == "/api/v2/buildinfo" {
154+
w.Header().Set("Content-Type", "application/json")
155+
_, _ = w.Write([]byte(`{"version": "v2.9.0"}`))
156+
return
157+
}
158+
defer closeOnce.Do(func() { close(handlerDone) })
159+
w.WriteHeader(http.StatusOK)
160+
}
161+
srv := httptest.NewServer(http.HandlerFunc(handler))
162+
defer srv.Close()
187163

188-
require.Len(t, ld.logs, 2)
164+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
165+
defer cancel()
166+
u, err := url.Parse(srv.URL)
167+
require.NoError(t, err)
168+
_, _, err = Coder(ctx, u, token)
169+
require.ErrorContains(t, err, "failed to WebSocket dial")
170+
require.ErrorIs(t, err, context.DeadlineExceeded)
171+
<-handlerDone
189172
})
190173

191174
// In this test, we validate that a 401 error on the initial connect
192175
// results in a retry. When envbuilder initially attempts to connect
193176
// using the Coder agent token, the workspace build may not yet have
194177
// completed.
195-
t.Run("V2/Retry", func(t *testing.T) {
178+
t.Run("V2Retry", func(t *testing.T) {
196179
t.Parallel()
197180
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
198181
defer cancel()
@@ -238,99 +221,6 @@ func TestCoder(t *testing.T) {
238221
})
239222
}
240223

241-
//nolint:paralleltest // We need to replace a global timeout.
242-
func TestCoderRPCTimeout(t *testing.T) {
243-
// This timeout is picked with the current subtests in mind, it
244-
// should not be changed without good reason.
245-
testReplaceTimeout(t, &rpcConnectTimeout, 500*time.Millisecond)
246-
247-
// In this test, we just stand up an endpoint that does not
248-
// do dRPC. We'll try to connect, fail to websocket upgrade
249-
// and eventually give up after rpcConnectTimeout.
250-
t.Run("V2/Err", func(t *testing.T) {
251-
t.Parallel()
252-
253-
token := uuid.NewString()
254-
handlerDone := make(chan struct{})
255-
handlerWait := make(chan struct{})
256-
var closeOnce sync.Once
257-
handler := func(w http.ResponseWriter, r *http.Request) {
258-
if r.URL.Path == "/api/v2/buildinfo" {
259-
w.Header().Set("Content-Type", "application/json")
260-
_, _ = w.Write([]byte(`{"version": "v2.9.0"}`))
261-
return
262-
}
263-
defer closeOnce.Do(func() { close(handlerDone) })
264-
<-handlerWait
265-
w.WriteHeader(http.StatusOK)
266-
}
267-
srv := httptest.NewServer(http.HandlerFunc(handler))
268-
defer srv.Close()
269-
270-
ctx, cancel := context.WithTimeout(context.Background(), rpcConnectTimeout/2)
271-
defer cancel()
272-
u, err := url.Parse(srv.URL)
273-
require.NoError(t, err)
274-
_, _, err = Coder(ctx, u, token)
275-
require.ErrorContains(t, err, "failed to WebSocket dial")
276-
require.ErrorIs(t, err, context.DeadlineExceeded)
277-
close(handlerWait)
278-
<-handlerDone
279-
})
280-
281-
t.Run("V2/Timeout", func(t *testing.T) {
282-
t.Parallel()
283-
284-
token := uuid.NewString()
285-
handlerDone := make(chan struct{})
286-
handlerWait := make(chan struct{})
287-
var closeOnce sync.Once
288-
handler := func(w http.ResponseWriter, r *http.Request) {
289-
if r.URL.Path == "/api/v2/buildinfo" {
290-
w.Header().Set("Content-Type", "application/json")
291-
_, _ = w.Write([]byte(`{"version": "v2.9.0"}`))
292-
return
293-
}
294-
defer closeOnce.Do(func() { close(handlerDone) })
295-
<-handlerWait
296-
w.WriteHeader(http.StatusOK)
297-
}
298-
srv := httptest.NewServer(http.HandlerFunc(handler))
299-
defer srv.Close()
300-
301-
ctx, cancel := context.WithTimeout(context.Background(), rpcConnectTimeout*2)
302-
defer cancel()
303-
u, err := url.Parse(srv.URL)
304-
require.NoError(t, err)
305-
_, _, err = Coder(ctx, u, token)
306-
require.ErrorContains(t, err, "failed to WebSocket dial")
307-
require.ErrorIs(t, err, context.DeadlineExceeded)
308-
close(handlerWait)
309-
<-handlerDone
310-
})
311-
}
312-
313-
func decodeV1Logs(t *testing.T, w http.ResponseWriter, r *http.Request) (agentsdk.PatchLogs, bool) {
314-
t.Helper()
315-
var req agentsdk.PatchLogs
316-
err := json.NewDecoder(r.Body).Decode(&req)
317-
if !assert.NoError(t, err) {
318-
http.Error(w, err.Error(), http.StatusBadRequest)
319-
return req, false
320-
}
321-
return req, true
322-
}
323-
324-
func newCoderLogger(ctx context.Context, t *testing.T, us string, token string) (Func, func()) {
325-
t.Helper()
326-
u, err := url.Parse(us)
327-
require.NoError(t, err)
328-
logger, closer, err := Coder(ctx, u, token)
329-
require.NoError(t, err)
330-
t.Cleanup(closer)
331-
return logger, closer
332-
}
333-
334224
type fakeLogDest struct {
335225
t testing.TB
336226
logs []*proto.Log
@@ -341,27 +231,3 @@ func (d *fakeLogDest) BatchCreateLogs(ctx context.Context, request *proto.BatchC
341231
d.logs = append(d.logs, request.Logs...)
342232
return &proto.BatchCreateLogsResponse{}, nil
343233
}
344-
345-
func testReplaceTimeout(t *testing.T, v *time.Duration, d time.Duration) {
346-
t.Helper()
347-
if isParallel(t) {
348-
t.Fatal("cannot replace timeout in parallel test")
349-
}
350-
old := *v
351-
*v = d
352-
t.Cleanup(func() { *v = old })
353-
}
354-
355-
func isParallel(t *testing.T) (ret bool) {
356-
t.Helper()
357-
// This is a hack to determine if the test is running in parallel
358-
// via property of t.Setenv.
359-
defer func() {
360-
if r := recover(); r != nil {
361-
ret = true
362-
}
363-
}()
364-
// Random variable name to avoid collisions.
365-
t.Setenv(fmt.Sprintf("__TEST_CHECK_IS_PARALLEL_%d", rand.Int()), "1")
366-
return false
367-
}

0 commit comments

Comments
 (0)
Please sign in to comment.