Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
* [ENHANCEMENT] Alertmanager: Add the alert name in error log when it get throttled. #5456
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
34 changes: 23 additions & 11 deletions pkg/ring/kv/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,37 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool)
return keys, totalCapacity, nil
}

func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) error {
func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) (float64, error) {
input := &dynamodb.DeleteItemInput{
TableName: kv.tableName,
Key: generateItemKey(key),
}
_, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
return err
totalCapacity := float64(0)
output, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
if err != nil {
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
}
return totalCapacity, err
}

func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) error {
func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (float64, error) {
input := &dynamodb.PutItemInput{
TableName: kv.tableName,
Item: kv.generatePutItemRequest(key, data),
}
_, err := kv.ddbClient.PutItemWithContext(ctx, input)
return err
totalCapacity := float64(0)
output, err := kv.ddbClient.PutItemWithContext(ctx, input)
if err != nil {
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
}
return totalCapacity, err
}

func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) {
totalCapacity := float64(0)
writeRequestSize := len(put) + len(delete)
if writeRequestSize == 0 {
return nil
return totalCapacity, nil
}

writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit))))
Expand Down Expand Up @@ -220,15 +229,18 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele

resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input)
if err != nil {
return err
return totalCapacity, err
}
for _, consumedCapacity := range resp.ConsumedCapacity {
totalCapacity += getCapacityUnits(consumedCapacity)
}

if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 {
return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
}
}

return nil
return totalCapacity, nil
}

func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue {
Expand Down
14 changes: 7 additions & 7 deletions pkg/ring/kv/dynamodb/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_TTLDisabled(t *testing.T) {
}

ddb := newDynamodbClientMock("TEST", ddbClientMock, 0)
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
require.NoError(t, err)

}
Expand All @@ -41,7 +41,7 @@ func Test_TTL(t *testing.T) {
}

ddb := newDynamodbClientMock("TEST", ddbClientMock, 5*time.Hour)
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
require.NoError(t, err)
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func Test_Batch(t *testing.T) {
}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), update, delete)
_, err := ddb.Batch(context.TODO(), update, delete)
require.NoError(t, err)
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func Test_BatchSlices(t *testing.T) {
delete = append(delete, ddbKeyDelete)
}

err := ddb.Batch(context.TODO(), nil, delete)
_, err := ddb.Batch(context.TODO(), nil, delete)
require.NoError(t, err)
require.EqualValues(t, tc.expectedCalls, numOfCalls)

Expand All @@ -134,7 +134,7 @@ func Test_EmptyBatch(t *testing.T) {
ddbClientMock := &mockDynamodb{}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), nil, nil)
_, err := ddb.Batch(context.TODO(), nil, nil)
require.NoError(t, err)
}

Expand All @@ -159,7 +159,7 @@ func Test_Batch_UnprocessedItems(t *testing.T) {
}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), nil, delete)
_, err := ddb.Batch(context.TODO(), nil, delete)
require.Errorf(t, err, "error processing batch dynamodb")
}

Expand All @@ -178,7 +178,7 @@ func Test_Batch_Error(t *testing.T) {
}

ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
err := ddb.Batch(context.TODO(), nil, delete)
_, err := ddb.Batch(context.TODO(), nil, delete)
require.Errorf(t, err, "mocked error")
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/ring/kv/dynamodb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics {
}, []string{"operation", "status_code"}))

dynamodbUsageMetrics := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "dynamodb_kv_read_capacity_total",
Help: "Total used read capacity on dynamodb",
Name: "dynamodb_kv_consumed_capacity_total",
Help: "Total consumed capacity on dynamodb",
}, []string{"operation"})

dynamodbMetrics := dynamodbMetrics{
Expand Down Expand Up @@ -66,19 +66,25 @@ func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isP

func (d dynamodbInstrumentation) Delete(ctx context.Context, key dynamodbKey) error {
return instrument.CollectedRequest(ctx, "Delete", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
return d.kv.Delete(ctx, key)
totalCapacity, err := d.kv.Delete(ctx, key)
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Delete").Add(totalCapacity)
return err
})
}

func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data []byte) error {
return instrument.CollectedRequest(ctx, "Put", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
return d.kv.Put(ctx, key, data)
totalCapacity, err := d.kv.Put(ctx, key, data)
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Put").Add(totalCapacity)
return err
})
}

func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
return d.kv.Batch(ctx, put, delete)
totalCapacity, err := d.kv.Batch(ctx, put, delete)
d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity)
return err
})
}

Expand Down