Skip to content

Commit 80c8952

Browse files
author
Thor
committed
moved transfer function to common wrapper
Signed-off-by: Thor <[email protected]>
1 parent 474d4b5 commit 80c8952

File tree

1 file changed

+124
-124
lines changed

1 file changed

+124
-124
lines changed

pkg/ingester/transfer.go

Lines changed: 124 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -56,93 +56,74 @@ func init() {
5656

5757
// TransferChunks receives all the chunks from another ingester.
//
// The receive loop itself runs inside the xfer closure, which is executed by
// i.transfer; that wrapper moves the lifecycler through the ring states
// (JOINING on entry, ACTIVE on success, rollback to PENDING on failure).
func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error {
	fromIngesterID := ""
	seriesReceived := 0
	xfer := func() error {
		// Build a fresh user-state set; it is only installed on i at the very
		// end, once the whole stream has been consumed successfully.
		userStates := newUserStates(i.limits, i.cfg)

		for {
			wireSeries, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return errors.Wrap(err, "TransferChunks: Recv")
			}

			// We can't send "extra" fields with a streaming call, so we repeat
			// wireSeries.FromIngesterId and assume it is the same every time
			// round this loop.
			if fromIngesterID == "" {
				fromIngesterID = wireSeries.FromIngesterId
				level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
			}
			descs, err := fromWireChunks(wireSeries.Chunks)
			if err != nil {
				return errors.Wrap(err, "TransferChunks: fromWireChunks")
			}

			state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels)
			if err != nil {
				return errors.Wrapf(err, "TransferChunks: getOrCreateSeries: user %s series %s", wireSeries.UserId, wireSeries.Labels)
			}
			prevNumChunks := len(series.chunkDescs)

			err = series.setChunks(descs)
			state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries
			if err != nil {
				return errors.Wrapf(err, "TransferChunks: setChunks: user %s series %s", wireSeries.UserId, wireSeries.Labels)
			}

			seriesReceived++
			memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
			receivedChunks.Add(float64(len(descs)))
		}

		// An empty transfer, or one with no sender ID, is reported as an
		// error so the transfer wrapper rolls the ring state back.
		if seriesReceived == 0 {
			level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
			return fmt.Errorf("TransferChunks: no series")
		}

		if fromIngesterID == "" {
			level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
			return fmt.Errorf("no ingester id")
		}

		if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
			return errors.Wrap(err, "TransferChunks: ClaimTokensFor")
		}

		// Swap the received state in under the mutex.
		i.userStatesMtx.Lock()
		defer i.userStatesMtx.Unlock()

		i.userStates = userStates

		return nil
	}

	if err := i.transfer(stream.Context(), xfer); err != nil {
		return err
	}

	// Close the stream last, as this is what tells the "from" ingester that
	// it's OK to shut down.
	if err := stream.SendAndClose(&client.TransferChunksResponse{}); err != nil {
		level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
		return err
	}
	level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)

	return nil
}
156138

157-
// TransferTSDB receives all the file chunks from another ingester, and writes them to tsdb directories
158-
func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error {
139+
func (i *Ingester) transfer(ctx context.Context, xfer func() error) error {
159140
// Enter JOINING state (only valid from PENDING)
160-
if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
141+
if err := i.lifecycler.ChangeState(ctx, ring.JOINING); err != nil {
161142
return err
162143
}
163144

@@ -169,93 +150,112 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
169150
return
170151
}
171152

172-
level.Error(util.Logger).Log("msg", "TransferTSDB failed, not in ACTIVE state.", "state", state)
153+
level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)
173154

174155
// Enter PENDING state (only valid from JOINING)
175156
if i.lifecycler.GetState() == ring.JOINING {
176-
if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil {
177-
level.Error(util.Logger).Log("msg", "error rolling back failed TransferTSDB", "err", err)
157+
if err := i.lifecycler.ChangeState(ctx, ring.PENDING); err != nil {
158+
level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
178159
os.Exit(1)
179160
}
180161
}
181162
}()
182163

