Skip to content

Commit e823c99

Browse files
dmitshur and gopherbot
authored and committed
internal/workflow: run at most one expansion at a time
We've inadvertently been running expansions concurrently during minor releases. For better or worse, it happened to be seemingly fine, and problems started to be noticeable only when needing to restart or approve tasks, which failed with puzzling "unknown task" errors.

It's useful to be able to plan builders for the two Go releases in parallel, and it's completely fine for it not to happen concurrently. Instead of getting the workflow to arrange for that, it seems we can do it in the workflow package itself.

The new TestManualRetryMultipleExpansions test fails before the change, and passes after.

Fixes golang/go#70249.

Change-Id: Id87323f77f573d9ac364010dfc0b8581e57ce9b8
Reviewed-on: https://go-review.googlesource.com/c/build/+/626335
Auto-Submit: Dmitri Shuralyov <[email protected]>
Reviewed-by: Dmitri Shuralyov <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
Reviewed-by: Carlos Amedee <[email protected]>
1 parent 0f31c95 commit e823c99

File tree

2 files changed

+78
-4
lines changed

2 files changed

+78
-4
lines changed

internal/workflow/workflow.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
// definition rather than producing an output. Unlike Actions and Tasks, they
3636
// execute multiple times and must produce exactly the same workflow
3737
// modifications each time. As such, they should be pure functions of their
38-
// inputs. Producing different modifications, or running multiple expansions
39-
// concurrently, is an error that will corrupt the workflow's state.
38+
// inputs. Producing different modifications is an error that will corrupt
39+
// the workflow's state. A workflow will run at most one expansion at a time.
4040
//
4141
// Once a Definition is complete, call Start to set its parameters and
4242
// instantiate it into a Workflow. Call Run to execute the workflow until
@@ -486,8 +486,7 @@ func (d *dependency) ready(w *Workflow) bool {
486486
// Unlike normal tasks, expansions may run multiple times and must produce
487487
// the exact same changes to the definition each time.
488488
//
489-
// Running more than one expansion concurrently is an error and will corrupt
490-
// the workflow.
489+
// A workflow will run at most one expansion at a time.
491490
func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1] {
492491
return addExpansion[O1](d, name, f, nil, opts)
493492
}
@@ -807,6 +806,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
807806
doneOnce := ctx.Done()
808807
for {
809808
running := 0
809+
runningExpansion := false // Whether an expansion is running, and hasn't completed yet.
810810
allDone := true
811811
for _, task := range w.tasks {
812812
if !task.created {
@@ -834,11 +834,16 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
834834
if !ready {
835835
continue
836836
}
837+
if task.def.isExpansion && runningExpansion {
838+
// Don't start a new expansion until the currently running one completes.
839+
continue
840+
}
837841
task.started = true
838842
running++
839843
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
840844
taskCopy := *task
841845
if task.def.isExpansion {
846+
runningExpansion = true
842847
defCopy := w.def.shallowClone()
843848
go func() { stateChan <- runExpansion(defCopy, taskCopy, args) }()
844849
} else {
@@ -861,6 +866,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
861866
case state := <-stateChan:
862867
if state.def.isExpansion && state.finished && state.err == nil {
863868
state.err = w.expand(state.expanded)
869+
runningExpansion = false
864870
}
865871
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
866872
w.tasks[state.def] = &state

internal/workflow/workflow_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,74 @@ func TestManualRetry(t *testing.T) {
332332
}
333333
}
334334

335+
// Test that manual retry works on tasks that come from different expansions.
336+
//
337+
// This is similar to how the Go minor release workflow plans builders for
338+
// both releases. It previously failed due to expansions racing with each other,
339+
// leading to "unknown task" errors when retrying. See go.dev/issue/70249.
340+
func TestManualRetryMultipleExpansions(t *testing.T) {
341+
// Create two sub-workflows, each one with an expansion that adds one work task.
342+
// The work tasks fail on the first try, and require being successfully restarted
343+
// for the workflow to complete.
344+
var counters, retried [2]int
345+
wd := wf.New(wf.ACL{})
346+
sub1 := wd.Sub("sub1")
347+
sub2 := wd.Sub("sub2")
348+
for i, wd := range []*wf.Definition{sub1, sub2} {
349+
out := wf.Expand0(wd, fmt.Sprintf("expand %d", i+1), func(wd *wf.Definition) (wf.Value[string], error) {
350+
return wf.Task0(wd, fmt.Sprintf("work %d", i+1), func(ctx *wf.TaskContext) (string, error) {
351+
ctx.DisableRetries()
352+
counters[i]++
353+
if counters[i] == 1 {
354+
return "", fmt.Errorf("first try fail")
355+
}
356+
return "", nil
357+
}), nil
358+
})
359+
wf.Output(wd, "out", out)
360+
}
361+
362+
w := startWorkflow(t, wd, nil)
363+
listener := &errorListener{
364+
taskName: "work 1",
365+
callback: func(string) {
366+
go func() {
367+
retried[0]++
368+
err := w.RetryTask(context.Background(), "work 1")
369+
if err != nil {
370+
t.Errorf(`RetryTask("work 1") failed: %v`, err)
371+
}
372+
}()
373+
},
374+
Listener: &errorListener{
375+
taskName: "work 2",
376+
callback: func(string) {
377+
go func() {
378+
retried[1]++
379+
err := w.RetryTask(context.Background(), "work 2")
380+
if err != nil {
381+
t.Errorf(`RetryTask("work 2") failed: %v`, err)
382+
}
383+
}()
384+
},
385+
Listener: &verboseListener{t},
386+
},
387+
}
388+
runWorkflow(t, w, listener)
389+
if counters[0] != 2 {
390+
t.Errorf("sub1 task ran %v times, wanted 2", counters[0])
391+
}
392+
if retried[0] != 1 {
393+
t.Errorf("sub1 task was retried %v times, wanted 1", retried[0])
394+
}
395+
if counters[1] != 2 {
396+
t.Errorf("sub2 task ran %v times, wanted 2", counters[1])
397+
}
398+
if retried[1] != 1 {
399+
t.Errorf("sub2 task was retried %v times, wanted 1", retried[1])
400+
}
401+
}
402+
335403
func TestAutomaticRetry(t *testing.T) {
336404
counter := 0
337405
needsRetry := func(ctx *wf.TaskContext) (string, error) {

0 commit comments

Comments
 (0)