Skip to content

Commit 3afa323

Browse files
authored
Merge pull request #1389 from cortexproject/combine-dynamo-writes
Send chunks writes to DynamoDB in the same request as index entries
2 parents 21a0556 + 34cb9d3 commit 3afa323

File tree

3 files changed

+40
-12
lines changed

3 files changed

+40
-12
lines changed

pkg/chunk/aws/dynamodb_storage_client.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -616,28 +616,47 @@ func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map
616616
return result, nil
617617
}
618618

619+
// PutChunkAndIndex implements chunk.ObjectAndIndexClient
620+
// Combine both sets of writes before sending to DynamoDB, for performance
621+
func (a dynamoDBStorageClient) PutChunkAndIndex(ctx context.Context, c chunk.Chunk, index chunk.WriteBatch) error {
622+
dynamoDBWrites, err := a.writesForChunks([]chunk.Chunk{c})
623+
if err != nil {
624+
return err
625+
}
626+
dynamoDBWrites.TakeReqs(index.(dynamoDBWriteBatch), 0)
627+
return a.BatchWrite(ctx, dynamoDBWrites)
628+
}
629+
619630
// PutChunks implements chunk.ObjectClient.
620631
func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
632+
dynamoDBWrites, err := a.writesForChunks(chunks)
633+
if err != nil {
634+
return err
635+
}
636+
return a.BatchWrite(ctx, dynamoDBWrites)
637+
}
638+
639+
func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) {
621640
var (
622641
dynamoDBWrites = dynamoDBWriteBatch{}
623642
)
624643

625644
for i := range chunks {
626645
buf, err := chunks[i].Encoded()
627646
if err != nil {
628-
return err
647+
return nil, err
629648
}
630649
key := chunks[i].ExternalKey()
631650

632651
table, err := a.schemaCfg.ChunkTableFor(chunks[i].From)
633652
if err != nil {
634-
return err
653+
return nil, err
635654
}
636655

637656
dynamoDBWrites.Add(table, key, placeholder, buf)
638657
}
639658

640-
return a.BatchWrite(ctx, dynamoDBWrites)
659+
return dynamoDBWrites, nil
641660
}
642661

643662
// Slice of values returned; map key is attribute name

pkg/chunk/series_store.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -328,21 +328,25 @@ func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error {
328328
func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error {
329329
chunks := []Chunk{chunk}
330330

331-
err := c.storage.PutChunks(ctx, chunks)
332-
if err != nil {
333-
return err
334-
}
335-
336-
c.writeBackCache(ctx, chunks)
337-
338331
writeReqs, keysToCache, err := c.calculateIndexEntries(from, through, chunk)
339332
if err != nil {
340333
return err
341334
}
342335

343-
if err := c.index.BatchWrite(ctx, writeReqs); err != nil {
344-
return err
336+
if oic, ok := c.storage.(ObjectAndIndexClient); ok {
337+
if err = oic.PutChunkAndIndex(ctx, chunk, writeReqs); err != nil {
338+
return err
339+
}
340+
} else {
341+
err := c.storage.PutChunks(ctx, chunks)
342+
if err != nil {
343+
return err
344+
}
345+
if err := c.index.BatchWrite(ctx, writeReqs); err != nil {
346+
return err
347+
}
345348
}
349+
c.writeBackCache(ctx, chunks)
346350

347351
bufs := make([][]byte, len(keysToCache))
348352
c.writeDedupeCache.Store(ctx, keysToCache, bufs)

pkg/chunk/storage_client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ type ObjectClient interface {
2222
GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
2323
}
2424

25+
// ObjectAndIndexClient allows optimisations where the same client handles both
26+
type ObjectAndIndexClient interface {
27+
PutChunkAndIndex(ctx context.Context, c Chunk, index WriteBatch) error
28+
}
29+
2530
// WriteBatch represents a batch of writes.
2631
type WriteBatch interface {
2732
Add(tableName, hashValue string, rangeValue []byte, value []byte)

0 commit comments

Comments
 (0)