Skip to content

Commit 0de6f67

Browse files
workflow/consumer: Simplify consumer pattern and change paused retry to consumer (#95)
* workflow/consumer: Clean up consumer pattern and convert paused retry to consumer
* make consume unexported
* add more unit tests and test cases
* clean up file naming
* clean up naming
1 parent 7548142 commit 0de6f67

36 files changed: +964 additions, -1347 deletions (lines changed)

adapters/kafkastreamer/kafka.go

Lines changed: 22 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,8 @@ type StreamConstructor struct {
2222
brokers []string
2323
}
2424

25-
func (s StreamConstructor) NewProducer(ctx context.Context, topic string) (workflow.Producer, error) {
26-
return &Producer{
25+
func (s StreamConstructor) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) {
26+
return &Sender{
2727
Topic: topic,
2828
Writer: &kafka.Writer{
2929
Addr: kafka.TCP(s.brokers...),
@@ -35,15 +35,15 @@ func (s StreamConstructor) NewProducer(ctx context.Context, topic string) (workf
3535
}, nil
3636
}
3737

38-
type Producer struct {
38+
type Sender struct {
3939
Topic string
4040
Writer *kafka.Writer
4141
WriterTimeout time.Duration
4242
}
4343

44-
var _ workflow.Producer = (*Producer)(nil)
44+
var _ workflow.EventSender = (*Sender)(nil)
4545

46-
func (p *Producer) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
46+
func (p *Sender) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
4747
for ctx.Err() == nil {
4848
ctx, cancel := context.WithTimeout(ctx, p.WriterTimeout)
4949
defer cancel()
@@ -76,12 +76,17 @@ func (p *Producer) Send(ctx context.Context, foreignID string, statusType int, h
7676
return ctx.Err()
7777
}
7878

79-
func (p *Producer) Close() error {
79+
func (p *Sender) Close() error {
8080
return p.Writer.Close()
8181
}
8282

83-
func (s StreamConstructor) NewConsumer(ctx context.Context, topic string, name string, opts ...workflow.ConsumerOption) (workflow.Consumer, error) {
84-
var copts workflow.ConsumerOptions
83+
func (s StreamConstructor) NewReceiver(
84+
ctx context.Context,
85+
topic string,
86+
name string,
87+
opts ...workflow.ReceiverOption,
88+
) (workflow.EventReceiver, error) {
89+
var copts workflow.ReceiverOptions
8590
for _, opt := range opts {
8691
opt(&copts)
8792
}
@@ -101,25 +106,25 @@ func (s StreamConstructor) NewConsumer(ctx context.Context, topic string, name s
101106
MaxWait: time.Second,
102107
})
103108

104-
return &Consumer{
109+
return &Receiver{
105110
topic: topic,
106111
name: name,
107112
reader: kafkaReader,
108113
options: copts,
109114
}, nil
110115
}
111116

112-
type Consumer struct {
117+
type Receiver struct {
113118
topic string
114119
name string
115120
reader *kafka.Reader
116-
options workflow.ConsumerOptions
121+
options workflow.ReceiverOptions
117122
}
118123

119-
func (c *Consumer) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {
124+
func (r *Receiver) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {
120125
var commit []kafka.Message
121126
for ctx.Err() == nil {
122-
m, err := c.reader.FetchMessage(ctx)
127+
m, err := r.reader.FetchMessage(ctx)
123128
if err != nil {
124129
return nil, nil, err
125130
}
@@ -147,16 +152,16 @@ func (c *Consumer) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err
147152

148153
return event,
149154
func() error {
150-
return c.reader.CommitMessages(ctx, commit...)
155+
return r.reader.CommitMessages(ctx, commit...)
151156
},
152157
nil
153158
}
154159

155160
return nil, nil, ctx.Err()
156161
}
157162

158-
func (c *Consumer) Close() error {
159-
return c.reader.Close()
163+
func (r *Receiver) Close() error {
164+
return r.reader.Close()
160165
}
161166

162-
var _ workflow.Consumer = (*Consumer)(nil)
167+
var _ workflow.EventReceiver = (*Receiver)(nil)

adapters/memstreamer/connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type consumer struct {
5353
cursorStore *cursorStore
5454
cursorName string
5555
clock clock.Clock
56-
options workflow.ConsumerOptions
56+
options workflow.ReceiverOptions
5757
}
5858

5959
func (c *consumer) Recv(ctx context.Context) (*workflow.ConnectorEvent, workflow.Ack, error) {

adapters/memstreamer/memstreamer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type StreamConstructor struct {
5151
cursorStore *cursorStore
5252
}
5353

54-
func (s StreamConstructor) NewProducer(ctx context.Context, topic string) (workflow.Producer, error) {
54+
func (s StreamConstructor) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) {
5555
s.stream.mu.Lock()
5656
defer s.stream.mu.Unlock()
5757

@@ -63,11 +63,16 @@ func (s StreamConstructor) NewProducer(ctx context.Context, topic string) (workf
6363
}, nil
6464
}
6565

66-
func (s StreamConstructor) NewConsumer(ctx context.Context, topic string, name string, opts ...workflow.ConsumerOption) (workflow.Consumer, error) {
66+
func (s StreamConstructor) NewReceiver(
67+
ctx context.Context,
68+
topic string,
69+
name string,
70+
opts ...workflow.ReceiverOption,
71+
) (workflow.EventReceiver, error) {
6772
s.stream.mu.Lock()
6873
defer s.stream.mu.Unlock()
6974

70-
var options workflow.ConsumerOptions
75+
var options workflow.ReceiverOptions
7176
for _, opt := range opts {
7277
opt(&options)
7378
}
@@ -92,7 +97,7 @@ type Stream struct {
9297
topic string
9398
name string
9499
clock clock.Clock
95-
options workflow.ConsumerOptions
100+
options workflow.ReceiverOptions
96101
}
97102

98103
func (s *Stream) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
@@ -147,8 +152,8 @@ func (s *Stream) Close() error {
147152
}
148153

149154
var (
150-
_ workflow.Producer = (*Stream)(nil)
151-
_ workflow.Consumer = (*Stream)(nil)
155+
_ workflow.EventSender = (*Stream)(nil)
156+
_ workflow.EventReceiver = (*Stream)(nil)
152157
)
153158

154159
func newCursorStore() *cursorStore {

adapters/reflexstreamer/reflex.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type constructor struct {
3434
registerGapFiller sync.Once
3535
}
3636

37-
func (c *constructor) NewProducer(ctx context.Context, topic string) (workflow.Producer, error) {
37+
func (c *constructor) NewSender(ctx context.Context, topic string) (workflow.EventSender, error) {
3838
return &Producer{
3939
topic: topic,
4040
writer: c.writer,
@@ -79,8 +79,13 @@ func (p *Producer) Close() error {
7979
return nil
8080
}
8181

82-
func (c *constructor) NewConsumer(ctx context.Context, topic string, name string, opts ...workflow.ConsumerOption) (workflow.Consumer, error) {
83-
var copts workflow.ConsumerOptions
82+
func (c *constructor) NewReceiver(
83+
ctx context.Context,
84+
topic string,
85+
name string,
86+
opts ...workflow.ReceiverOption,
87+
) (workflow.EventReceiver, error) {
88+
var copts workflow.ReceiverOptions
8489
for _, opt := range opts {
8590
opt(&copts)
8691
}
@@ -123,7 +128,7 @@ type Consumer struct {
123128
cursor reflex.CursorStore
124129
reader *sql.DB
125130
streamClient reflex.StreamClient
126-
options workflow.ConsumerOptions
131+
options workflow.ReceiverOptions
127132
}
128133

129134
func (c *Consumer) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {

adapters/reflexstreamer/reflex_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestStreamerGapFiller(t *testing.T) {
3131
jtest.RequireNil(t, err)
3232

3333
constructor := reflexstreamer.New(dbc, dbc, eventsTable, cTable.ToStore(dbc))
34-
consumer, err := constructor.NewConsumer(context.Background(), "", "")
34+
consumer, err := constructor.NewReceiver(context.Background(), "", "")
3535
jtest.RequireNil(t, err)
3636

3737
for range 2 {

autopause.go

Lines changed: 74 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"k8s.io/utils/clock"
88

99
"github.com/luno/workflow/internal/errorcounter"
10-
"github.com/luno/workflow/internal/metrics"
1110
)
1211

1312
// maybePause will either return a nil error if it has failed to pause the record and should be retried. A non-nil
@@ -20,7 +19,7 @@ func maybePause[Type any, Status StatusType](
2019
originalErr error,
2120
processName string,
2221
run *Run[Type, Status],
23-
logger *logger,
22+
logger Logger,
2423
) (paused bool, err error) {
2524
// Only keep track of errors if we need to
2625
if pauseAfterErrCount == 0 {
@@ -37,7 +36,7 @@ func maybePause[Type any, Status StatusType](
3736
return false, err
3837
}
3938

40-
logger.maybeDebug(ctx, "paused record after exceeding allowed error count", map[string]string{
39+
logger.Debug(ctx, "paused record after exceeding allowed error count", map[string]string{
4140
"workflow_name": run.WorkflowName,
4241
"foreign_id": run.ForeignID,
4342
"run_id": run.RunID,
@@ -48,88 +47,94 @@ func maybePause[Type any, Status StatusType](
4847
return true, nil
4948
}
5049

51-
type autoPauseRetryConfig struct {
52-
enabled bool
53-
// limit determines the number of records in one lookup cycle.
54-
limit int
55-
// pollingFrequency is the frequency of the lookup cycle that looks up paused records that have met
56-
// or exceeded the resumeAfter duration.
57-
pollingFrequency time.Duration
58-
// resumeAfter is the duration that the record should remain paused for.
59-
resumeAfter time.Duration
60-
}
61-
62-
func defaultAutoPauseRetryConfig() autoPauseRetryConfig {
63-
return autoPauseRetryConfig{
64-
enabled: true,
65-
limit: 10,
66-
pollingFrequency: time.Minute,
67-
resumeAfter: time.Hour,
68-
}
69-
}
50+
func pausedRecordsRetryConsumer[Type any, Status StatusType](w *Workflow[Type, Status]) {
51+
role := makeRole(
52+
w.Name(),
53+
"paused",
54+
"records",
55+
"retry",
56+
"consumer",
57+
)
58+
59+
processName := makeRole(
60+
"paused",
61+
"records",
62+
"retry",
63+
"consumer",
64+
)
65+
w.run(role, processName, func(ctx context.Context) error {
66+
topic := RunStateChangeTopic(w.Name())
67+
stream, err := w.eventStreamer.NewReceiver(
68+
ctx,
69+
topic,
70+
role,
71+
WithReceiverPollFrequency(w.defaultOpts.pollingFrequency),
72+
)
73+
if err != nil {
74+
return err
75+
}
76+
defer stream.Close()
7077

71-
func autoRetryPausedRecordsForever[Type any, Status StatusType](w *Workflow[Type, Status]) {
72-
role := makeRole(w.Name(), "paused-records-retry")
73-
processName := role
78+
lagAlert := w.pausedRecordsRetry.resumeAfter * 3
79+
if lagAlert < time.Minute {
80+
lagAlert = w.pausedRecordsRetry.resumeAfter + time.Minute*5
81+
}
7482

75-
w.run(role, processName, func(ctx context.Context) error {
76-
for {
77-
err := retryPausedRecords(
78-
ctx,
79-
w.Name(),
80-
w.recordStore.List,
83+
return consume(
84+
ctx,
85+
w.Name(),
86+
processName,
87+
stream,
88+
autoRetryConsumer(
89+
w.recordStore.Lookup,
8190
w.recordStore.Store,
8291
w.clock,
83-
processName,
84-
w.autoPauseRetryConfig.limit,
85-
w.autoPauseRetryConfig.resumeAfter,
86-
)
87-
if err != nil {
88-
return err
89-
}
90-
91-
select {
92-
case <-ctx.Done():
93-
return ctx.Err()
94-
case <-w.clock.After(w.autoPauseRetryConfig.pollingFrequency): // Slow and constant drip feed of paused records back into running state.
95-
continue
96-
}
97-
}
92+
w.pausedRecordsRetry.resumeAfter,
93+
),
94+
w.clock,
95+
w.pausedRecordsRetry.resumeAfter,
96+
lagAlert,
97+
)
9898
}, w.defaultOpts.errBackOff)
9999
}
100100

101-
type listFunc func(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)
102-
103-
func retryPausedRecords(
104-
ctx context.Context,
105-
workflowName string,
106-
list listFunc,
101+
func autoRetryConsumer(
102+
lookupFn lookupFunc,
107103
store storeFunc,
108104
clock clock.Clock,
109-
processName string,
110-
limit int,
111105
retryInterval time.Duration,
112-
) error {
113-
t0 := clock.Now()
114-
115-
rs, err := list(ctx, workflowName, 0, limit, OrderTypeAscending, FilterByRunState(RunStatePaused))
116-
if err != nil {
117-
return err
118-
}
106+
) func(ctx context.Context, e *Event) error {
107+
return func(ctx context.Context, e *Event) error {
108+
record, err := lookupFn(ctx, e.ForeignID)
109+
if err != nil {
110+
return err
111+
}
119112

120-
threshold := clock.Now().Add(-retryInterval)
121-
for _, r := range rs {
122-
if r.UpdatedAt.After(threshold) {
123-
continue
113+
threshold := clock.Now().Add(-retryInterval)
114+
if record.UpdatedAt.After(threshold) {
115+
return nil
124116
}
125117

126-
controller := NewRunStateController(store, &r)
127-
err := controller.Resume(ctx)
118+
controller := NewRunStateController(store, record)
119+
err = controller.Resume(ctx)
128120
if err != nil {
129121
return err
130122
}
123+
124+
return nil
131125
}
126+
}
132127

133-
metrics.ProcessLatency.WithLabelValues(workflowName, processName).Observe(clock.Since(t0).Seconds())
134-
return nil
128+
type pausedRecordsRetry struct {
129+
enabled bool
130+
// resumeAfter is the duration that the record should remain paused for.
131+
resumeAfter time.Duration
132+
}
133+
134+
func defaultPausedRecordsRetry() pausedRecordsRetry {
135+
resumeAfter := time.Hour
136+
return pausedRecordsRetry{
137+
enabled: true,
138+
resumeAfter: resumeAfter,
139+
}
135140
}

0 commit comments

Comments
 (0)