Skip to content

Commit a8269bc

Browse files
test(task): add platform adapter tests
Also fix a handful of segfaults caused by improperly retained byte slices from bolt, and combine two view transactions into one to avoid data race when a delete happened between them.
1 parent 745376a commit a8269bc

File tree

7 files changed

+538
-62
lines changed

7 files changed

+538
-62
lines changed

task/adaptertest/adaptertest.go

Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
// Package adaptertest provides tests to ensure that implementations of
2+
// platform/task/backend.Store and platform/task/backend.LogReader meet the requirements of platform.TaskService.
3+
//
4+
// Consumers of this package must import query/builtin.
5+
// This package does not import it directly, to avoid requiring it too early.
6+
package adaptertest
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"fmt"
12+
"math"
13+
"runtime"
14+
"sync"
15+
"testing"
16+
"time"
17+
18+
"github.com/influxdata/platform"
19+
"github.com/influxdata/platform/snowflake"
20+
"github.com/influxdata/platform/task"
21+
"github.com/influxdata/platform/task/backend"
22+
"github.com/influxdata/platform/task/options"
23+
)
24+
25+
func init() {
26+
// TODO(mr): remove as part of https://github.com/influxdata/platform/issues/484.
27+
options.EnableScriptCacheForTest()
28+
}
29+
30+
// BackendComponentFactory is supplied by consumers of the adaptertestpackage,
31+
// to provide the values required to constitute a PlatformAdapter.
32+
// The provided context.CancelFunc is called after the test,
33+
// and it is the implementer's responsibility to clean up after that is called.
34+
//
35+
// If creating the System fails, the implementer should call t.Fatal.
36+
type BackendComponentFactory func(t *testing.T) (*System, context.CancelFunc)
37+
38+
// TestTaskService should be called by consumers of the adaptertest package.
39+
// This will call fn once to create a single platform.TaskService
40+
// used across all subtests in TestTaskService.
41+
func TestTaskService(t *testing.T, fn BackendComponentFactory) {
42+
sys, cancel := fn(t)
43+
defer cancel()
44+
sys.ts = task.PlatformAdapter(sys.S, sys.LR)
45+
46+
t.Run("TaskService", func(t *testing.T) {
47+
// We're running the subtests in parallel, but if we don't use this wrapper,
48+
// the defer cancel() call above would return before the parallel subtests completed.
49+
//
50+
// Running the subtests in parallel might make them slightly faster,
51+
// but more importantly, it should exercise concurrency to catch data races.
52+
53+
t.Run("Task CRUD", func(t *testing.T) {
54+
t.Parallel()
55+
testTaskCRUD(t, sys)
56+
})
57+
58+
t.Run("Task Runs", func(t *testing.T) {
59+
t.Parallel()
60+
testTaskRuns(t, sys)
61+
})
62+
63+
t.Run("Task Concurrency", func(t *testing.T) {
64+
t.Parallel()
65+
testTaskConcurrency(t, sys)
66+
})
67+
})
68+
}
69+
70+
// System, as in "system under test", encapsulates the required parts of a platform.TaskAdapter
71+
// (the underlying Store, LogReader, and LogWriter) for low-level operations.
72+
type System struct {
73+
S backend.Store
74+
LR backend.LogReader
75+
LW backend.LogWriter
76+
77+
// Set this context, to be used in tests, so that any spawned goroutines watching Ctx.Done()
78+
// will clean up after themselves.
79+
Ctx context.Context
80+
81+
ts platform.TaskService
82+
}
83+
84+
func testTaskCRUD(t *testing.T, sys *System) {
85+
orgID := idGen.ID()
86+
userID := idGen.ID()
87+
88+
// Create a task.
89+
task := &platform.Task{Organization: orgID, Owner: platform.User{ID: userID}, Flux: fmt.Sprintf(scriptFmt, 0)}
90+
if err := sys.ts.CreateTask(sys.Ctx, task); err != nil {
91+
t.Fatal(err)
92+
}
93+
if task.ID == nil {
94+
t.Fatal("no task ID set")
95+
}
96+
97+
// Look up a task the different ways we can.
98+
// Map of method name to found task.
99+
found := make(map[string]*platform.Task)
100+
101+
// Find by ID should return the right task.
102+
f, err := sys.ts.FindTaskByID(sys.Ctx, task.ID)
103+
if err != nil {
104+
t.Fatal(err)
105+
}
106+
found["FindTaskByID"] = f
107+
108+
fs, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &orgID})
109+
if err != nil {
110+
t.Fatal(err)
111+
}
112+
if len(fs) != 1 {
113+
t.Fatalf("expected 1 task returned, got %d: %#v", len(fs), fs)
114+
}
115+
found["FindTasks with Organization filter"] = fs[0]
116+
117+
fs, _, err = sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{User: &userID})
118+
if err != nil {
119+
t.Fatal(err)
120+
}
121+
if len(fs) != 1 {
122+
t.Fatalf("expected 1 task returned, got %d: %#v", len(fs), fs)
123+
}
124+
found["FindTasks with User filter"] = fs[0]
125+
126+
for fn, f := range found {
127+
if !bytes.Equal(f.Organization, orgID) {
128+
t.Fatalf("%s: wrong organization returned; want %s, got %s", fn, orgID.String(), f.Organization.String())
129+
}
130+
if !bytes.Equal(f.Owner.ID, userID) {
131+
t.Fatalf("%s: wrong user returned; want %s, got %s", fn, userID.String(), f.Owner.ID.String())
132+
}
133+
134+
if f.Name != "task #0" {
135+
t.Fatalf(`%s: wrong name returned; want "task #0", got %q`, fn, f.Name)
136+
}
137+
if f.Cron != "* * * * *" {
138+
t.Fatalf(`%s: wrong cron returned; want "* * * * *", got %q`, fn, f.Cron)
139+
}
140+
if f.Every != "" {
141+
t.Fatalf(`%s: wrong every returned; want "", got %q`, fn, f.Every)
142+
}
143+
}
144+
145+
// Update task.
146+
newFlux := fmt.Sprintf(scriptFmt, 99)
147+
origID := append(platform.ID(nil), f.ID...)
148+
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux})
149+
if err != nil {
150+
t.Fatal(err)
151+
}
152+
153+
if !bytes.Equal(origID, f.ID) {
154+
t.Fatalf("task ID unexpectedly changed during update, from %s to %s", origID.String(), f.ID.String())
155+
}
156+
if f.Flux != newFlux {
157+
t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux)
158+
}
159+
160+
// Delete task.
161+
if err := sys.ts.DeleteTask(sys.Ctx, origID); err != nil {
162+
t.Fatal(err)
163+
}
164+
165+
// Task should not be returned.
166+
if _, err := sys.ts.FindTaskByID(sys.Ctx, origID); err != nil {
167+
t.Fatal(err)
168+
}
169+
}
170+
171+
func testTaskRuns(t *testing.T, sys *System) {
172+
orgID := idGen.ID()
173+
userID := idGen.ID()
174+
175+
task := &platform.Task{Organization: orgID, Owner: platform.User{ID: userID}, Flux: fmt.Sprintf(scriptFmt, 0)}
176+
if err := sys.ts.CreateTask(sys.Ctx, task); err != nil {
177+
t.Fatal(err)
178+
}
179+
180+
const requestedAtUnix = 1000
181+
if err := sys.S.ManuallyRunTimeRange(sys.Ctx, task.ID, 60, 300, requestedAtUnix); err != nil {
182+
t.Fatal(err)
183+
}
184+
185+
// Create a run.
186+
rc, err := sys.S.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix+1)
187+
if err != nil {
188+
t.Fatal(err)
189+
}
190+
if !bytes.Equal(rc.Created.TaskID, task.ID) {
191+
t.Fatalf("unexpected created run: got %s, want %s", rc.Created.TaskID.String(), task.ID.String())
192+
}
193+
runID := rc.Created.RunID
194+
195+
// Set the run state to started.
196+
st, err := sys.S.FindTaskByID(sys.Ctx, task.ID)
197+
if err != nil {
198+
t.Fatal(err)
199+
}
200+
startedAt := time.Now()
201+
if err := sys.LW.UpdateRunState(sys.Ctx, st, runID, startedAt, backend.RunStarted); err != nil {
202+
t.Fatal(err)
203+
}
204+
205+
// Find runs, to see the started run.
206+
runs, n, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &orgID, Task: &task.ID})
207+
if err != nil {
208+
t.Fatal(err)
209+
}
210+
if n != len(runs) {
211+
t.Fatalf("expected n=%d, got %d", len(runs), n)
212+
}
213+
if len(runs) != 1 {
214+
t.Fatalf("expected 1 run returned, got %d", len(runs))
215+
}
216+
217+
r := runs[0]
218+
if !bytes.Equal(r.ID, runID) {
219+
t.Fatalf("expected to find run with ID %s, got %s", runID.String(), r.ID.String())
220+
}
221+
if !bytes.Equal(r.TaskID, task.ID) {
222+
t.Fatalf("expected run to have task ID %s, got %s", task.ID.String(), r.TaskID.String())
223+
}
224+
if want := startedAt.UTC().Format(time.RFC3339); r.StartedAt != want {
225+
t.Fatalf("expected run to be started at %q, got %q", want, r.StartedAt)
226+
}
227+
if want := time.Unix(rc.Created.Now, 0).UTC().Format(time.RFC3339); r.ScheduledFor != want {
228+
// Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753.
229+
t.Logf("TODO(#753): expected run to be scheduled for %q, got %q", want, r.ScheduledFor)
230+
}
231+
if want := time.Unix(requestedAtUnix, 0).UTC().Format(time.RFC3339); r.RequestedAt != want {
232+
// Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753.
233+
t.Logf("TODO(#753): expected run to be requested at %q, got %q", want, r.RequestedAt)
234+
}
235+
if r.FinishedAt != "" {
236+
t.Fatalf("expected run not be finished, got %q", r.FinishedAt)
237+
}
238+
}
239+
240+
func testTaskConcurrency(t *testing.T, sys *System) {
241+
orgID := idGen.ID()
242+
userID := idGen.ID()
243+
244+
const numTasks = 300
245+
taskCh := make(chan *platform.Task, numTasks)
246+
247+
var createWg sync.WaitGroup
248+
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
249+
createWg.Add(1)
250+
go func() {
251+
defer createWg.Done()
252+
for task := range taskCh {
253+
if err := sys.ts.CreateTask(sys.Ctx, task); err != nil {
254+
t.Errorf("error creating task: %v", err)
255+
}
256+
}
257+
}()
258+
}
259+
260+
// Signal for non-creator goroutines to stop.
261+
quitCh := make(chan struct{})
262+
go func() {
263+
createWg.Wait()
264+
close(quitCh)
265+
}()
266+
267+
var extraWg sync.WaitGroup
268+
// Get all the tasks, and delete the first one we find.
269+
extraWg.Add(1)
270+
go func() {
271+
defer extraWg.Done()
272+
273+
deleted := 0
274+
defer func() {
275+
t.Logf("Concurrently deleted %d tasks", deleted)
276+
}()
277+
for {
278+
// Check if we need to quit.
279+
select {
280+
case <-quitCh:
281+
return
282+
default:
283+
}
284+
285+
// Get all the tasks.
286+
tasks, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &orgID})
287+
if err != nil {
288+
t.Errorf("error finding tasks: %v", err)
289+
return
290+
}
291+
if len(tasks) == 0 {
292+
continue
293+
}
294+
295+
// Check again if we need to quit.
296+
select {
297+
case <-quitCh:
298+
return
299+
default:
300+
}
301+
302+
// Delete the first task we found.
303+
if err := sys.ts.DeleteTask(sys.Ctx, tasks[0].ID); err != nil {
304+
t.Errorf("error deleting task: %v", err)
305+
return
306+
}
307+
deleted++
308+
309+
// Wait just a tiny bit.
310+
time.Sleep(time.Millisecond)
311+
}
312+
}()
313+
314+
extraWg.Add(1)
315+
go func() {
316+
defer extraWg.Done()
317+
318+
runsCreated := 0
319+
defer func() {
320+
t.Logf("Concurrently created %d runs", runsCreated)
321+
}()
322+
for {
323+
// Check if we need to quit.
324+
select {
325+
case <-quitCh:
326+
return
327+
default:
328+
}
329+
330+
// Get all the tasks.
331+
tasks, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &orgID})
332+
if err != nil {
333+
t.Errorf("error finding tasks: %v", err)
334+
return
335+
}
336+
if len(tasks) == 0 {
337+
continue
338+
}
339+
340+
// Check again if we need to quit.
341+
select {
342+
case <-quitCh:
343+
return
344+
default:
345+
}
346+
347+
// Create a run for the last task we found.
348+
// The script should run every minute, so use max now.
349+
tid := tasks[len(tasks)-1].ID
350+
if _, err := sys.S.CreateNextRun(sys.Ctx, tid, math.MaxInt64); err != nil {
351+
// This may have errored due to the task being deleted. Check if the task still exists.
352+
if t, err2 := sys.S.FindTaskByID(sys.Ctx, tid); err2 == nil && t == nil {
353+
// It was deleted. Just continue.
354+
continue
355+
}
356+
// Otherwise, we were able to find the task, so something went wrong here.
357+
t.Errorf("error creating next run: %v", err)
358+
return
359+
}
360+
runsCreated++
361+
362+
// Wait just a tiny bit.
363+
time.Sleep(time.Millisecond)
364+
}
365+
}()
366+
367+
// Start adding tasks.
368+
for i := 0; i < numTasks; i++ {
369+
taskCh <- &platform.Task{
370+
Organization: orgID,
371+
Owner: platform.User{ID: userID},
372+
Flux: fmt.Sprintf(scriptFmt, i),
373+
}
374+
}
375+
376+
// Done adding. Wait for cleanup.
377+
close(taskCh)
378+
createWg.Wait()
379+
extraWg.Wait()
380+
}
381+
382+
const scriptFmt = `option task = {
383+
name: "task #%d",
384+
cron: "* * * * *",
385+
concurrency: 100,
386+
}
387+
from(bucket:"b") |> toHTTP(url:"http://example.com")`
388+
389+
var idGen = snowflake.NewIDGenerator()

0 commit comments

Comments
 (0)