Commit 81845ae

Authored by Miguel Varela Ramos, RobertLucian, and deliahu

Add concurrency to dequeuer (#2376)

* Add concurrency to dequeuer
* Fix dequeuer test
* Address golangci-lint issues
* Add max concurrency validation for async api kinds
* Pass in argument for number of workers to dequeuer sidecar
* Add max concurrency to async configuration.md docs
* Return nil when worker shuts down
* Update default value for target_in_flight and docs

Co-authored-by: Robert Lucian Chiriac <[email protected]>
Co-authored-by: David Eliahu <[email protected]>
1 parent bb5a754 commit 81845ae

11 files changed (+165, -28)

cmd/dequeuer/main.go (+4)

@@ -50,6 +50,7 @@ func main() {
 		statsdAddress string
 		apiKind       string
 		adminPort     int
+		workers       int
 	)
 	flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
 	flag.StringVar(&clusterUID, "cluster-uid", "", "cluster unique identifier")
@@ -61,6 +62,7 @@
 	flag.StringVar(&statsdAddress, "statsd-address", "", "address to push statsd metrics")
 	flag.IntVar(&userContainerPort, "user-port", 8080, "target port to which the dequeued messages will be sent")
 	flag.IntVar(&adminPort, "admin-port", 0, "port where the admin server (for the probes) will be exposed")
+	flag.IntVar(&workers, "workers", 1, "number of workers pulling from the queue")

 	flag.Parse()

@@ -166,6 +168,7 @@
 			Region:           clusterConfig.Region,
 			QueueURL:         queueURL,
 			StopIfNoMessages: true,
+			Workers:          workers,
 		}

 	case userconfig.AsyncAPIKind.String():
@@ -186,6 +189,7 @@
 			Region:           clusterConfig.Region,
 			QueueURL:         queueURL,
 			StopIfNoMessages: false,
+			Workers:          workers,
 		}

 		// report prometheus metrics for async api kinds
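
For context, the new flag slots in next to the sidecar's existing ones; a hypothetical invocation (the binary name and values are illustrative, and the remaining required flags are omitted) might look like:

    dequeuer -cluster-config /configs/cluster.yaml -user-port 8080 -admin-port 15000 -workers 4

Omitting -workers keeps the previous single-worker behavior, since the flag defaults to 1.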

docs/workloads/async/autoscaling.md (+12, -4)

@@ -4,6 +4,14 @@ Cortex auto-scales AsyncAPIs on a per-API basis based on your configuration.

 ## Autoscaling replicas

+### Relevant pod configuration
+
+In addition to the autoscaling configuration options (described below), there is one field in the pod configuration which is relevant to replica autoscaling:
+
+**`max_concurrency`** (default: 1): The maximum number of requests that will be concurrently sent into the container by Cortex. If your web server is designed to handle multiple concurrent requests, increasing `max_concurrency` will increase the throughput of a replica (and result in fewer total replicas for a given load).
+
+<br>
+
 ### Autoscaling configuration

 **`min_replicas`** (default: 1): The lower bound on how many replicas can be running for an API. Scale-to-zero is supported.

@@ -14,13 +22,13 @@ Cortex auto-scales AsyncAPIs on a per-API basis based on your configuration.

 <br>

-**`target_in_flight`** (default: 1): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. The number of in-flight requests is simply how many requests have been submitted and are not yet finished being processed. Therefore, this number includes requests which are actively being processed as well as requests which are waiting in the queue.
+**`target_in_flight`** (default: `max_concurrency` in the pod configuration): This is the desired number of in-flight requests per replica, and is the metric which the autoscaler uses to make scaling decisions. The number of in-flight requests is simply how many requests have been submitted and are not yet finished being processed. Therefore, this number includes requests which are actively being processed as well as requests which are waiting in the queue.

 The autoscaler uses this formula to determine the number of desired replicas:

 `desired replicas = total in-flight requests / target_in_flight`

-For example, setting `target_in_flight` to 1 (the default) causes the cluster to adjust the number of replicas so that on average, there are no requests waiting in the queue.
+For example, setting `target_in_flight` to `max_concurrency` (the default) causes the cluster to adjust the number of replicas so that on average, there are no requests waiting in the queue.

 <br>

@@ -58,9 +66,9 @@ Cortex spins up and down instances based on the aggregate resource requests of a

 ## Overprovisioning

-The default value for `target_in_flight` is 1, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a lower value. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).
+The default value for `target_in_flight` is `max_concurrency`, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a value lower than the replica's expected concurrency. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).

-For example, if you wanted to overprovision by 25%, you could set `target_in_flight` to 0.8. If your API has an average of 4 concurrent requests, the autoscaler would maintain 5 live replicas (4/0.8 = 5).
+For example, if you've determined that each replica in your API can efficiently handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).

 ## Autoscaling responsiveness
docs/workloads/async/configuration.md (+2, -1)

@@ -5,6 +5,7 @@
 kind: AsyncAPI # must be "AsyncAPI" for async APIs (required)
 pod: # pod configuration (required)
   port: <int> # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT)
+  max_concurrency: <int> # maximum number of requests that will be concurrently sent into the container (default: 1, max allowed: 100)
   containers: # configurations for the containers to run (at least one container must be provided)
     - name: <string> # name of the container (required)
       image: <string> # docker image to use for the container (required)
@@ -45,7 +46,7 @@
   min_replicas: <int> # minimum number of replicas (default: 1; min value: 0)
   max_replicas: <int> # maximum number of replicas (default: 100)
   init_replicas: <int> # initial number of replicas (default: <min_replicas>)
-  target_in_flight: <float> # desired number of in-flight requests per replica (including requests actively being processed as well as queued), which the autoscaler tries to maintain (default: 1)
+  target_in_flight: <float> # desired number of in-flight requests per replica (including requests actively being processed as well as queued), which the autoscaler tries to maintain (default: <max_concurrency>)
   window: <duration> # duration over which to average the API's in-flight requests per replica (default: 60s)
   downscale_stabilization_period: <duration> # the API will not scale below the highest recommendation made during this period (default: 5m)
   upscale_stabilization_period: <duration> # the API will not scale above the lowest recommendation made during this period (default: 1m)
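
For illustration, a minimal AsyncAPI spec exercising the new field might look like the following sketch (the API name and image are placeholders, and target_in_flight is shown only to highlight that it now defaults to max_concurrency):

    - name: my-async-api            # hypothetical name
      kind: AsyncAPI
      pod:
        max_concurrency: 4          # Cortex sends up to 4 concurrent requests into the container
        containers:
          - name: api
            image: <your-image>     # placeholder
      autoscaling:
        target_in_flight: 4.0       # redundant here: it already defaults to max_concurrency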

docs/workloads/realtime/autoscaling.md (+1, -1)

@@ -72,7 +72,7 @@ Cortex spins up and down instances based on the aggregate resource requests of a

 The default value for `target_in_flight` is `max_concurrency`, which behaves well in many situations (see above for an explanation of how `target_in_flight` affects autoscaling). However, if your application is sensitive to spikes in traffic or if creating new replicas takes too long (see below), you may find it helpful to maintain extra capacity to handle the increased traffic while new replicas are being created. This can be accomplished by setting `target_in_flight` to a value lower than the replica's expected concurrency. The smaller `target_in_flight` is, the more unused capacity your API will have, and the more room it will have to handle sudden increased load. The increased request rate will still trigger the autoscaler, and your API will stabilize again (maintaining the overprovisioned capacity).

-For example, if you've determined that each replica in your API can handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).
+For example, if you've determined that each replica in your API can efficiently handle 2 concurrent requests, you would typically set `target_in_flight` to 2. In a scenario where your API is receiving 8 concurrent requests on average, the autoscaler would maintain 4 live replicas (8/2 = 4). If you wanted to overprovision by 25%, you could set `target_in_flight` to 1.6, causing the autoscaler to maintain 5 live replicas (8/1.6 = 5).

 ## Autoscaling responsiveness

pkg/dequeuer/batch_handler.go (+2, -4)

@@ -158,7 +158,7 @@ func (h *BatchMessageHandler) handleBatch(message *sqs.Message) error {
 		return nil
 	}

-	endTime := time.Now().Sub(startTime)
+	endTime := time.Since(startTime)

 	err = h.recordSuccess()
 	if err != nil {
@@ -175,7 +175,7 @@ func (h *BatchMessageHandler) handleBatch(message *sqs.Message) error {
 func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {
 	shouldRunOnJobComplete := false
 	h.log.Info("received job_complete message")
-	for true {
+	for {
 		queueAttributes, err := GetQueueAttributes(h.aws, h.config.QueueURL)
 		if err != nil {
 			return err
@@ -223,8 +223,6 @@ func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {

 		time.Sleep(h.jobCompleteMessageDelay)
 	}
-
-	return nil
 }

 func isOnJobCompleteMessage(message *sqs.Message) bool {

pkg/dequeuer/dequeuer.go (+31, -4)

@@ -23,6 +23,7 @@ import (
 	"github.com/aws/aws-sdk-go/service/sqs"
 	awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
 	"github.com/cortexlabs/cortex/pkg/lib/errors"
+	"github.com/cortexlabs/cortex/pkg/lib/math"
 	"github.com/cortexlabs/cortex/pkg/lib/telemetry"
 	"go.uber.org/zap"
 )
@@ -40,6 +41,7 @@ type SQSDequeuerConfig struct {
 	Region           string
 	QueueURL         string
 	StopIfNoMessages bool
+	Workers          int
 }

 type SQSDequeuer struct {
@@ -96,12 +98,37 @@ func (d *SQSDequeuer) ReceiveMessage() (*sqs.Message, error) {
 }

 func (d *SQSDequeuer) Start(messageHandler MessageHandler, readinessProbeFunc func() bool) error {
+	numWorkers := math.MaxInt(d.config.Workers, 1)
+
+	d.log.Infof("Starting %d workers", numWorkers)
+	errCh := make(chan error)
+	doneChs := make([]chan struct{}, d.config.Workers)
+	for i := 0; i < numWorkers; i++ {
+		doneChs[i] = make(chan struct{})
+		go func(i int) {
+			errCh <- d.worker(messageHandler, readinessProbeFunc, doneChs[i])
+		}(i)
+	}
+
+	select {
+	case err := <-errCh:
+		return err
+	case <-d.done:
+		for _, doneCh := range doneChs {
+			doneCh <- struct{}{}
+		}
+	}
+
+	return nil
+}
+
+func (d SQSDequeuer) worker(messageHandler MessageHandler, readinessProbeFunc func() bool, workerDone chan struct{}) error {
 	noMessagesInPreviousIteration := false

 loop:
 	for {
 		select {
-		case <-d.done:
+		case <-workerDone:
 			break loop
 		default:
 			if !readinessProbeFunc() {
@@ -134,8 +161,8 @@ loop:

 		noMessagesInPreviousIteration = false
 		receiptHandle := *message.ReceiptHandle
-		done := d.StartMessageRenewer(receiptHandle)
-		err = d.handleMessage(message, messageHandler, done)
+		renewerDone := d.StartMessageRenewer(receiptHandle)
+		err = d.handleMessage(message, messageHandler, renewerDone)
 		if err != nil {
 			d.log.Error(err)
 			telemetry.Error(err)
@@ -196,7 +223,7 @@ func (d *SQSDequeuer) StartMessageRenewer(receiptHandle string) chan struct{} {
 	startTime := time.Now()
 	go func() {
 		defer ticker.Stop()
-		for true {
+		for {
 			select {
 			case <-done:
 				return
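
The new Start is a common fan-out pattern. The condensed, self-contained sketch below (illustrative names, not the dequeuer's actual API; the buffered error channel is a simplification so exiting workers never block) shows the moving parts: each worker gets its own done channel, the first worker error aborts the group, and an external shutdown is broadcast to every worker:

    package main

    import "fmt"

    // start fans work out to n workers. It returns the first worker error,
    // or nil after a shutdown request has been relayed to every worker.
    func start(n int, shutdown <-chan struct{}, work func(done <-chan struct{}) error) error {
    	errCh := make(chan error, n) // buffered: exiting workers never block (a simplification)
    	doneChs := make([]chan struct{}, n)
    	for i := 0; i < n; i++ {
    		doneChs[i] = make(chan struct{})
    		go func(i int) { errCh <- work(doneChs[i]) }(i)
    	}
    	select {
    	case err := <-errCh: // first failing worker wins
    		return err
    	case <-shutdown:
    		for _, doneCh := range doneChs { // broadcast shutdown
    			doneCh <- struct{}{}
    		}
    	}
    	return nil
    }

    func main() {
    	worker := func(done <-chan struct{}) error {
    		for {
    			select {
    			case <-done:
    				return nil // mirrors "Return nil when worker shuts down"
    			default:
    				// a real worker would poll the queue here
    			}
    		}
    	}
    	shutdown := make(chan struct{})
    	go func() { shutdown <- struct{}{} }() // request shutdown immediately
    	fmt.Println(start(3, shutdown, worker)) // prints <nil>
    }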

pkg/dequeuer/dequeuer_test.go (+75)

@@ -31,6 +31,7 @@ import (
 	"github.com/aws/aws-sdk-go/service/sqs"
 	awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
 	"github.com/cortexlabs/cortex/pkg/lib/random"
+	"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
 	"github.com/ory/dockertest/v3"
 	dc "github.com/ory/dockertest/v3/docker"
 	"github.com/stretchr/testify/require"
@@ -179,6 +180,7 @@ func TestSQSDequeuer_ReceiveMessage(t *testing.T) {
 			Region:           _localStackDefaultRegion,
 			QueueURL:         queueURL,
 			StopIfNoMessages: true,
+			Workers:          1,
 		}, awsClient, logger,
 	)
 	require.NoError(t, err)
@@ -205,6 +207,7 @@ func TestSQSDequeuer_StartMessageRenewer(t *testing.T) {
 			Region:           _localStackDefaultRegion,
 			QueueURL:         queueURL,
 			StopIfNoMessages: true,
+			Workers:          1,
 		}, awsClient, logger,
 	)
 	require.NoError(t, err)
@@ -253,6 +256,7 @@ func TestSQSDequeuerTerminationOnEmptyQueue(t *testing.T) {
 			Region:           _localStackDefaultRegion,
 			QueueURL:         queueURL,
 			StopIfNoMessages: true,
+			Workers:          1,
 		}, awsClient, logger,
 	)
 	require.NoError(t, err)
@@ -303,6 +307,7 @@ func TestSQSDequeuer_Shutdown(t *testing.T) {
 			Region:           _localStackDefaultRegion,
 			QueueURL:         queueURL,
 			StopIfNoMessages: true,
+			Workers:          1,
 		}, awsClient, logger,
 	)
 	require.NoError(t, err)
@@ -345,6 +350,7 @@ func TestSQSDequeuer_Start_HandlerError(t *testing.T) {
 			Region:           _localStackDefaultRegion,
 			QueueURL:         queueURL,
 			StopIfNoMessages: true,
+			Workers:          1,
 		}, awsClient, logger,
 	)
 	require.NoError(t, err)
@@ -383,3 +389,72 @@
 		return msg != nil
 	}, 5*time.Second, time.Second)
 }
+
+func TestSQSDequeuer_MultipleWorkers(t *testing.T) {
+	t.Parallel()
+
+	awsClient := testAWSClient(t)
+	queueURL := createQueue(t, awsClient)
+
+	numMessages := 3
+	expectedMsgs := make([]string, numMessages)
+	for i := 0; i < numMessages; i++ {
+		message := fmt.Sprintf("%d", i)
+		expectedMsgs[i] = message
+		_, err := awsClient.SQS().SendMessage(&sqs.SendMessageInput{
+			MessageBody:            aws.String(message),
+			MessageDeduplicationId: aws.String(message),
+			MessageGroupId:         aws.String(message),
+			QueueUrl:               aws.String(queueURL),
+		})
+		require.NoError(t, err)
+	}
+
+	logger := newLogger(t)
+	defer func() { _ = logger.Sync() }()
+
+	dq, err := NewSQSDequeuer(
+		SQSDequeuerConfig{
+			Region:           _localStackDefaultRegion,
+			QueueURL:         queueURL,
+			StopIfNoMessages: true,
+			Workers:          numMessages,
+		}, awsClient, logger,
+	)
+	require.NoError(t, err)
+
+	dq.waitTimeSeconds = aws.Int64(0)
+	dq.notFoundSleepTime = 0
+
+	msgCh := make(chan string, numMessages)
+	handler := NewMessageHandlerFunc(
+		func(message *sqs.Message) error {
+			msgCh <- *message.Body
+			return nil
+		},
+	)

+	errCh := make(chan error)
+	go func() {
+		errCh <- dq.Start(handler, func() bool { return true })
+	}()
+
+	receivedMessages := make([]string, numMessages)
+	for i := 0; i < numMessages; i++ {
+		receivedMessages[i] = <-msgCh
+	}
+	dq.Shutdown()
+
+	// timeout test after 10 seconds
+	time.AfterFunc(10*time.Second, func() {
+		close(msgCh)
+		errCh <- errors.New("test timed out")
+	})
+
+	require.Len(t, receivedMessages, numMessages)
+
+	set := strset.FromSlice(receivedMessages)
+	require.True(t, set.Has(expectedMsgs...))
+
+	require.NoError(t, <-errCh)
+}

pkg/dequeuer/probes.go (+4, -2)

@@ -35,10 +35,12 @@ func ProbesFromFile(probesPath string, logger *zap.SugaredLogger) ([]*probe.Probe, error) {
 		return nil, err
 	}

-	var probesSlice []*probe.Probe
+	probesSlice := make([]*probe.Probe, len(probesMap))
+	var i int
 	for _, p := range probesMap {
 		auxProbe := p
-		probesSlice = append(probesSlice, probe.NewProbe(&auxProbe, logger))
+		probesSlice[i] = probe.NewProbe(&auxProbe, logger)
+		i++
 	}
 	return probesSlice, nil
 }

pkg/types/spec/validations.go (+14, -1)

@@ -189,6 +189,19 @@ func podValidation(kind userconfig.Kind) *cr.StructFieldValidation {
 		)
 	}

+	if kind == userconfig.AsyncAPIKind {
+		validation.StructValidation.StructFieldValidations = append(validation.StructValidation.StructFieldValidations,
+			&cr.StructFieldValidation{
+				StructField: "MaxConcurrency",
+				Int64Validation: &cr.Int64Validation{
+					Default:           consts.DefaultMaxConcurrency,
+					GreaterThan:       pointer.Int64(0),
+					LessThanOrEqualTo: pointer.Int64(100),
+				},
+			},
+		)
+	}
+
 	return validation
 }

@@ -818,7 +831,7 @@ func validateAutoscaling(api *userconfig.API) error {

 	if api.Kind == userconfig.AsyncAPIKind {
 		if autoscaling.TargetInFlight == nil {
-			autoscaling.TargetInFlight = pointer.Float64(1)
+			autoscaling.TargetInFlight = pointer.Float64(float64(pod.MaxConcurrency))
 		}
 	}
pkg/types/userconfig/api.go (+8)

@@ -164,6 +164,10 @@ func (api *API) ToK8sAnnotations() map[string]string {
 		annotations[MaxQueueLengthAnnotationKey] = s.Int64(api.Pod.MaxQueueLength)
 	}

+	if api.Pod != nil && api.Kind == AsyncAPIKind {
+		annotations[MaxConcurrencyAnnotationKey] = s.Int64(api.Pod.MaxConcurrency)
+	}
+
 	if api.Networking != nil {
 		annotations[EndpointAnnotationKey] = *api.Networking.Endpoint
 	}
@@ -339,6 +343,10 @@ func (pod *Pod) UserStr(kind Kind) string {
 		sb.WriteString(fmt.Sprintf("%s: %s\n", MaxQueueLengthKey, s.Int64(pod.MaxQueueLength)))
 	}

+	if kind == AsyncAPIKind {
+		sb.WriteString(fmt.Sprintf("%s: %s\n", MaxConcurrencyKey, s.Int64(pod.MaxConcurrency)))
+	}
+
 	sb.WriteString(fmt.Sprintf("%s:\n", ContainersKey))
 	for _, container := range pod.Containers {
 		containerUserStr := s.Indent(container.UserStr(), "  ")
