Skip to content

Commit 7a1f125

Browse files
committed
Change ddb CAS to use batch
Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent db5f4c6 commit 7a1f125

File tree

5 files changed

+214
-36
lines changed

5 files changed

+214
-36
lines changed

pkg/ring/kv/dynamodb/client.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -168,20 +168,18 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
168168
continue
169169
}
170170

171+
putRequests := map[dynamodbKey][]byte{}
171172
for childKey, bytes := range buf {
172-
err := c.kv.Put(ctx, dynamodbKey{primaryKey: key, sortKey: childKey}, bytes)
173-
if err != nil {
174-
level.Error(c.logger).Log("msg", "error CASing", "key", key, "err", err)
175-
continue
176-
}
173+
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes
177174
}
178175

176+
deleteRequests := []dynamodbKey{}
179177
for _, childKey := range toDelete {
180-
err := c.kv.Delete(ctx, dynamodbKey{primaryKey: key, sortKey: childKey})
181-
if err != nil {
182-
level.Error(c.logger).Log("msg", "error CASing", "key", key, "err", err)
183-
continue
184-
}
178+
deleteRequests = append(deleteRequests, dynamodbKey{primaryKey: key, sortKey: childKey})
179+
}
180+
181+
if len(putRequests) > 0 || len(deleteRequests) > 0 {
182+
return c.kv.Batch(ctx, putRequests, deleteRequests)
185183
}
186184

187185
return nil

pkg/ring/kv/dynamodb/client_test.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,24 +87,25 @@ func Test_CAS_Update(t *testing.T) {
8787
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
8888
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
8989
}
90+
expectedBatch := map[dynamodbKey][]byte{
91+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
92+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
93+
}
9094

9195
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
9296
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
9397
descMock.On("Clone").Return(descMock).Once()
9498
descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once()
9599
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
96-
ddbMock.On("Put", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedUpdatedKeys[0]}, []byte(expectedUpdatedKeys[0])).Once()
97-
ddbMock.On("Put", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedUpdatedKeys[1]}, []byte(expectedUpdatedKeys[1])).Once()
100+
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
98101

99102
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
100103
return descMock, true, nil
101104
})
102105

103106
require.NoError(t, err)
104-
ddbMock.AssertNumberOfCalls(t, "Put", 2)
105-
ddbMock.AssertNumberOfCalls(t, "Delete", 0)
106-
ddbMock.AssertCalled(t, "Put", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedUpdatedKeys[0]}, []byte(expectedUpdatedKeys[0]))
107-
ddbMock.AssertCalled(t, "Put", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedUpdatedKeys[1]}, []byte(expectedUpdatedKeys[1]))
107+
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
108+
ddbMock.AssertCalled(t, "Batch", context.TODO(), expectedBatch, []dynamodbKey{})
108109
}
109110

110111
func Test_CAS_Delete(t *testing.T) {
@@ -113,24 +114,61 @@ func Test_CAS_Delete(t *testing.T) {
113114
descMock := &DescMock{}
114115
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry())
115116
expectedToDelete := []string{"test", "test2"}
117+
expectedBatch := []dynamodbKey{
118+
{primaryKey: key, sortKey: expectedToDelete[0]},
119+
{primaryKey: key, sortKey: expectedToDelete[1]},
120+
}
116121

117122
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
118123
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
119124
descMock.On("Clone").Return(descMock).Once()
120125
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
121126
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once()
122-
ddbMock.On("Delete", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedToDelete[0]})
123-
ddbMock.On("Delete", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedToDelete[1]})
127+
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
128+
129+
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
130+
return descMock, true, nil
131+
})
132+
133+
require.NoError(t, err)
134+
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
135+
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
136+
}
137+
138+
func Test_CAS_Update_Delete(t *testing.T) {
139+
ddbMock := NewDynamodbClientMock()
140+
codecMock := &CodecMock{}
141+
descMock := &DescMock{}
142+
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry())
143+
expectedUpdatedKeys := []string{"t1", "t2"}
144+
expectedUpdated := map[string][]byte{
145+
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
146+
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
147+
}
148+
expectedUpdateBatch := map[dynamodbKey][]byte{
149+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
150+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
151+
}
152+
expectedToDelete := []string{"test", "test2"}
153+
expectedDeleteBatch := []dynamodbKey{
154+
{primaryKey: key, sortKey: expectedToDelete[0]},
155+
{primaryKey: key, sortKey: expectedToDelete[1]},
156+
}
157+
158+
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
159+
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
160+
descMock.On("Clone").Return(descMock).Once()
161+
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
162+
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
163+
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch)
124164

125165
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
126166
return descMock, true, nil
127167
})
128168

