Skip to content

Commit b2c5679

Browse files
committed
address some review feedback
Signed-off-by: Robert Fratto <[email protected]>
1 parent 3fea6de commit b2c5679

File tree

12 files changed

+208
-196
lines changed

12 files changed

+208
-196
lines changed

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -545,18 +545,9 @@ token_checker:
545545
# CLI flag: -ingester.spread-flushes
546546
[spreadflushes: <boolean> | default = false]
547547
548-
# Check that newly created streams fall within expected token ranges
549-
# CLI flag: -ingester.check-token-on-create
550-
[check_token_on_create: <boolean> | default = false]
551-
552-
# Check that existing streams appended to fall within expected token ranges
553-
# CLI flag: -ingester.check-token-on-append
554-
[check_token_on_append: <boolean> | default = false]
555-
556-
# Check that streams transferred in using the transfer mechanism fall within
557-
# expected token ranges
558-
# CLI flag: -ingester.check-token-on-transfer
559-
[check_token_on_transfer: <boolean> | default = false]
548+
# Check tokens for streams that are created or appended to.
549+
# CLI flag: -ingester.check-tokens
550+
[check_tokens: <boolean> | default = false]
560551
561552
# Period with which to update the per-user ingestion rates.
562553
# CLI flag: -ingester.rate-update-period

pkg/ingester/client/cortex.pb.go

