Skip to content

Commit 4510e11

Browse files
authored
Add dynamodb multikey kv (#5026)
* Add dynamodb multikey kv Signed-off-by: Daniel Deluiggi <[email protected]> * Reorganize import Signed-off-by: Daniel Deluiggi <[email protected]> * Rework test Signed-off-by: Daniel Deluiggi <[email protected]> * Change ddb CAS to use batch Signed-off-by: Daniel Deluiggi <[email protected]> * Use slice to batch writeRequests Signed-off-by: Daniel Deluiggi <[email protected]> * Change slice usage Signed-off-by: Daniel Deluiggi <[email protected]> Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent 4bfd9f5 commit 4510e11

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+30350
-57
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* [FEATURE] Query-frontend/Querier: Create spans to measure time to merge promql responses. #5041
2222
* [FEATURE] Querier/Ruler: Support the new thanos promql engine. This is an experimental feature and might change in the future. #5093
2323
* [FEATURE] Added zstd as an option for grpc compression #5092
24+
* [FEATURE] Ring: Add new kv store option `dynamodb`. #5026
2425
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
2526
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
2627
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055

docs/blocks-storage/compactor.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,19 @@ compactor:
204204
# CLI flag: -compactor.ring.prefix
205205
[prefix: <string> | default = "collectors/"]
206206

207+
dynamodb:
208+
# Region to access dynamodb.
209+
# CLI flag: -compactor.ring.dynamodb.region
210+
[region: <string> | default = ""]
211+
212+
# Table name to use on dynamodb.
213+
# CLI flag: -compactor.ring.dynamodb.table-name
214+
[table_name: <string> | default = ""]
215+
216+
# Time to expire items on dynamodb.
217+
# CLI flag: -compactor.ring.dynamodb.ttl-time
218+
[ttl: <duration> | default = 0s]
219+
207220
# The consul_config configures the consul client.
208221
# The CLI flags prefix for this block config is: compactor.ring
209222
[consul: <consul_config>]

docs/blocks-storage/store-gateway.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,19 @@ store_gateway:
205205
# CLI flag: -store-gateway.sharding-ring.prefix
206206
[prefix: <string> | default = "collectors/"]
207207

208+
dynamodb:
209+
# Region to access dynamodb.
210+
# CLI flag: -store-gateway.sharding-ring.dynamodb.region
211+
[region: <string> | default = ""]
212+
213+
# Table name to use on dynamodb.
214+
# CLI flag: -store-gateway.sharding-ring.dynamodb.table-name
215+
[table_name: <string> | default = ""]
216+
217+
# Time to expire items on dynamodb.
218+
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
219+
[ttl: <duration> | default = 0s]
220+
208221
# The consul_config configures the consul client.
209222
# The CLI flags prefix for this block config is:
210223
# store-gateway.sharding-ring

docs/configuration/config-file-reference.md

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,19 @@ ha_tracker:
493493
# CLI flag: -distributor.ha-tracker.prefix
494494
[prefix: <string> | default = "ha-tracker/"]
495495
496+
dynamodb:
497+
# Region to access dynamodb.
498+
# CLI flag: -distributor.ha-tracker.dynamodb.region
499+
[region: <string> | default = ""]
500+
501+
# Table name to use on dynamodb.
502+
# CLI flag: -distributor.ha-tracker.dynamodb.table-name
503+
[table_name: <string> | default = ""]
504+
505+
# Time to expire items on dynamodb.
506+
# CLI flag: -distributor.ha-tracker.dynamodb.ttl-time
507+
[ttl: <duration> | default = 0s]
508+
496509
# The consul_config configures the consul client.
497510
# The CLI flags prefix for this block config is: distributor.ha-tracker
498511
[consul: <consul_config>]
@@ -557,6 +570,19 @@ ring:
557570
# CLI flag: -distributor.ring.prefix
558571
[prefix: <string> | default = "collectors/"]
559572
573+
dynamodb:
574+
# Region to access dynamodb.
575+
# CLI flag: -distributor.ring.dynamodb.region
576+
[region: <string> | default = ""]
577+
578+
# Table name to use on dynamodb.
579+
# CLI flag: -distributor.ring.dynamodb.table-name
580+
[table_name: <string> | default = ""]
581+
582+
# Time to expire items on dynamodb.
583+
# CLI flag: -distributor.ring.dynamodb.ttl-time
584+
[ttl: <duration> | default = 0s]
585+
560586
# The consul_config configures the consul client.
561587
# The CLI flags prefix for this block config is: distributor.ring
562588
[consul: <consul_config>]
@@ -627,6 +653,19 @@ lifecycler:
627653
# CLI flag: -ring.prefix
628654
[prefix: <string> | default = "collectors/"]
629655
656+
dynamodb:
657+
# Region to access dynamodb.
658+
# CLI flag: -dynamodb.region
659+
[region: <string> | default = ""]
660+
661+
# Table name to use on dynamodb.
662+
# CLI flag: -dynamodb.table-name
663+
[table_name: <string> | default = ""]
664+
665+
# Time to expire items on dynamodb.
666+
# CLI flag: -dynamodb.ttl-time
667+
[ttl: <duration> | default = 0s]
668+
630669
# The consul_config configures the consul client.
631670
[consul: <consul_config>]
632671
@@ -1299,6 +1338,19 @@ ring:
12991338
# CLI flag: -ruler.ring.prefix
13001339
[prefix: <string> | default = "rulers/"]
13011340
1341+
dynamodb:
1342+
# Region to access dynamodb.
1343+
# CLI flag: -ruler.ring.dynamodb.region
1344+
[region: <string> | default = ""]
1345+
1346+
# Table name to use on dynamodb.
1347+
# CLI flag: -ruler.ring.dynamodb.table-name
1348+
[table_name: <string> | default = ""]
1349+
1350+
# Time to expire items on dynamodb.
1351+
# CLI flag: -ruler.ring.dynamodb.ttl-time
1352+
[ttl: <duration> | default = 0s]
1353+
13021354
# The consul_config configures the consul client.
13031355
# The CLI flags prefix for this block config is: ruler.ring
13041356
[consul: <consul_config>]
@@ -1681,6 +1733,19 @@ sharding_ring:
16811733
# CLI flag: -alertmanager.sharding-ring.prefix
16821734
[prefix: <string> | default = "alertmanagers/"]
16831735
1736+
dynamodb:
1737+
# Region to access dynamodb.
1738+
# CLI flag: -alertmanager.sharding-ring.dynamodb.region
1739+
[region: <string> | default = ""]
1740+
1741+
# Table name to use on dynamodb.
1742+
# CLI flag: -alertmanager.sharding-ring.dynamodb.table-name
1743+
[table_name: <string> | default = ""]
1744+
1745+
# Time to expire items on dynamodb.
1746+
# CLI flag: -alertmanager.sharding-ring.dynamodb.ttl-time
1747+
[ttl: <duration> | default = 0s]
1748+
16841749
# The consul_config configures the consul client.
16851750
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
16861751
[consul: <consul_config>]
@@ -3868,6 +3933,19 @@ sharding_ring:
38683933
# CLI flag: -compactor.ring.prefix
38693934
[prefix: <string> | default = "collectors/"]
38703935
3936+
dynamodb:
3937+
# Region to access dynamodb.
3938+
# CLI flag: -compactor.ring.dynamodb.region
3939+
[region: <string> | default = ""]
3940+
3941+
# Table name to use on dynamodb.
3942+
# CLI flag: -compactor.ring.dynamodb.table-name
3943+
[table_name: <string> | default = ""]
3944+
3945+
# Time to expire items on dynamodb.
3946+
# CLI flag: -compactor.ring.dynamodb.ttl-time
3947+
[ttl: <duration> | default = 0s]
3948+
38713949
# The consul_config configures the consul client.
38723950
# The CLI flags prefix for this block config is: compactor.ring
38733951
[consul: <consul_config>]
@@ -3955,6 +4033,19 @@ sharding_ring:
39554033
# CLI flag: -store-gateway.sharding-ring.prefix
39564034
[prefix: <string> | default = "collectors/"]
39574035
4036+
dynamodb:
4037+
# Region to access dynamodb.
4038+
# CLI flag: -store-gateway.sharding-ring.dynamodb.region
4039+
[region: <string> | default = ""]
4040+
4041+
# Table name to use on dynamodb.
4042+
# CLI flag: -store-gateway.sharding-ring.dynamodb.table-name
4043+
[table_name: <string> | default = ""]
4044+
4045+
# Time to expire items on dynamodb.
4046+
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
4047+
[ttl: <duration> | default = 0s]
4048+
39584049
# The consul_config configures the consul client.
39594050
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
39604051
[consul: <consul_config>]

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
99
github.com/alicebob/miniredis/v2 v2.30.0
1010
github.com/armon/go-metrics v0.4.1
11+
github.com/aws/aws-sdk-go v1.44.156
1112
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
1213
github.com/cespare/xxhash v1.1.0
1314
github.com/dustin/go-humanize v1.0.0
@@ -89,7 +90,6 @@ require (
8990
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
9091
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
9192
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
92-
github.com/aws/aws-sdk-go v1.44.156 // indirect
9393
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
9494
github.com/aws/aws-sdk-go-v2/config v1.15.1 // indirect
9595
github.com/aws/aws-sdk-go-v2/credentials v1.11.0 // indirect

integration/kv_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ func (c stringCodec) Decode(bb []byte) (interface{}, error) {
229229
func (c stringCodec) Encode(v interface{}) ([]byte, error) { return []byte(v.(string)), nil }
230230
func (c stringCodec) CodecID() string { return "stringCodec" }
231231

232+
func (stringCodec) EncodeMultiKey(msg interface{}) (map[string][]byte, error) {
233+
return nil, errors.New("String codec does not support EncodeMultiKey")
234+
}
235+
236+
func (stringCodec) DecodeMultiKey(map[string][]byte) (interface{}, error) {
237+
return nil, errors.New("String codec does not support DecodeMultiKey")
238+
}
239+
232240
type watcher struct {
233241
values map[string][]interface{}
234242
}

pkg/ring/http.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const pageContent = `
2424
<body>
2525
<h1>Ring Status</h1>
2626
<p>Current time: {{ .Now }}</p>
27+
<p>Storage updated: {{ .StorageLastUpdated }}</p>
2728
<form action="" method="POST">
2829
<input type="hidden" name="csrf_token" value="$__CSRF_TOKEN_PLACEHOLDER__">
2930
<table width="100%" border="1">
@@ -116,9 +117,10 @@ type ingesterDesc struct {
116117
}
117118

118119
type httpResponse struct {
119-
Ingesters []ingesterDesc `json:"shards"`
120-
Now time.Time `json:"now"`
121-
ShowTokens bool `json:"-"`
120+
Ingesters []ingesterDesc `json:"shards"`
121+
Now time.Time `json:"now"`
122+
StorageLastUpdated time.Time `json:"storageLastUpdated"`
123+
ShowTokens bool `json:"-"`
122124
}
123125

124126
func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -149,14 +151,14 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
149151
}
150152
sort.Strings(ingesterIDs)
151153

152-
now := time.Now()
154+
storageLastUpdate := r.KVClient.LastUpdateTime(r.key)
153155
var ingesters []ingesterDesc
154156
_, owned := r.countTokens()
155157
for _, id := range ingesterIDs {
156158
ing := r.ringDesc.Ingesters[id]
157159
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
158160
state := ing.State.String()
159-
if !r.IsHealthy(&ing, Reporting, now) {
161+
if !r.IsHealthy(&ing, Reporting, storageLastUpdate) {
160162
state = unhealthy
161163
}
162164

@@ -182,9 +184,10 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
182184
tokensParam := req.URL.Query().Get("tokens")
183185

184186
renderHTTPResponse(w, httpResponse{
185-
Ingesters: ingesters,
186-
Now: now,
187-
ShowTokens: tokensParam == "true",
187+
Ingesters: ingesters,
188+
Now: time.Now(),
189+
StorageLastUpdated: storageLastUpdate,
190+
ShowTokens: tokensParam == "true",
188191
}, pageTemplate, req)
189192
}
190193

pkg/ring/kv/client.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"flag"
66
"fmt"
77
"sync"
8+
"time"
89

910
"github.com/go-kit/log"
1011
"github.com/prometheus/client_golang/prometheus"
1112

1213
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
1314
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
15+
"github.com/cortexproject/cortex/pkg/ring/kv/dynamodb"
1416
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
1517
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
1618
)
@@ -40,9 +42,10 @@ var inmemoryStore Client
4042
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
4143
// single-client config separate from final client-config (with all the wrappers)
4244
type StoreConfig struct {
43-
Consul consul.Config `yaml:"consul"`
44-
Etcd etcd.Config `yaml:"etcd"`
45-
Multi MultiConfig `yaml:"multi"`
45+
DynamoDB dynamodb.Config `yaml:"dynamodb"`
46+
Consul consul.Config `yaml:"consul"`
47+
Etcd etcd.Config `yaml:"etcd"`
48+
Multi MultiConfig `yaml:"multi"`
4649

4750
// Function that returns memberlist.KV store to use. By using a function, we can delay
4851
// initialization of memberlist.KV until it is actually required.
@@ -69,6 +72,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f
6972
// This needs to be fixed in the future (1.0 release maybe?) when we normalize flags.
7073
// At the moment we have consul.<flag-name>, and ring.store, going forward it would
7174
// be easier to have everything under ring, so ring.consul.<flag-name>
75+
cfg.DynamoDB.RegisterFlags(f, flagsPrefix)
7276
cfg.Consul.RegisterFlags(f, flagsPrefix)
7377
cfg.Etcd.RegisterFlagsWithPrefix(f, flagsPrefix)
7478
cfg.Multi.RegisterFlagsWithPrefix(f, flagsPrefix)
@@ -111,6 +115,9 @@ type Client interface {
111115

112116
// WatchPrefix calls f whenever any value stored under prefix changes.
113117
WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
118+
119+
// LastUpdateTime returns the time a key was last sync by the kv store
120+
LastUpdateTime(key string) time.Time
114121
}
115122

116123
// NewClient creates a new Client (consul, etcd or inmemory) based on the config,
@@ -128,6 +135,9 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
128135
var err error
129136

130137
switch backend {
138+
case "dynamodb":
139+
client, err = dynamodb.NewClient(cfg.DynamoDB, codec, logger, reg)
140+
131141
case "consul":
132142
client, err = consul.NewClient(cfg.Consul, codec, logger, reg)
133143

pkg/ring/kv/codec/clonable.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package codec
2+
3+
type Clonable interface {
4+
// Clone should return a deep copy of the state.
5+
Clone() interface{}
6+
}

0 commit comments

Comments
 (0)