129169
require.NoError(t, err)
130-
ddbMock.AssertNumberOfCalls(t, "Put", 0)
131-
ddbMock.AssertNumberOfCalls(t, "Delete", 2)
132-
ddbMock.AssertCalled(t, "Delete", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedToDelete[0]})
133-
ddbMock.AssertCalled(t, "Delete", context.TODO(), dynamodbKey{primaryKey: key, sortKey: expectedToDelete[1]})
170+
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
171+
ddbMock.AssertCalled(t, "Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch)
134172
}
135173

136174
func Test_WatchKey(t *testing.T) {
@@ -271,6 +309,10 @@ func (m *mockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by
271309
m.Called(ctx, key, data)
272310
return nil
273311
}
312+
func (m *mockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
313+
m.Called(ctx, put, delete)
314+
return nil
315+
}
274316

275317
type TestLogger struct {
276318
}

pkg/ring/kv/dynamodb/dynamodb.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type dynamoDbClient interface {
2323
Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error)
2424
Delete(ctx context.Context, key dynamodbKey) error
2525
Put(ctx context.Context, key dynamodbKey, data []byte) error
26+
Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error
2627
}
2728

2829
type dynamodbKV struct {
@@ -161,6 +162,55 @@ func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) error {
161162
}
162163

163164
func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) error {
165+
input := &dynamodb.PutItemInput{
166+
TableName: kv.tableName,
167+
Item: kv.generatePutItemRequest(key, data),
168+
}
169+
_, err := kv.ddbClient.PutItemWithContext(ctx, input)
170+
return err
171+
}
172+
173+
func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
174+
var writeRequests []*dynamodb.WriteRequest
175+
for key, data := range put {
176+
item := kv.generatePutItemRequest(key, data)
177+
writeRequests = append(writeRequests, &dynamodb.WriteRequest{
178+
PutRequest: &dynamodb.PutRequest{
179+
Item: item,
180+
},
181+
})
182+
}
183+
184+
for _, key := range delete {
185+
item := generateItemKey(key)
186+
187+
writeRequests = append(writeRequests, &dynamodb.WriteRequest{
188+
DeleteRequest: &dynamodb.DeleteRequest{
189+
Key: item,
190+
},
191+
})
192+
}
193+
194+
if len(writeRequests) == 0 {
195+
return nil
196+
}
197+
198+
input := &dynamodb.BatchWriteItemInput{
199+
RequestItems: map[string][]*dynamodb.WriteRequest{
200+
*kv.tableName: writeRequests,
201+
},
202+
}
203+
204+
resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input)
205+
206+
if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 {
207+
return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
208+
}
209+
210+
return err
211+
}
212+
213+
func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue {
164214
item := generateItemKey(key)
165215
item[contentData] = &dynamodb.AttributeValue{
166216
B: data,
@@ -171,12 +221,7 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) erro
171221
}
172222
}
173223

174-
input := &dynamodb.PutItemInput{
175-
TableName: kv.tableName,
176-
Item: item,
177-
}
178-
_, err := kv.ddbClient.PutItemWithContext(ctx, input)
179-
return err
224+
return item
180225
}
181226