Lines changed: 107 additions & 107 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ingester/client/cortex.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ service Ingester {
2929
// TransferTSDB transfers all files of a tsdb to a joining ingester
3030
rpc TransferTSDB(stream TimeSeriesFile) returns (TransferTSDBResponse) {};
3131

32-
// TransferChunksSubset is invoked by a leaving ingester (client), streaming
32+
// AcceptChunksSubset is invoked by a leaving ingester (client), streaming
3333
// a subset of its chunks directly to an existing ingester.
34-
rpc TransferChunksSubset(stream TimeSeriesChunk) returns (TransferChunksResponse) {};
34+
rpc AcceptChunksSubset(stream TimeSeriesChunk) returns (TransferChunksResponse) {};
3535

3636
// GetChunksSubset is invoked by a joining ingester (client). A subset of the
3737
// serving ingester's chunks will be sent as a stream.

pkg/ingester/incremental_transfer.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,20 @@ import (
1010
"github.com/go-kit/kit/log/level"
1111
"github.com/pkg/errors"
1212
"github.com/prometheus/client_golang/prometheus"
13+
"github.com/prometheus/client_golang/prometheus/promauto"
1314
"github.com/weaveworks/common/user"
1415
"golang.org/x/net/context"
1516
)
1617

1718
var (
18-
blockedRanges = prometheus.NewGauge(prometheus.GaugeOpts{
19+
blockedRanges = promauto.NewGauge(prometheus.GaugeOpts{
1920
Name: "cortex_ingester_blocked_ranges",
2021
Help: "The current number of ranges that will not accept writes by this ingester.",
2122
})
2223
)
2324

24-
func init() {
25-
prometheus.MustRegister(blockedRanges)
26-
}
27-
28-
// TransferChunksSubset accepts chunks from a client and moves them into the local Ingester.
29-
func (i *Ingester) TransferChunksSubset(stream client.Ingester_TransferChunksSubsetServer) error {
25+
// AcceptChunksSubset accepts chunks from a client and moves them into the local Ingester.
26+
func (i *Ingester) AcceptChunksSubset(stream client.Ingester_AcceptChunksSubsetServer) error {
3027
i.userStatesMtx.Lock()
3128
defer i.userStatesMtx.Unlock()
3229

@@ -90,8 +87,8 @@ func (i *Ingester) GetChunksSubset(req *client.GetChunksRequest, stream client.I
9087
// sitting around forever if a joining ingester crashes, as writes will continue
9188
// to go to us and get rejected for as long as the blocked range exists.
9289
func (i *Ingester) BlockRanges(ranges []ring.TokenRange) {
93-
i.blockedTokenMtx.Lock()
94-
defer i.blockedTokenMtx.Unlock()
90+
i.blockedRangesMtx.Lock()
91+
defer i.blockedRangesMtx.Unlock()
9592

9693
for _, rg := range ranges {
9794
if exist := i.blockedRanges[rg]; exist {
@@ -112,8 +109,8 @@ func (i *Ingester) BlockRanges(ranges []ring.TokenRange) {
112109

113110
// UnblockRanges manually removes blocks for the provided ranges.
114111
func (i *Ingester) UnblockRanges(ctx context.Context, in *client.UnblockRangesRequest) (*client.UnblockRangesResponse, error) {
115-
i.blockedTokenMtx.Lock()
116-
defer i.blockedTokenMtx.Unlock()
112+
i.blockedRangesMtx.Lock()
113+
defer i.blockedRangesMtx.Unlock()
117114

118115
for _, rg := range in.Ranges {
119116
if exist := i.blockedRanges[rg]; !exist {
@@ -146,8 +143,8 @@ func (i *Ingester) SendChunkRanges(ctx context.Context, ranges []ring.TokenRange
146143
}
147144
defer c.Close()
148145

149-
ctx = user.InjectOrgID(ctx, fakeOrgID)
150-
stream, err := c.TransferChunksSubset(ctx)
146+
ctx = user.InjectOrgID(ctx, noOrgID)
147+
stream, err := c.AcceptChunksSubset(ctx)
151148
if err != nil {
152149
return errors.Wrap(err, "SendChunks")
153150
}
@@ -186,7 +183,7 @@ func (i *Ingester) RequestChunkRanges(ctx context.Context, ranges []ring.TokenRa
186183
}
187184
defer c.Close()
188185

189-
ctx = user.InjectOrgID(ctx, fakeOrgID)
186+
ctx = user.InjectOrgID(ctx, noOrgID)
190187
stream, err := c.GetChunksSubset(ctx, &client.GetChunksRequest{
191188
Ranges: ranges,
192189
Move: move,
@@ -220,7 +217,7 @@ func (i *Ingester) RequestComplete(ctx context.Context, ranges []ring.TokenRange
220217
}
221218
defer c.Close()
222219

223-
ctx = user.InjectOrgID(ctx, fakeOrgID)
220+
ctx = user.InjectOrgID(ctx, noOrgID)
224221
_, err = c.UnblockRanges(ctx, &client.UnblockRangesRequest{Ranges: ranges})
225222
if err != nil {
226223
level.Error(util.Logger).Log("msg", "could not clean up target after transfer", "err", err)

pkg/ingester/ingester.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@ type Config struct {
6363
SpreadFlushes bool
6464

6565
// Config for checking tokens.
66-
CheckOnCreate bool `yaml:"check_token_on_create,omitempty"`
67-
CheckOnAppend bool `yaml:"check_token_on_append,omitempty"`
68-
CheckOnTransfer bool `yaml:"check_token_on_transfer,omitempty"`
66+
CheckTokens bool `yaml:"check_tokens,omitempty"`
6967

7068
RateUpdatePeriod time.Duration
7169

@@ -98,9 +96,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9896
f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes")
9997
f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge")
10098
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.")
101-
f.BoolVar(&cfg.CheckOnCreate, "ingester.check-token-on-create", false, "Check that newly created streams fall within expected token ranges")
102-
f.BoolVar(&cfg.CheckOnAppend, "ingester.check-token-on-append", false, "Check that existing streams appended to fall within expected token ranges")
103-
f.BoolVar(&cfg.CheckOnTransfer, "ingester.check-token-on-transfer", false, "Check that streams transferred in using the transfer mechanism fall within expected token ranges")
99+
f.BoolVar(&cfg.CheckTokens, "ingester.check-tokens", false, "Check tokens for streams that are created or appended to.")
104100
f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
105101
}
106102

@@ -134,8 +130,8 @@ type Ingester struct {
134130
wal WAL
135131

136132
// Stops specific appends
137-
blockedTokenMtx sync.RWMutex
138-
blockedRanges map[ring.TokenRange]bool
133+
blockedRangesMtx sync.RWMutex
134+
blockedRanges map[ring.TokenRange]bool
139135

140136
// Hook for injecting behaviour from tests.
141137
preFlushUserSeries func()
@@ -299,21 +295,22 @@ func (i *Ingester) StopIncomingRequests() {
299295
i.userStatesMtx.Lock()
300296
defer i.userStatesMtx.Unlock()
301297
i.stopped = true
302-
return
303298
}
304299

305-
// When we are incrementally transferring tokens, we want to wait
306-
// for there to be no blocked ranges on our local ingester.
307-
for {
308-
i.blockedTokenMtx.RLock()
309-
numBlocked := len(i.blockedRanges)
310-
i.blockedTokenMtx.RUnlock()
300+
if i.cfg.LifecyclerConfig.LeaveIncrementalTransfer {
301+
// When we are incrementally transferring tokens, we want to wait
302+
// for there to be no blocked ranges on our local ingester.
303+
for {
304+
i.blockedRangesMtx.RLock()
305+
numBlocked := len(i.blockedRanges)
306+
i.blockedRangesMtx.RUnlock()
311307

312-
if numBlocked == 0 {
313-
return
314-
}
308+
if numBlocked == 0 {
309+
break
310+
}
315311

316-
time.Sleep(time.Millisecond * 250)
312+
time.Sleep(time.Millisecond * 250)
313+
}
317314
}
318315
}
319316

@@ -379,8 +376,8 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
379376

380377
// isTokenBlocked checks to see if a token is in a blocked range.
381378
func (i *Ingester) isTokenBlocked(token uint32) error {
382-
i.blockedTokenMtx.RLock()
383-
defer i.blockedTokenMtx.RUnlock()
379+
i.blockedRangesMtx.RLock()
380+
defer i.blockedRangesMtx.RUnlock()
384381

385382
for rg := range i.blockedRanges {
386383
if rg.Contains(token) {
@@ -429,12 +426,12 @@ func (i *Ingester) append(ctx context.Context, userID string, token uint32, labe
429426
return err
430427
}
431428

432-
if sstate == seriesCreated && i.cfg.CheckOnCreate {
429+
if sstate == seriesCreated && i.cfg.CheckTokens {
433430
if ok := i.tokenChecker.TokenExpected(token); !ok {
434431
level.Debug(util.Logger).Log("msg", "unexpected stream created in ingester", "token", token)
435432
i.metrics.unexpectedSeriesTotal.WithLabelValues("create").Inc()
436433
}
437-
} else if i.cfg.CheckOnAppend {
434+
} else if i.cfg.CheckTokens {
438435
if ok := i.tokenChecker.TokenExpected(token); !ok {
439436
level.Debug(util.Logger).Log("msg", "unexpected stream appended in ingester", "token", token)
440437
i.metrics.unexpectedSeriesTotal.WithLabelValues("append").Inc()

pkg/ingester/transfer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
var (
29-
fakeOrgID = "-1"
29+
noOrgID = "-1"
3030

3131
sentChunks = prometheus.NewCounter(prometheus.CounterOpts{
3232
Name: "cortex_ingester_sent_chunks",
@@ -123,7 +123,7 @@ func (i *Ingester) acceptChunksFromStream(opts acceptChunksOptions) (fromIngeste
123123
}
124124
prevNumChunks := len(series.chunkDescs)
125125

126-
if i.cfg.CheckOnTransfer && !i.tokenChecker.TokenExpected(wireSeries.Token) {
126+
if i.cfg.CheckTokens && !i.tokenChecker.TokenExpected(wireSeries.Token) {
127127
level.Debug(util.Logger).Log("msg", "unexpected stream transferred to ingester", "token", wireSeries.Token)
128128
i.metrics.unexpectedSeriesTotal.WithLabelValues("transfer").Inc()
129129
}
@@ -577,7 +577,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
577577
}
578578
defer c.Close()
579579

580-
ctx = user.InjectOrgID(ctx, fakeOrgID)
580+
ctx = user.InjectOrgID(ctx, noOrgID)
581581
stream, err := c.TransferChunks(ctx)
582582
if err != nil {
583583
return errors.Wrap(err, "TransferChunks")
@@ -672,7 +672,7 @@ func (i *Ingester) v2TransferOut(ctx context.Context) error {
672672
}
673673
defer c.Close()
674674

675-
ctx = user.InjectOrgID(ctx, fakeOrgID)
675+
ctx = user.InjectOrgID(ctx, noOrgID)
676676
stream, err := c.TransferTSDB(ctx)
677677
if err != nil {
678678
return errors.Wrap(err, "TransferTSDB() has failed")

pkg/ingester/transfer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ func (s *testSendChunksServer) SendAndClose(resp *client.TransferChunksResponse)
630630
return nil
631631
}
632632

633-
func (c *testIngesterClient) TransferChunksSubset(ctx context.Context, opts ...grpc.CallOption) (client.Ingester_TransferChunksSubsetClient, error) {
633+
func (c *testIngesterClient) AcceptChunksSubset(ctx context.Context, opts ...grpc.CallOption) (client.Ingester_AcceptChunksSubsetClient, error) {
634634
ch := make(chan *client.TimeSeriesChunk)
635635
resp := make(chan *client.TransferChunksResponse)
636636

@@ -645,7 +645,7 @@ func (c *testIngesterClient) TransferChunksSubset(ctx context.Context, opts ...g
645645
}
646646

647647
go func() {
648-
_ = c.i.TransferChunksSubset(&srv)
648+
_ = c.i.AcceptChunksSubset(&srv)
649649
}()
650650

651651
return &cli, nil

pkg/ring/lifecycler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func (i *Lifecycler) loop() {
471471
}
472472

473473
if err := i.autoJoin(context.Background(), JOINING); err != nil {
474-
level.Error(util.Logger).Log("msg", "failed to pick tokens in consul", "err", err)
474+
level.Error(util.Logger).Log("msg", "failed to pick tokens in KV store", "err", err)
475475
os.Exit(1)
476476
}
477477

pkg/ring/model.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,26 @@ func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
154154
func (i *IngesterDesc) IsHealthyState(op Operation) bool {
155155
healthy := false
156156

157-
switch op {
158-
case Write:
159-
healthy = (i.State == ACTIVE)
157+
if i.Incremental {
158+
// Incremental transferring ingester: everything is healthy except
159+
// writing to a PENDING ingester.
160+
switch op {
161+
case Write:
162+
healthy = (i.State != PENDING)
163+
default:
164+
healthy = true
165+
}
166+
} else {
167+
switch op {
168+
case Write:
169+
healthy = (i.State == ACTIVE)
160170

161-
case Read:
162-
healthy = (i.State == ACTIVE) || (i.State == LEAVING) || (i.State == PENDING)
171+
case Read:
172+
healthy = (i.State == ACTIVE) || (i.State == LEAVING) || (i.State == PENDING)
163173

164-
case Reporting:
165-
healthy = true
174+
case Reporting:
175+
healthy = true
176+
}
166177
}
167178

168179
return healthy

pkg/ring/ring.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,22 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
215215
// is increased for the key. Dead ingesters will be filtered later by
216216
// replication_strategy.go. Filtering later means that we can calculate
217217
// a healthiness quorum.
218+
if !ingester.Incremental {
219+
if op == Write && ingester.State != ACTIVE {
220+
n++
221+
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
222+
n++
223+
}
224+
} else {
225+
// If the ingester is incrementally transferring tokens, its current
226+
// state is just informational and is used to inform users what phase
227+
// the transfer is in. Incremental transfers only disallow writing in
228+
// the PENDING state.
229+
if op == Write && ingester.State == PENDING {
230+
n++
231+
}
232+
}
233+
218234
if !ingester.IsHealthyState(op) {
219235
n++
220236
}

0 commit comments

Comments
 (0)