183-
filesXfer := 0
164+
if err := xfer(); err != nil {
165+
return err
166+
}
167+
168+
if err := i.lifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
169+
return errors.Wrap(err, "Transfer: ChangeState")
170+
}
171+
172+
return nil
173+
}
174+
175+
// TransferTSDB receives all the file chunks from another ingester, and writes them to tsdb directories
176+
func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error {
184177
fromIngesterID := ""
185178

186-
files := make(map[string]*os.File)
187-
defer func() {
188-
for _, f := range files {
189-
if err := f.Close(); err != nil {
190-
level.Warn(util.Logger).Log("msg", "failed to close xfer file", "err", err)
191-
}
192-
}
193-
}()
194-
for {
195-
f, err := stream.Recv()
196-
if err == io.EOF {
197-
break
198-
}
199-
if err != nil {
200-
return err
201-
}
202-
if fromIngesterID == "" {
203-
fromIngesterID = f.FromIngesterId
204-
level.Info(util.Logger).Log("msg", "processing TransferTSDB request", "from_ingester", fromIngesterID)
205-
}
206-
filesXfer++
179+
xfer := func() error {
180+
filesXfer := 0
207181

208-
// TODO(thor) To avoid corruption from errors, it's probably best to write to a temp dir, and then move that to the final location
209-
createfile := func(f *client.TimeSeriesFile) (*os.File, error) {
210-
dir := filepath.Join(i.V2.TSDBDir, filepath.Dir(f.Filename))
211-
if err := os.MkdirAll(dir, 0777); err != nil {
212-
return nil, err
182+
files := make(map[string]*os.File)
183+
defer func() {
184+
for _, f := range files {
185+
if err := f.Close(); err != nil {
186+
level.Warn(util.Logger).Log("msg", "failed to close xfer file", "err", err)
187+
}
188+
}
189+
}()
190+
for {
191+
f, err := stream.Recv()
192+
if err == io.EOF {
193+
break
213194
}
214-
file, err := os.Create(filepath.Join(i.V2.TSDBDir, f.Filename))
215195
if err != nil {
216-
return nil, err
196+
return errors.Wrap(err, "TransferTSDB: Recv")
197+
}
198+
if fromIngesterID == "" {
199+
fromIngesterID = f.FromIngesterId
200+
level.Info(util.Logger).Log("msg", "processing TransferTSDB request", "from_ingester", fromIngesterID)
217201
}
202+
filesXfer++
218203

219-
_, err = file.Write(f.Data)
220-
return file, err
221-
}
204+
// TODO(thor) To avoid corruption from errors, it's probably best to write to a temp dir, and then move that to the final location
205+
createfile := func(f *client.TimeSeriesFile) (*os.File, error) {
206+
dir := filepath.Join(i.V2.TSDBDir, filepath.Dir(f.Filename))
207+
if err := os.MkdirAll(dir, 0777); err != nil {
208+
return nil, errors.Wrap(err, "TransferTSDB: MkdirAll")
209+
}
210+
file, err := os.Create(filepath.Join(i.V2.TSDBDir, f.Filename))
211+
if err != nil {
212+
return nil, errors.Wrap(err, "TransferTSDB: Create")
213+
}
222214

223-
// Create or get existing open file
224-
file, ok := files[f.Filename]
225-
if !ok {
226-
file, err = createfile(f)
227-
if err != nil {
228-
return err
215+
_, err = file.Write(f.Data)
216+
return file, errors.Wrap(err, "TransferTSDB: Write")
229217
}
230218

231-
files[f.Filename] = file
232-
} else {
219+
// Create or get existing open file
220+
file, ok := files[f.Filename]
221+
if !ok {
222+
file, err = createfile(f)
223+
if err != nil {
224+
return err
225+
}
226+
227+
files[f.Filename] = file
228+
} else {
233229

234-
// Write to existing file
235-
if _, err := file.Write(f.Data); err != nil {
236-
return err
230+
// Write to existing file
231+
if _, err := file.Write(f.Data); err != nil {
232+
return errors.Wrap(err, "TransferTSDB: Write")
233+
}
237234
}
238235
}
239-
}
240236

241-
if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
242-
return err
237+
if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
238+
return errors.Wrap(err, "TransferTSDB: ClaimTokensFor")
239+
}
240+
241+
receivedFiles.Add(float64(filesXfer))
242+
level.Error(util.Logger).Log("msg", "Total files xfer", "from_ingester", fromIngesterID, "num", filesXfer)
243+
244+
return nil
243245
}
244246

245-
if err := i.lifecycler.ChangeState(stream.Context(), ring.ACTIVE); err != nil {
247+
if err := i.transfer(stream.Context(), xfer); err != nil {
246248
return err
247249
}
248250

249-
receivedFiles.Add(float64(filesXfer))
250-
level.Error(util.Logger).Log("msg", "Total files xfer", "from_ingester", fromIngesterID, "num", filesXfer)
251-
252251
// Close the stream last, as this is what tells the "from" ingester that
253252
// it's OK to shut down.
254253
if err := stream.SendAndClose(&client.TransferTSDBResponse{}); err != nil {
255254
level.Error(util.Logger).Log("msg", "Error closing TransferTSDB stream", "from_ingester", fromIngesterID, "err", err)
256255
return err
257256
}
258257
level.Info(util.Logger).Log("msg", "Successfully transferred tsdbs", "from_ingester", fromIngesterID)
258+
259259
return nil
260260
}
261261

0 commit comments

Comments
 (0)