diff --git a/CHANGELOG.md b/CHANGELOG.md
index 89b87452dbb..f648c08d0c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -43,6 +43,7 @@
 * [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
 * [ENHANCEMENT] Alertmanager: Add the alert name in error log when it get throttled. #5456
 * [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
+* [ENHANCEMENT] DDBKV: Rename metric `dynamodb_kv_read_capacity_total` to `dynamodb_kv_consumed_capacity_total` and report consumed capacity for the Delete, Put and Batch operations. #5481
 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go
index 6d2e3d65c8c..1fc356a4934 100644
--- a/pkg/ring/kv/dynamodb/dynamodb.go
+++ b/pkg/ring/kv/dynamodb/dynamodb.go
@@ -157,28 +157,48 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool)
 	return keys, totalCapacity, nil
 }
 
-func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) error {
+// Delete removes the item stored under key and returns the capacity units
+// reported for the call; the returned capacity is 0 when the call fails.
+func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) (float64, error) {
 	input := &dynamodb.DeleteItemInput{
 		TableName: kv.tableName,
 		Key:       generateItemKey(key),
 	}
-	_, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
-	return err
+	totalCapacity := float64(0)
+	output, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
+	// Read the consumed capacity only when the call succeeded and reported it.
+	// NOTE(review): input sets no ReturnConsumedCapacity here; confirm DynamoDB returns it.
+	if err == nil && output != nil && output.ConsumedCapacity != nil {
+		totalCapacity = getCapacityUnits(output.ConsumedCapacity)
+	}
+	return totalCapacity, err
 }
 
-func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) error {
+// Put writes data under key and returns the capacity units reported for the
+// call; the returned capacity is 0 when the call fails.
+func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (float64, error) {
 	input := &dynamodb.PutItemInput{
 		TableName: kv.tableName,
 		Item:      kv.generatePutItemRequest(key, data),
 	}
-	_, err := kv.ddbClient.PutItemWithContext(ctx, input)
-	return err
+	totalCapacity := float64(0)
+	output, err := kv.ddbClient.PutItemWithContext(ctx, input)
+	// Read the consumed capacity only when the call succeeded and reported it.
+	// NOTE(review): input sets no ReturnConsumedCapacity here; confirm DynamoDB returns it.
+	if err == nil && output != nil && output.ConsumedCapacity != nil {
+		totalCapacity = getCapacityUnits(output.ConsumedCapacity)
+	}
+	return totalCapacity, err
 }
 
-func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
+// Batch writes the given puts and deletes and returns the capacity units
+// accumulated from the BatchWriteItem responses received so far, including
+// when a later call fails or a response reports unprocessed items.
+func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) {
+	totalCapacity := float64(0)
 	writeRequestSize := len(put) + len(delete)
 	if writeRequestSize == 0 {
-		return nil
+		return totalCapacity, nil
 	}
 
 	writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit))))
@@ -220,15 +240,18 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele
 
 		resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input)
 		if err != nil {
-			return err
+			return totalCapacity, err
+		}
+		for _, consumedCapacity := range resp.ConsumedCapacity {
+			totalCapacity += getCapacityUnits(consumedCapacity)
 		}
 
 		if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 {
-			return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
+			return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
 		}
 	}
 
-	return nil
+	return totalCapacity, nil
 }
 
 func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue {
diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go
index b7e04c1164b..d69ef5707de 100644
--- a/pkg/ring/kv/dynamodb/dynamodb_test.go
+++ b/pkg/ring/kv/dynamodb/dynamodb_test.go
@@ -23,7 +23,7 @@ func Test_TTLDisabled(t *testing.T) {
 	}
 
 	ddb := newDynamodbClientMock("TEST", ddbClientMock, 0)
-	err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
+	_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
 	require.NoError(t, err)
 
 }
@@ -41,7 +41,7 @@ func Test_TTL(t *testing.T) {
 	}
 
 	ddb := newDynamodbClientMock("TEST", ddbClientMock, 5*time.Hour)
-	err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
+	_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
 	require.NoError(t, err)
 
 }
@@ -72,7 +72,7 @@ func Test_Batch(t *testing.T) {
 	}
 
 	ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
-	err := ddb.Batch(context.TODO(), update, delete)
+	_, err := ddb.Batch(context.TODO(), update, delete)
 	require.NoError(t, err)
 }
 
@@ -120,7 +120,7 @@ func Test_BatchSlices(t *testing.T) {
 				delete = append(delete, ddbKeyDelete)
 			}
 
-			err := ddb.Batch(context.TODO(), nil, delete)
+			_, err := ddb.Batch(context.TODO(), nil, delete)
 			require.NoError(t, err)
 			require.EqualValues(t, tc.expectedCalls, numOfCalls)
 
@@ -134,7 +134,7 @@ func Test_EmptyBatch(t *testing.T) {
 	ddbClientMock := &mockDynamodb{}
 
 	ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
-	err := ddb.Batch(context.TODO(), nil, nil)
+	_, err := ddb.Batch(context.TODO(), nil, nil)
 	require.NoError(t, err)
 }
 
@@ -159,7 +159,7 @@ func Test_Batch_UnprocessedItems(t *testing.T) {
 	}
 
 	ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
-	err := ddb.Batch(context.TODO(), nil, delete)
+	_, err := ddb.Batch(context.TODO(), nil, delete)
 	require.Errorf(t, err, "error processing batch dynamodb")
 }
 
@@ -178,7 +178,7 @@ func Test_Batch_Error(t *testing.T) {
 	}
 
 	ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
-	err := ddb.Batch(context.TODO(), nil, delete)
+	_, err := ddb.Batch(context.TODO(), nil, delete)
 	require.Errorf(t, err, "mocked error")
 }
 
diff --git a/pkg/ring/kv/dynamodb/metrics.go b/pkg/ring/kv/dynamodb/metrics.go
index d47f2fe3929..356e58a3581 100644
--- a/pkg/ring/kv/dynamodb/metrics.go
+++ b/pkg/ring/kv/dynamodb/metrics.go
@@ -29,8 +29,8 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics {
 	}, []string{"operation", "status_code"}))
 
 	dynamodbUsageMetrics := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
-		Name: "dynamodb_kv_read_capacity_total",
-		Help: "Total used read capacity on dynamodb",
+		Name: "dynamodb_kv_consumed_capacity_total",
+		Help: "Total consumed capacity on dynamodb",
 	}, []string{"operation"})
 
 	dynamodbMetrics := dynamodbMetrics{
@@ -66,19 +66,25 @@ func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isP
 
 func (d dynamodbInstrumentation) Delete(ctx context.Context, key dynamodbKey) error {
 	return instrument.CollectedRequest(ctx, "Delete", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
-		return d.kv.Delete(ctx, key)
+		totalCapacity, err := d.kv.Delete(ctx, key)
+		d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Delete").Add(totalCapacity)
+		return err
 	})
 }
 
 func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data []byte) error {
 	return instrument.CollectedRequest(ctx, "Put", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
-		return d.kv.Put(ctx, key, data)
+		totalCapacity, err := d.kv.Put(ctx, key, data)
+		d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Put").Add(totalCapacity)
+		return err
 	})
 }
 
 func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
 	return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
-		return d.kv.Batch(ctx, put, delete)
+		totalCapacity, err := d.kv.Batch(ctx, put, delete)
+		d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity)
+		return err
 	})
 }
 