Skip to content

Commit a018037

Browse files
trigger: Remove need for specifying starting point (#107)
* trigger: Remove need for specifying starting point * clean up
1 parent e5125b8 commit a018037

23 files changed

+106
-84
lines changed

_examples/callback/callback_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestCallbackWorkflow(t *testing.T) {
2525
wf.Run(ctx)
2626

2727
foreignID := "andrew"
28-
runID, err := wf.Trigger(ctx, foreignID, callback.StatusStarted)
28+
runID, err := wf.Trigger(ctx, foreignID)
2929
require.Nil(t, err)
3030

3131
workflow.TriggerCallbackOn(t, wf, foreignID, runID, callback.StatusStarted, callback.EmailConfirmationResponse{

_examples/connector/connector.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func Workflow(d Deps) *workflow.Workflow[GettingStarted, Status] {
5454
_, err := api.Trigger(
5555
ctx,
5656
e.ForeignID,
57-
StatusStarted,
5857
workflow.WithInitialValue[GettingStarted, Status](&GettingStarted{
5958
ReadTheDocs: "✅",
6059
}),

_examples/gettingstarted/gettingstarted_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestWorkflow(t *testing.T) {
2727
wf.Run(ctx)
2828

2929
foreignID := "82347982374982374"
30-
_, err := wf.Trigger(ctx, foreignID, gettingstarted.StatusStarted)
30+
_, err := wf.Trigger(ctx, foreignID)
3131
require.Nil(t, err)
3232

3333
workflow.Require(t, wf, foreignID, gettingstarted.StatusReadTheDocs, gettingstarted.GettingStarted{

_examples/schedule/schedule_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestExampleWorkflow(t *testing.T) {
3636
foreignID := "hourly-run"
3737

3838
go func() {
39-
err := wf.Schedule(foreignID, schedule.StatusStarted, "@hourly")
39+
err := wf.Schedule(foreignID, "@hourly")
4040
require.Nil(t, err)
4141
}()
4242

_examples/timeout/timeout_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestTimeoutWorkflow(t *testing.T) {
3232
wf.Run(ctx)
3333

3434
foreignID := "andrew"
35-
runID, err := wf.Trigger(ctx, foreignID, timeout.StatusStarted)
35+
runID, err := wf.Trigger(ctx, foreignID)
3636
require.Nil(t, err)
3737

3838
workflow.AwaitTimeoutInsert(t, wf, foreignID, runID, timeout.StatusStarted)

_examples/webui/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func main() {
2727
"Customer 3",
2828
}
2929
for _, foreignID := range seed {
30-
_, err := w.Trigger(ctx, foreignID, StatusStart)
30+
_, err := w.Trigger(ctx, foreignID)
3131
if err != nil {
3232
panic(err)
3333
}

adapters/adaptertest/connector.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ func RunConnectorTest(t *testing.T, maker func(seedEvents []workflow.ConnectorEv
3838
_, err := api.Trigger(
3939
ctx,
4040
e.ForeignID,
41-
SyncStatusStarted,
4241
workflow.WithInitialValue[User, SyncStatus](&User{
4342
UID: e.ForeignID,
4443
}),

adapters/adaptertest/eventstreaming.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) {
158158
u := User{
159159
CountryCode: "GB",
160160
}
161-
runId, err := wf.Trigger(ctx, foreignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&u))
161+
runId, err := wf.Trigger(ctx, foreignID, workflow.WithInitialValue[User, SyncStatus](&u))
162162
require.Nil(t, err)
163163

164164
workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet)

adapters/reflexstreamer/streamfunc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestStreamFunc(t *testing.T) {
2424
wf, store, ctx, cancel := createTestWorkflow(t, dbc, eventsTable)
2525

2626
fid := "23847923847"
27-
_, err := wf.Trigger(ctx, fid, statusStart)
27+
_, err := wf.Trigger(ctx, fid)
2828
require.Nil(t, err)
2929

3030
workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow")
@@ -66,7 +66,7 @@ func TestOnComplete(t *testing.T) {
6666
wf, store, ctx, cancel := createTestWorkflow(t, dbc, eventsTable)
6767

6868
fid := "23847923847"
69-
_, err := wf.Trigger(ctx, fid, statusStart)
69+
_, err := wf.Trigger(ctx, fid)
7070
require.Nil(t, err)
7171

7272
workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow")

autopause_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestRetryOfPausedRecords(t *testing.T) {
4646
t.Cleanup(w.Stop)
4747

4848
fid := "12345"
49-
_, err := w.Trigger(ctx, fid, StatusStart)
49+
_, err := w.Trigger(ctx, fid)
5050
require.NoError(t, err)
5151

5252
workflow.Require(t, w, fid, StatusEnd, "")

await_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestAwait(t *testing.T) {
3333
wf.Run(ctx)
3434
t.Cleanup(wf.Stop)
3535

36-
runID, err := wf.Trigger(ctx, "1", StatusStart)
36+
runID, err := wf.Trigger(ctx, "1")
3737
require.Nil(t, err)
3838

3939
res, err := wf.Await(ctx, "1", runID, StatusEnd, workflow.WithAwaitPollingFrequency(10*time.Nanosecond))

builder.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,18 @@ func (b *Builder[Type, Status]) Build(
246246
}
247247

248248
if len(b.workflow.timeouts) > 0 && b.workflow.timeoutStore == nil {
249-
panic("cannot configure timeouts without providing TimeoutStore for workflow")
249+
panic("Cannot configure timeouts without providing TimeoutStore for workflow")
250250
}
251251

252+
graph := b.workflow.statusGraph.Info()
253+
if len(graph.StartingNodes) < 1 {
254+
panic(
255+
"Workflow requires at least one starting point. Please provide at least one Step, Callback, or Timeout to add a starting point.",
256+
)
257+
}
258+
259+
b.workflow.defaultStartingPoint = Status(graph.StartingNodes[0])
260+
252261
return b.workflow
253262
}
254263

builder_internal_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func TestWithClock(t *testing.T) {
103103
now := time.Now()
104104
clock := clock_testing.NewFakeClock(now)
105105
b := NewBuilder[string, testStatus]("determine starting points")
106+
b.AddStep(statusStart, nil, statusMiddle)
106107
wf := b.Build(nil, nil, nil, WithClock(clock))
107108

108109
clock.Step(time.Hour)
@@ -131,6 +132,7 @@ func TestBuildOptions(t *testing.T) {
131132
}
132133

133134
b := NewBuilder[string, testStatus]("determine starting points")
135+
b.AddStep(statusStart, nil, statusMiddle)
134136
w := b.Build(
135137
nil,
136138
nil,
@@ -207,6 +209,16 @@ func TestAddTimeoutPollingFrequency(t *testing.T) {
207209
require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].pollingFrequency)
208210
}
209211

212+
func TestDefaultStartingPoint(t *testing.T) {
213+
require.PanicsWithValue(t,
214+
"Workflow requires at least one starting point. Please provide at least one Step, Callback, or Timeout to add a starting point.",
215+
func() {
216+
b := NewBuilder[string, testStatus]("")
217+
_ = b.Build(nil, nil, nil)
218+
},
219+
)
220+
}
221+
210222
func TestAddTimeoutDontAllowParallelCount(t *testing.T) {
211223
require.PanicsWithValue(t,
212224
"Cannot configure parallel timeout",
@@ -248,6 +260,7 @@ func TestConnectorConstruction(t *testing.T) {
248260
ErrBackOff(time.Hour*6),
249261
)
250262

263+
b.AddStep(statusStart, nil, statusEnd)
251264
w := b.Build(nil, nil, nil)
252265

253266
for _, config := range w.connectorConfigs {
@@ -374,7 +387,7 @@ func TestConfigureTimeoutWithoutTimeoutStore(t *testing.T) {
374387

375388
// Should panic as setting a second config of statusStart
376389
require.PanicsWithValue(t,
377-
"cannot configure timeouts without providing TimeoutStore for workflow",
390+
"Cannot configure timeouts without providing TimeoutStore for workflow",
378391
func() {
379392
b.Build(
380393
nil,

hook_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestWorkflow_OnPauseHook(t *testing.T) {
2929
})
3030

3131
foreignID := "andrew"
32-
_, err := wf.Trigger(context.Background(), foreignID, StatusStart)
32+
_, err := wf.Trigger(context.Background(), foreignID)
3333
require.Nil(t, err)
3434

3535
wg.Wait()
@@ -51,7 +51,7 @@ func TestWorkflow_OnCancelHook(t *testing.T) {
5151
})
5252

5353
foreignID := "andrew"
54-
_, err := wf.Trigger(context.Background(), foreignID, StatusStart)
54+
_, err := wf.Trigger(context.Background(), foreignID)
5555
require.Nil(t, err)
5656

5757
wg.Wait()
@@ -73,7 +73,7 @@ func TestWorkflow_OnCompleteHook(t *testing.T) {
7373
})
7474

7575
foreignID := "andrew"
76-
_, err := wf.Trigger(context.Background(), foreignID, StatusStart)
76+
_, err := wf.Trigger(context.Background(), foreignID)
7777
require.Nil(t, err)
7878

7979
wg.Wait()

metrics_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func TestRunStateChanges(t *testing.T) {
358358
w.Run(ctx)
359359
t.Cleanup(w.Stop)
360360

361-
_, err := w.Trigger(ctx, "983467934", StatusStart)
361+
_, err := w.Trigger(ctx, "983467934")
362362
require.Nil(t, err)
363363

364364
time.Sleep(time.Millisecond * 500)
@@ -391,13 +391,13 @@ func TestMetricProcessSkippedEvents(t *testing.T) {
391391
w.Run(ctx)
392392
t.Cleanup(w.Stop)
393393

394-
_, err := w.Trigger(ctx, "9834679343", StatusStart)
394+
_, err := w.Trigger(ctx, "9834679343")
395395
require.Nil(t, err)
396396

397-
_, err = w.Trigger(ctx, "2349839483", StatusStart)
397+
_, err = w.Trigger(ctx, "2349839483")
398398
require.Nil(t, err)
399399

400-
_, err = w.Trigger(ctx, "7548702398", StatusStart)
400+
_, err = w.Trigger(ctx, "7548702398")
401401
require.Nil(t, err)
402402

403403
time.Sleep(time.Millisecond * 500)

runstate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestRunState(t *testing.T) {
6868
t.Cleanup(w.Stop)
6969

7070
// Trigger workflow before it's running to assert that the initial state is workflow.RunStateInitiated
71-
runID, err := w.Trigger(ctx, "fid", StatusStart)
71+
runID, err := w.Trigger(ctx, "fid")
7272
require.Nil(t, err)
7373

7474
time.Sleep(time.Second)

schedule.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"strconv"
87
"time"
98

109
"github.com/robfig/cron/v3"
@@ -13,24 +12,13 @@ import (
1312

1413
func (w *Workflow[Type, Status]) Schedule(
1514
foreignID string,
16-
startingStatus Status,
1715
spec string,
1816
opts ...ScheduleOption[Type, Status],
1917
) error {
2018
if !w.calledRun {
2119
return fmt.Errorf("schedule failed: workflow is not running")
2220
}
2321

24-
if !w.statusGraph.IsValid(int(startingStatus)) {
25-
w.logger.Debug(
26-
w.ctx,
27-
fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name()),
28-
map[string]string{},
29-
)
30-
31-
return fmt.Errorf("schedule failed: status provided is not configured for workflow: %s", startingStatus)
32-
}
33-
3422
var options scheduleOpts[Type, Status]
3523
for _, opt := range opts {
3624
opt(&options)
@@ -41,8 +29,8 @@ func (w *Workflow[Type, Status]) Schedule(
4129
return err
4230
}
4331

44-
role := makeRole(w.Name(), strconv.FormatInt(int64(startingStatus), 10), foreignID, "scheduler", spec)
45-
processName := makeRole(startingStatus.String(), foreignID, "scheduler", spec)
32+
role := makeRole(w.Name(), foreignID, "scheduler", spec)
33+
processName := makeRole(foreignID, "scheduler", spec)
4634

4735
w.launching.Add(1)
4836
w.run(role, processName, func(ctx context.Context) error {
@@ -92,7 +80,7 @@ func (w *Workflow[Type, Status]) Schedule(
9280
return nil
9381
}
9482

95-
_, err = w.Trigger(ctx, foreignID, startingStatus, tOpts...)
83+
_, err = w.Trigger(ctx, foreignID, tOpts...)
9684
if errors.Is(err, ErrWorkflowInProgress) {
9785
// NoReturnErr: Fallthrough to schedule next workflow as there is already one in progress. If this
9886
// happens it is likely that we scheduled a workflow and were unable to schedule the next.

schedule_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestSchedule(t *testing.T) {
4646
t.Cleanup(wf.Stop)
4747

4848
go func() {
49-
err := wf.Schedule("andrew", StatusStart, "@monthly")
49+
err := wf.Schedule("andrew", "@monthly")
5050
require.Nil(t, err)
5151
}()
5252

@@ -105,7 +105,7 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) {
105105
wg.Add(1)
106106
go func() {
107107
wg.Done()
108-
err := wf.Schedule("andrew", StatusStart, "@monthly")
108+
err := wf.Schedule("andrew", "@monthly")
109109
require.Nil(t, err)
110110
}()
111111

@@ -114,21 +114,21 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) {
114114
time.Sleep(200 * time.Millisecond)
115115

116116
require.Equal(t, map[string]workflow.State{
117-
"start-andrew-scheduler-@monthly": workflow.StateRunning,
118-
"start-consumer-1-of-1": workflow.StateRunning,
119-
"outbox-consumer": workflow.StateRunning,
120-
"delete-consumer": workflow.StateRunning,
121-
"paused-records-retry-consumer": workflow.StateRunning,
117+
"andrew-scheduler-@monthly": workflow.StateRunning,
118+
"start-consumer-1-of-1": workflow.StateRunning,
119+
"outbox-consumer": workflow.StateRunning,
120+
"delete-consumer": workflow.StateRunning,
121+
"paused-records-retry-consumer": workflow.StateRunning,
122122
}, wf.States())
123123

124124
wf.Stop()
125125

126126
require.Equal(t, map[string]workflow.State{
127-
"start-andrew-scheduler-@monthly": workflow.StateShutdown,
128-
"start-consumer-1-of-1": workflow.StateShutdown,
129-
"outbox-consumer": workflow.StateShutdown,
130-
"delete-consumer": workflow.StateShutdown,
131-
"paused-records-retry-consumer": workflow.StateShutdown,
127+
"andrew-scheduler-@monthly": workflow.StateShutdown,
128+
"start-consumer-1-of-1": workflow.StateShutdown,
129+
"outbox-consumer": workflow.StateShutdown,
130+
"delete-consumer": workflow.StateShutdown,
131+
"paused-records-retry-consumer": workflow.StateShutdown,
132132
}, wf.States())
133133
}
134134

@@ -169,7 +169,7 @@ func TestWorkflow_ScheduleFilter(t *testing.T) {
169169
opt := workflow.WithScheduleFilter[MyType, status](filter)
170170

171171
go func() {
172-
err := wf.Schedule("andrew", StatusStart, "@monthly", opt)
172+
err := wf.Schedule("andrew", "@monthly", opt)
173173
require.Nil(t, err)
174174
}()
175175

testing_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestRequire(t *testing.T) {
110110
t.Cleanup(wf.Stop)
111111

112112
fid := "10298309123"
113-
_, err := wf.Trigger(ctx, fid, StatusStart)
113+
_, err := wf.Trigger(ctx, fid)
114114
require.Nil(t, err)
115115

116116
workflow.Require(t, wf, fid, StatusEnd, "Lower")
@@ -181,7 +181,7 @@ func TestWaitFor(t *testing.T) {
181181
t.Cleanup(wf.Stop)
182182

183183
fid := "10298309123"
184-
_, err := wf.Trigger(ctx, fid, StatusStart)
184+
_, err := wf.Trigger(ctx, fid)
185185
require.Nil(t, err)
186186

187187
workflow.WaitFor(t, wf, fid, func(r *workflow.Run[string, status]) (bool, error) {
@@ -197,11 +197,11 @@ func (a apiImpl[Type, Status]) Name() string {
197197
return "test"
198198
}
199199

200-
func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) {
200+
func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) {
201201
return "", nil
202202
}
203203

204-
func (a apiImpl[Type, Status]) Schedule(foreignID string, startingStatus Status, spec string, opts ...workflow.ScheduleOption[Type, Status]) error {
204+
func (a apiImpl[Type, Status]) Schedule(foreignID string, spec string, opts ...workflow.ScheduleOption[Type, Status]) error {
205205
return nil
206206
}
207207

0 commit comments

Comments
 (0)