182227
func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue {

pkg/ring/kv/dynamodb/dynamodb_test.go

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@ import (
1515

1616
func Test_TTLDisabled(t *testing.T) {
1717
ddbClientMock := &mockDynamodb{
18-
f: func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput {
18+
putItem: func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput {
1919
require.Nil(t, input.Item["ttl"])
2020
return &dynamodb.PutItemOutput{}
2121
},
2222
}
2323

24-
ddb := newDynamodbClientMock(ddbClientMock, 0)
24+
ddb := newDynamodbClientMock("TEST", ddbClientMock, 0)
2525
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
2626
require.NoError(t, err)
2727

2828
}
2929

3030
func Test_TTL(t *testing.T) {
3131
ddbClientMock := &mockDynamodb{
32-
f: func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput {
32+
putItem: func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput {
3333
require.NotNil(t, input.Item["ttl"].N)
3434
parsedTime, err := strconv.ParseInt(*input.Item["ttl"].N, 10, 64)
3535
require.NoError(t, err)
@@ -39,25 +39,112 @@ func Test_TTL(t *testing.T) {
3939
},
4040
}
4141

42-
ddb := newDynamodbClientMock(ddbClientMock, 5*time.Hour)
42+
ddb := newDynamodbClientMock("TEST", ddbClientMock, 5*time.Hour)
4343
err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST"))
4444
require.NoError(t, err)
4545
}
4646

47+
func Test_Batch(t *testing.T) {
48+
tableName := "TEST"
49+
ddbKeyUpdate := dynamodbKey{
50+
primaryKey: "PKUpdate",
51+
sortKey: "SKUpdate",
52+
}
53+
ddbKeyDelete := dynamodbKey{
54+
primaryKey: "PKDelete",
55+
sortKey: "SKDelete",
56+
}
57+
update := map[dynamodbKey][]byte{
58+
ddbKeyUpdate: {},
59+
}
60+
delete := []dynamodbKey{ddbKeyDelete}
61+
62+
ddbClientMock := &mockDynamodb{
63+
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) *dynamodb.BatchWriteItemOutput {
64+
require.NotNil(t, input.RequestItems[tableName])
65+
require.True(t, len(input.RequestItems[tableName]) == 2)
66+
require.True(t,
67+
(checkPutRequestForItem(input.RequestItems[tableName][0], ddbKeyUpdate) || checkPutRequestForItem(input.RequestItems[tableName][1], ddbKeyUpdate)) &&
68+
(checkDeleteRequestForItem(input.RequestItems[tableName][0], ddbKeyDelete) || checkDeleteRequestForItem(input.RequestItems[tableName][1], ddbKeyDelete)))
69+
return &dynamodb.BatchWriteItemOutput{}
70+
},
71+
}
72+
73+
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
74+
err := ddb.Batch(context.TODO(), update, delete)
75+
require.NoError(t, err)
76+
}
77+
78+
func Test_EmptyBatch(t *testing.T) {
79+
tableName := "TEST"
80+
ddbClientMock := &mockDynamodb{}
81+
82+
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
83+
err := ddb.Batch(context.TODO(), nil, nil)
84+
require.NoError(t, err)
85+
}
86+
87+
func Test_Batch_Error(t *testing.T) {
88+
tableName := "TEST"
89+
ddbKeyDelete := dynamodbKey{
90+
primaryKey: "PKDelete",
91+
sortKey: "SKDelete",
92+
}
93+
delete := []dynamodbKey{ddbKeyDelete}
94+
95+
ddbClientMock := &mockDynamodb{
96+
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) *dynamodb.BatchWriteItemOutput {
97+
return &dynamodb.BatchWriteItemOutput{
98+
UnprocessedItems: map[string][]*dynamodb.WriteRequest{
99+
tableName: {&dynamodb.WriteRequest{
100+
PutRequest: &dynamodb.PutRequest{Item: generateItemKey(ddbKeyDelete)}},
101+
},
102+
},
103+
}
104+
},
105+
}
106+
107+
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
108+
err := ddb.Batch(context.TODO(), nil, delete)
109+
require.Errorf(t, err, "error processing batch dynamodb")
110+
}
111+
112+
func checkPutRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool {
113+
return request.PutRequest != nil &&
114+
request.PutRequest.Item[primaryKey] != nil &&
115+
request.PutRequest.Item[sortKey] != nil &&
116+
*request.PutRequest.Item[primaryKey].S == key.primaryKey &&
117+
*request.PutRequest.Item[sortKey].S == key.sortKey
118+
}
119+
120+
func checkDeleteRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool {
121+
return request.DeleteRequest != nil &&
122+
request.DeleteRequest.Key[primaryKey] != nil &&
123+
request.DeleteRequest.Key[sortKey] != nil &&
124+
*request.DeleteRequest.Key[primaryKey].S == key.primaryKey &&
125+
*request.DeleteRequest.Key[sortKey].S == key.sortKey
126+
}
127+
47128
type mockDynamodb struct {
48-
f func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput
129+
putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput
130+
batchWriteItem func(input *dynamodb.BatchWriteItemInput) *dynamodb.BatchWriteItemOutput
131+
49132
dynamodbiface.DynamoDBAPI
50133
}
51134

52135
func (m *mockDynamodb) PutItemWithContext(_ context.Context, input *dynamodb.PutItemInput, _ ...request.Option) (*dynamodb.PutItemOutput, error) {
53-
return m.f(input), nil
136+
return m.putItem(input), nil
137+
}
138+
139+
func (m *mockDynamodb) BatchWriteItemWithContext(_ context.Context, input *dynamodb.BatchWriteItemInput, _ ...request.Option) (*dynamodb.BatchWriteItemOutput, error) {
140+
return m.batchWriteItem(input), nil
54141
}
55142

56-
func newDynamodbClientMock(mock *mockDynamodb, ttl time.Duration) *dynamodbKV {
143+
func newDynamodbClientMock(tableName string, mock *mockDynamodb, ttl time.Duration) *dynamodbKV {
57144
ddbKV := &dynamodbKV{
58145
ddbClient: mock,
59146
logger: TestLogger{},
60-
tableName: aws.String("TEST"),
147+
tableName: aws.String(tableName),
61148
ttlValue: ttl,
62149
}
63150

pkg/ring/kv/dynamodb/metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data
7676
})
7777
}
7878

79+
func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
80+
return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error {
81+
return d.kv.Batch(ctx, put, delete)
82+
})
83+
}
84+
7985
// errorCode converts an error into an error code string.
8086
func errorCode(err error) string {
8187
if err == nil {

0 commit comments

Comments
 (0)