Skip to content

Commit 7007f01

Browse files
authored
Auto-scale DynamoDB provision based on Prometheus metrics (#841)
* Refactor: Move AWS-specific function into aws directory * Refactor: pull DynamoDB expected table results and fixture config out to functions * Refactor: Simplify test fixtures * Auto-scale DynamoDB provision based on Prometheus metrics Move old-style AWS autoscaling to a separate file; make metrics autoscaling implement the common interface. * Improve logging of DynamoDB settings updates * Warn but don't fail on limit-exceeded error, and bump failure counter * Change cooldown default from 50 to 30 minutes
1 parent 54406c3 commit 7007f01

17 files changed

+1571
-641
lines changed

Gopkg.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/table-manager/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ func main() {
3737
schemaConfig.IndexTables.WriteScale.Enabled ||
3838
schemaConfig.ChunkTables.InactiveWriteScale.Enabled ||
3939
schemaConfig.IndexTables.InactiveWriteScale.Enabled) &&
40-
storageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil {
41-
level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling URL has been provided")
40+
(storageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil && storageConfig.AWSStorageConfig.Metrics.URL == "") {
41+
level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided")
4242
os.Exit(1)
4343
}
4444

pkg/chunk/aws/aws_autoscaling.go

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package aws
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/aws/aws-sdk-go/aws"
8+
"github.com/aws/aws-sdk-go/service/applicationautoscaling"
9+
"github.com/aws/aws-sdk-go/service/applicationautoscaling/applicationautoscalingiface"
10+
"github.com/go-kit/kit/log/level"
11+
"github.com/prometheus/client_golang/prometheus"
12+
13+
"github.com/weaveworks/common/instrument"
14+
"github.com/weaveworks/cortex/pkg/chunk"
15+
"github.com/weaveworks/cortex/pkg/util"
16+
)
17+
18+
const (
19+
autoScalingPolicyNamePrefix = "DynamoScalingPolicy_cortex_"
20+
)
21+
22+
var applicationAutoScalingRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
23+
Namespace: "cortex",
24+
Name: "application_autoscaling_request_duration_seconds",
25+
Help: "Time spent doing ApplicationAutoScaling requests.",
26+
27+
// AWS latency seems to range from a few ms to a few sec. So use 8 buckets
28+
// from 128us to 2s. TODO: Confirm that this is the case for ApplicationAutoScaling.
29+
Buckets: prometheus.ExponentialBuckets(0.000128, 4, 8),
30+
}, []string{"operation", "status_code"})
31+
32+
func init() {
33+
prometheus.MustRegister(applicationAutoScalingRequestDuration)
34+
}
35+
36+
type awsAutoscale struct {
37+
call callManager
38+
ApplicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI
39+
}
40+
41+
func newAWSAutoscale(cfg DynamoDBConfig, callManager callManager) (*awsAutoscale, error) {
42+
session, err := awsSessionFromURL(cfg.ApplicationAutoScaling.URL)
43+
if err != nil {
44+
return nil, err
45+
}
46+
return &awsAutoscale{
47+
call: callManager,
48+
ApplicationAutoScaling: applicationautoscaling.New(session),
49+
}, nil
50+
}
51+
52+
func (a *awsAutoscale) PostCreateTable(ctx context.Context, desc chunk.TableDesc) error {
53+
if desc.WriteScale.Enabled {
54+
return a.enableAutoScaling(ctx, desc)
55+
}
56+
return nil
57+
}
58+
59+
func (a *awsAutoscale) DescribeTable(ctx context.Context, desc *chunk.TableDesc) error {
60+
err := a.call.backoffAndRetry(ctx, func(ctx context.Context) error {
61+
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DescribeScalableTargetsWithContext", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
62+
out, err := a.ApplicationAutoScaling.DescribeScalableTargetsWithContext(ctx, &applicationautoscaling.DescribeScalableTargetsInput{
63+
ResourceIds: []*string{aws.String("table/" + desc.Name)},
64+
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
65+
ServiceNamespace: aws.String("dynamodb"),
66+
})
67+
if err != nil {
68+
return err
69+
}
70+
switch l := len(out.ScalableTargets); l {
71+
case 0:
72+
return err
73+
case 1:
74+
desc.WriteScale.Enabled = true
75+
if target := out.ScalableTargets[0]; target != nil {
76+
if target.RoleARN != nil {
77+
desc.WriteScale.RoleARN = *target.RoleARN
78+
}
79+
if target.MinCapacity != nil {
80+
desc.WriteScale.MinCapacity = *target.MinCapacity
81+
}
82+
if target.MaxCapacity != nil {
83+
desc.WriteScale.MaxCapacity = *target.MaxCapacity
84+
}
85+
}
86+
return err
87+
default:
88+
return fmt.Errorf("more than one scalable target found for DynamoDB table")
89+
}
90+
})
91+
})
92+
if err != nil {
93+
return err
94+
}
95+
96+
err = a.call.backoffAndRetry(ctx, func(ctx context.Context) error {
97+
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DescribeScalingPoliciesWithContext", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
98+
out, err := a.ApplicationAutoScaling.DescribeScalingPoliciesWithContext(ctx, &applicationautoscaling.DescribeScalingPoliciesInput{
99+
PolicyNames: []*string{aws.String(autoScalingPolicyNamePrefix + desc.Name)},
100+
ResourceId: aws.String("table/" + desc.Name),
101+
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
102+
ServiceNamespace: aws.String("dynamodb"),
103+
})
104+
if err != nil {
105+
return err
106+
}
107+
switch l := len(out.ScalingPolicies); l {
108+
case 0:
109+
return err
110+
case 1:
111+
config := out.ScalingPolicies[0].TargetTrackingScalingPolicyConfiguration
112+
if config != nil {
113+
if config.ScaleInCooldown != nil {
114+
desc.WriteScale.InCooldown = *config.ScaleInCooldown
115+
}
116+
if config.ScaleOutCooldown != nil {
117+
desc.WriteScale.OutCooldown = *config.ScaleOutCooldown
118+
}
119+
if config.TargetValue != nil {
120+
desc.WriteScale.TargetValue = *config.TargetValue
121+
}
122+
}
123+
return err
124+
default:
125+
return fmt.Errorf("more than one scaling policy found for DynamoDB table")
126+
}
127+
})
128+
})
129+
return err
130+
}
131+
132+
func (a *awsAutoscale) UpdateTable(ctx context.Context, current chunk.TableDesc, expected *chunk.TableDesc) error {
133+
var err error
134+
if !current.WriteScale.Enabled {
135+
if expected.WriteScale.Enabled {
136+
level.Info(util.Logger).Log("msg", "enabling autoscaling on table", "table")
137+
err = a.enableAutoScaling(ctx, *expected)
138+
}
139+
} else {
140+
if !expected.WriteScale.Enabled {
141+
level.Info(util.Logger).Log("msg", "disabling autoscaling on table", "table")
142+
err = a.disableAutoScaling(ctx, *expected)
143+
} else if current.WriteScale != expected.WriteScale {
144+
level.Info(util.Logger).Log("msg", "enabling autoscaling on table", "table")
145+
err = a.enableAutoScaling(ctx, *expected)
146+
}
147+
}
148+
return err
149+
}
150+
151+
func (a *awsAutoscale) enableAutoScaling(ctx context.Context, desc chunk.TableDesc) error {
152+
// Registers or updates a scalable target
153+
if err := a.call.backoffAndRetry(ctx, func(ctx context.Context) error {
154+
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.RegisterScalableTarget", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
155+
input := &applicationautoscaling.RegisterScalableTargetInput{
156+
MinCapacity: aws.Int64(desc.WriteScale.MinCapacity),
157+
MaxCapacity: aws.Int64(desc.WriteScale.MaxCapacity),
158+
ResourceId: aws.String("table/" + desc.Name),
159+
RoleARN: aws.String(desc.WriteScale.RoleARN),
160+
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
161+
ServiceNamespace: aws.String("dynamodb"),
162+
}
163+
_, err := a.ApplicationAutoScaling.RegisterScalableTarget(input)
164+
if err != nil {
165+
return err
166+
}
167+
return nil
168+
})
169+
}); err != nil {
170+
return err
171+
}
172+
173+
// Puts or updates a scaling policy
174+
return a.call.backoffAndRetry(ctx, func(ctx context.Context) error {
175+
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.PutScalingPolicy", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
176+
input := &applicationautoscaling.PutScalingPolicyInput{
177+
PolicyName: aws.String(autoScalingPolicyNamePrefix + desc.Name),
178+
PolicyType: aws.String("TargetTrackingScaling"),
179+
ResourceId: aws.String("table/" + desc.Name),
180+
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
181+
ServiceNamespace: aws.String("dynamodb"),
182+
TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{
183+
PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{
184+
PredefinedMetricType: aws.String("DynamoDBWriteCapacityUtilization"),
185+
},
186+
ScaleInCooldown: aws.Int64(desc.WriteScale.InCooldown),
187+
ScaleOutCooldown: aws.Int64(desc.WriteScale.OutCooldown),
188+
TargetValue: aws.Float64(desc.WriteScale.TargetValue),
189+
},
190+
}
191+
_, err := a.ApplicationAutoScaling.PutScalingPolicy(input)
192+
return err
193+
})
194+
})
195+
}
196+
197+
func (a *awsAutoscale) disableAutoScaling(ctx context.Context, desc chunk.TableDesc) error {
198+
// Deregister scalable target
199+
if err := a.call.backoffAndRetry(ctx, func(ctx context.Context) error {
200+
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DeregisterScalableTarget", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
201+
input := &applicationautoscaling.DeregisterScalableTargetInput{
202+
ResourceId: aws.String("table/" + desc.Name),
203+
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
204+
ServiceNamespace: aws.String("dynamodb"),
205+
}
206+
_, err := a.ApplicationAutoScaling.DeregisterScalableTarget(input)
207+
return err
208+
})
209+
}); err != nil {
210+
return err
211+
}
212+
213+
// Delete scaling policy
214+
return a.call.backoffAndRetry(ctx, func(ctx context.Context) error {
215+
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DeleteScalingPolicy", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
216+
input := &applicationautoscaling.DeleteScalingPolicyInput{
217+
PolicyName: aws.String(autoScalingPolicyNamePrefix + desc.Name),
218+
ResourceId: aws.String("table/" + desc.Name),
219+
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
220+
ServiceNamespace: aws.String("dynamodb"),
221+
}
222+
_, err := a.ApplicationAutoScaling.DeleteScalingPolicy(input)
223+
return err
224+
})
225+
})
226+
}

0 commit comments

Comments
 (0)