@@ -17,16 +17,19 @@ import (
17
17
18
18
// Config to create a ConsulClient
19
19
type Config struct {
20
- Region string `yaml:"region"`
21
- TableName string `yaml:"table_name"`
22
- TTL time.Duration `yaml:"ttl"`
20
+ Region string `yaml:"region"`
21
+ TableName string `yaml:"table_name"`
22
+ TTL time.Duration `yaml:"ttl"`
23
+ PullerSyncTime time.Duration `yaml:"puller_sync_time"`
23
24
}
24
25
25
26
type Client struct {
26
- kv dynamoDbClient
27
- codec codec.Codec
28
- ddbMetrics * dynamodbMetrics
29
- logger log.Logger
27
+ kv dynamoDbClient
28
+ codec codec.Codec
29
+ ddbMetrics * dynamodbMetrics
30
+ logger log.Logger
31
+ pullerSyncTime time.Duration
32
+ backoffConfig backoff.Config
30
33
31
34
staleDataLock sync.RWMutex
32
35
staleData map [string ]staleData
@@ -37,22 +40,13 @@ type staleData struct {
37
40
timestamp time.Time
38
41
}
39
42
40
- var (
41
- backoffConfig = backoff.Config {
42
- MinBackoff : 1 * time .Second ,
43
- MaxBackoff : 1 * time .Minute ,
44
- MaxRetries : 0 ,
45
- }
46
-
47
- defaultLoopDelay = 1 * time .Minute
48
- )
49
-
50
43
// RegisterFlags adds the flags required to config this to the given FlagSet
51
44
// If prefix is not an empty string it should end with a period.
52
45
func (cfg * Config ) RegisterFlags (f * flag.FlagSet , prefix string ) {
53
46
f .StringVar (& cfg .Region , prefix + "dynamodb.region" , "" , "Region to access dynamodb." )
54
47
f .StringVar (& cfg .TableName , prefix + "dynamodb.table-name" , "" , "Table name to use on dynamodb." )
55
48
f .DurationVar (& cfg .TTL , prefix + "dynamodb.ttl-time" , 0 , "Time to expire items on dynamodb." )
49
+ f .DurationVar (& cfg .PullerSyncTime , prefix + "dynamodb.puller-sync-time" , 60 * time .Second , "Time to refresh local ring with information on dynamodb." )
56
50
}
57
51
58
52
func NewClient (cfg Config , cc codec.Codec , logger log.Logger , registerer prometheus.Registerer ) (* Client , error ) {
@@ -63,12 +57,20 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh
63
57
64
58
ddbMetrics := newDynamoDbMetrics (registerer )
65
59
60
+ backoffConfig := backoff.Config {
61
+ MinBackoff : 1 * time .Second ,
62
+ MaxBackoff : cfg .PullerSyncTime ,
63
+ MaxRetries : 0 ,
64
+ }
65
+
66
66
c := & Client {
67
- kv : dynamodbInstrumentation {kv : dynamoDB , ddbMetrics : ddbMetrics },
68
- codec : cc ,
69
- logger : ddbLog (logger ),
70
- ddbMetrics : ddbMetrics ,
71
- staleData : make (map [string ]staleData ),
67
+ kv : dynamodbInstrumentation {kv : dynamoDB , ddbMetrics : ddbMetrics },
68
+ codec : cc ,
69
+ logger : ddbLog (logger ),
70
+ ddbMetrics : ddbMetrics ,
71
+ pullerSyncTime : cfg .PullerSyncTime ,
72
+ staleData : make (map [string ]staleData ),
73
+ backoffConfig : backoffConfig ,
72
74
}
73
75
level .Info (c .logger ).Log ("dynamodb kv initialized" )
74
76
return c , nil
@@ -121,7 +123,7 @@ func (c *Client) Delete(ctx context.Context, key string) error {
121
123
}
122
124
123
125
func (c * Client ) CAS (ctx context.Context , key string , f func (in interface {}) (out interface {}, retry bool , err error )) error {
124
- bo := backoff .New (ctx , backoffConfig )
126
+ bo := backoff .New (ctx , c . backoffConfig )
125
127
for bo .Ongoing () {
126
128
resp , _ , err := c .kv .Query (ctx , dynamodbKey {primaryKey : key }, false )
127
129
if err != nil {
@@ -190,7 +192,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
190
192
}
191
193
192
194
func (c * Client ) WatchKey (ctx context.Context , key string , f func (interface {}) bool ) {
193
- bo := backoff .New (ctx , backoffConfig )
195
+ bo := backoff .New (ctx , c . backoffConfig )
194
196
195
197
for bo .Ongoing () {
196
198
out , _ , err := c .kv .Query (ctx , dynamodbKey {
@@ -199,12 +201,11 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
199
201
if err != nil {
200
202
level .Error (c .logger ).Log ("msg" , "error WatchKey" , "key" , key , "err" , err )
201
203
202
- if bo .NumRetries () > 10 {
204
+ if bo .NumRetries () >= 10 {
203
205
if staleData := c .getStaleData (key ); staleData != nil {
204
206
if ! f (staleData ) {
205
207
return
206
208
}
207
- bo .Reset ()
208
209
}
209
210
}
210
211
bo .Wait ()
@@ -226,13 +227,13 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
226
227
select {
227
228
case <- ctx .Done ():
228
229
return
229
- case <- time .After (defaultLoopDelay ):
230
+ case <- time .After (c . pullerSyncTime ):
230
231
}
231
232
}
232
233
}
233
234
234
235
func (c * Client ) WatchPrefix (ctx context.Context , prefix string , f func (string , interface {}) bool ) {
235
- bo := backoff .New (ctx , backoffConfig )
236
+ bo := backoff .New (ctx , c . backoffConfig )
236
237
237
238
for bo .Ongoing () {
238
239
out , _ , err := c .kv .Query (ctx , dynamodbKey {
@@ -259,7 +260,7 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
259
260
select {
260
261
case <- ctx .Done ():
261
262
return
262
- case <- time .After (defaultLoopDelay ):
263
+ case <- time .After (c . pullerSyncTime ):
263
264
}
264
265
}
265
266
}
0 commit comments