
Commit 27e49cd

zeripath and techknowlogick authored Feb 28, 2023
Properly flush unique queues on startup (#23154)
There have been a number of reports of PRs being blocked whilst being checked, which have been difficult to debug. In investigating #23050 I have realised that, whilst the Warn there is somewhat of a miscall, there was a real bug in the way that the LevelUniqueQueue was being restored on start-up of the PersistableChannelUniqueQueue.

Next, there was a conflict in the setting of the internal leveldb queue name: this wasn't being set, so it was being overridden by other unique queues.

This PR fixes these bugs and adds a testcase. Thanks to @brechtvl for noticing the second issue.

Fix #23050 and others

---------

Signed-off-by: Andrew Thornton <[email protected]>
Co-authored-by: techknowlogick <[email protected]>
1 parent 04347eb commit 27e49cd

7 files changed: 332 additions, 21 deletions
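To illustrate the queue-name conflict described in the commit message: when the internal level queue's name is left unset, every wrapper falls back to the same default, so two queues sharing a DataDir can clobber each other; the fix derives the internal name from the wrapper's own name. Below is a minimal, hypothetical sketch of that pattern — the levelCfg struct, newLevelCfg constructor, and queue names are invented for illustration and are not Gitea's actual API.

	// Hypothetical sketch: derive a distinct internal level-queue name per wrapper.
	package main

	import "fmt"

	type levelCfg struct {
		DataDir   string
		QueueName string // left empty, all wrappers would share one default name
	}

	// newLevelCfg mirrors the fix's pattern: suffix the wrapper's own name so
	// two queues sharing a DataDir no longer collide inside the same leveldb.
	func newLevelCfg(wrapperName, dataDir string) levelCfg {
		return levelCfg{
			DataDir:   dataDir,
			QueueName: wrapperName + "-level",
		}
	}

	func main() {
		a := newLevelCfg("pr_patch_checker", "queues/common")
		b := newLevelCfg("mirror", "queues/common")
		fmt.Println(a.QueueName, b.QueueName) // distinct names, no collision
	}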
 

modules/queue/queue_channel.go

Lines changed: 4 additions & 1 deletion

@@ -124,7 +124,10 @@ func (q *ChannelQueue) Shutdown() {
 		log.Trace("ChannelQueue: %s Flushing", q.name)
 		// We can't use Cleanup here because that will close the channel
 		if err := q.FlushWithContext(q.terminateCtx); err != nil {
-			log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+			count := atomic.LoadInt64(&q.numInQueue)
+			if count > 0 {
+				log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
+			}
 			return
 		}
 		log.Debug("ChannelQueue: %s Flushed", q.name)

modules/queue/queue_disk_channel.go

Lines changed: 22 additions & 7 deletions

@@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 			},
 			Workers: 0,
 		},
-		DataDir: config.DataDir,
+		DataDir:   config.DataDir,
+		QueueName: config.Name + "-level",
 	}

 	levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
@@ -172,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
 	atShutdown(q.Shutdown)
 	atTerminate(q.Terminate)

-	if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
+	if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
 		// Just run the level queue - we shut it down once it's flushed
 		go q.internal.Run(func(_ func()) {}, func(_ func()) {})
 		go func() {
-			for !q.IsEmpty() {
-				_ = q.internal.Flush(0)
+			for !lq.IsEmpty() {
+				_ = lq.Flush(0)
 				select {
 				case <-time.After(100 * time.Millisecond):
-				case <-q.internal.(*LevelQueue).shutdownCtx.Done():
-					log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+				case <-lq.shutdownCtx.Done():
+					if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
+						log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
+					}
 					return
 				}
 			}
@@ -316,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() {
 		// Redirect all remaining data in the chan to the internal channel
 		log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
 		close(q.channelQueue.dataChan)
+		countOK, countLost := 0, 0
 		for data := range q.channelQueue.dataChan {
-			_ = q.internal.Push(data)
+			err := q.internal.Push(data)
+			if err != nil {
+				log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
+				countLost++
+			} else {
+				countOK++
+			}
 			atomic.AddInt64(&q.channelQueue.numInQueue, -1)
 		}
+		if countLost > 0 {
+			log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
+		} else if countOK > 0 {
+			log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
+		}
 		log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

 		log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)

modules/queue/queue_disk_channel_test.go

Lines changed: 4 additions & 4 deletions

@@ -39,7 +39,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "first",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)

@@ -135,7 +135,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "second",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)

@@ -227,7 +227,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "first",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)

@@ -433,7 +433,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
 		Workers:      1,
 		BoostWorkers: 0,
 		MaxWorkers:   10,
-		Name:         "second",
+		Name:         "test-queue",
 	}, &testData{})
 	assert.NoError(t, err)
 	pausable, ok = queue.(Pausable)

modules/queue/unique_queue_channel.go

Lines changed: 3 additions & 1 deletion

@@ -177,7 +177,9 @@ func (q *ChannelUniqueQueue) Shutdown() {
 	go func() {
 		log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
 		if err := q.FlushWithContext(q.terminateCtx); err != nil {
-			log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+			if !q.IsEmpty() {
+				log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
+			}
 			return
 		}
 		log.Debug("ChannelUniqueQueue: %s Flushed", q.name)

modules/queue/unique_queue_channel_test.go

Lines changed: 7 additions & 0 deletions

@@ -8,10 +8,13 @@ import (
 	"testing"
 	"time"

+	"code.gitea.io/gitea/modules/log"
+
 	"github.com/stretchr/testify/assert"
 )

 func TestChannelUniqueQueue(t *testing.T) {
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
 	handleChan := make(chan *testData)
 	handle := func(data ...Data) []Data {
 		for _, datum := range data {
@@ -52,6 +55,8 @@ func TestChannelUniqueQueue(t *testing.T) {
 }

 func TestChannelUniqueQueue_Batch(t *testing.T) {
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
+
 	handleChan := make(chan *testData)
 	handle := func(data ...Data) []Data {
 		for _, datum := range data {
@@ -98,6 +103,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) {
 }

 func TestChannelUniqueQueue_Pause(t *testing.T) {
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
+
 	lock := sync.Mutex{}
 	var queue Queue
 	var err error

modules/queue/unique_queue_disk_channel.go

Lines changed: 33 additions & 8 deletions

@@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 			},
 			Workers: 0,
 		},
-		DataDir: config.DataDir,
+		DataDir:   config.DataDir,
+		QueueName: config.Name + "-level",
 	}

 	queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
@@ -209,17 +210,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
 	atTerminate(q.Terminate)
 	_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)

-	if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
+	if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
 		// Just run the level queue - we shut it down once it's flushed
-		go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+		go luq.Run(func(_ func()) {}, func(_ func()) {})
 		go func() {
-			_ = q.internal.Flush(0)
-			log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
-			q.internal.(*LevelUniqueQueue).Shutdown()
-			GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+			_ = luq.Flush(0)
+			for !luq.IsEmpty() {
+				_ = luq.Flush(0)
+				select {
+				case <-time.After(100 * time.Millisecond):
+				case <-luq.shutdownCtx.Done():
+					if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
+						log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
+					}
+					return
+				}
+			}
+			log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
+			luq.Shutdown()
+			GetManager().Remove(luq.qid)
 		}()
 	} else {
 		log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+		_ = q.internal.Flush(0)
 		q.internal.(*LevelUniqueQueue).Shutdown()
 		GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
 	}
@@ -285,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
 		// Redirect all remaining data in the chan to the internal channel
 		close(q.channelQueue.dataChan)
 		log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
+		countOK, countLost := 0, 0
 		for data := range q.channelQueue.dataChan {
-			_ = q.internal.Push(data)
+			err := q.internal.(*LevelUniqueQueue).Push(data)
+			if err != nil {
+				log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
+				countLost++
+			} else {
+				countOK++
+			}
+		}
+		if countLost > 0 {
+			log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
+		} else if countOK > 0 {
+			log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
 		}
 		log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
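
On shutdown, both persistable queues now tally how many in-flight items were successfully redirected into the level queue (to be restored on restart) versus lost, and only log when there is something to report. A reduced sketch of that accounting, assuming a push function standing in for the internal queue's Push:

	// Sketch of the shutdown redirect: drain the in-memory channel into
	// persistent storage, counting successes and losses for the final log.
	package main

	import "log"

	func redirect(dataChan chan string, push func(string) error, name string) {
		close(dataChan) // no more writers; range drains what is buffered
		countOK, countLost := 0, 0
		for data := range dataChan {
			if err := push(data); err != nil {
				log.Printf("%s: unable to redirect %v: %v", name, data, err)
				countLost++
			} else {
				countOK++
			}
		}
		if countLost > 0 {
			log.Printf("%s: %d will be restored on restart, %d lost", name, countOK, countLost)
		} else if countOK > 0 {
			log.Printf("%s: %d will be restored on restart", name, countOK)
		}
	}

	func main() {
		ch := make(chan string, 2)
		ch <- "task-1"
		ch <- "task-2"
		redirect(ch, func(string) error { return nil }, "demo")
	}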

modules/queue/unique_queue_disk_channel_test.go

Lines changed: 259 additions & 0 deletions

@@ -0,0 +1,259 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package queue
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"code.gitea.io/gitea/modules/log"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestPersistableChannelUniqueQueue(t *testing.T) {
+	tmpDir := t.TempDir()
+	fmt.Printf("TempDir %s\n", tmpDir)
+	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
+
+	// Common function to create the Queue
+	newQueue := func(name string, handle func(data ...Data) []Data) Queue {
+		q, err := NewPersistableChannelUniqueQueue(handle,
+			PersistableChannelUniqueQueueConfiguration{
+				Name:         name,
+				DataDir:      tmpDir,
+				QueueLength:  200,
+				MaxWorkers:   1,
+				BlockTimeout: 1 * time.Second,
+				BoostTimeout: 5 * time.Minute,
+				BoostWorkers: 1,
+				Workers:      0,
+			}, "task-0")
+		assert.NoError(t, err)
+		return q
+	}
+
+	// runs the provided queue and provides some timer function
+	type channels struct {
+		readyForShutdown  chan struct{} // closed when shutdown functions have been assigned
+		readyForTerminate chan struct{} // closed when terminate functions have been assigned
+		signalShutdown    chan struct{} // Should close to signal shutdown
+		doneShutdown      chan struct{} // closed when shutdown function is done
+		queueTerminate    []func()      // list of atTerminate functions to call atTerminate - need to be accessed with lock
+	}
+	runQueue := func(q Queue, lock *sync.Mutex) *channels {
+		chans := &channels{
+			readyForShutdown:  make(chan struct{}),
+			readyForTerminate: make(chan struct{}),
+			signalShutdown:    make(chan struct{}),
+			doneShutdown:      make(chan struct{}),
+		}
+		go q.Run(func(atShutdown func()) {
+			go func() {
+				lock.Lock()
+				select {
+				case <-chans.readyForShutdown:
+				default:
+					close(chans.readyForShutdown)
+				}
+				lock.Unlock()
+				<-chans.signalShutdown
+				atShutdown()
+				close(chans.doneShutdown)
+			}()
+		}, func(atTerminate func()) {
+			lock.Lock()
+			defer lock.Unlock()
+			select {
+			case <-chans.readyForTerminate:
+			default:
+				close(chans.readyForTerminate)
+			}
+			chans.queueTerminate = append(chans.queueTerminate, atTerminate)
+		})
+
+		return chans
+	}
+
+	// call to shutdown and terminate the queue associated with the channels
+	doTerminate := func(chans *channels, lock *sync.Mutex) {
+		<-chans.readyForTerminate
+
+		lock.Lock()
+		callbacks := []func(){}
+		callbacks = append(callbacks, chans.queueTerminate...)
+		lock.Unlock()
+
+		for _, callback := range callbacks {
+			callback()
+		}
+	}
+
+	mapLock := sync.Mutex{}
+	executedInitial := map[string][]string{}
+	hasInitial := map[string][]string{}
+
+	fillQueue := func(name string, done chan struct{}) {
+		t.Run("Initial Filling: "+name, func(t *testing.T) {
+			lock := sync.Mutex{}
+
+			startAt100Queued := make(chan struct{})
+			stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
+
+			handle := func(data ...Data) []Data {
+				<-startAt100Queued
+				for _, datum := range data {
+					s := datum.(string)
+					mapLock.Lock()
+					executedInitial[name] = append(executedInitial[name], s)
+					mapLock.Unlock()
+					if s == "task-20" {
+						close(stopAt20Shutdown)
+					}
+				}
+				return nil
+			}
+
+			q := newQueue(name, handle)
+
+			// add 100 tasks to the queue
+			for i := 0; i < 100; i++ {
+				_ = q.Push("task-" + strconv.Itoa(i))
+			}
+			close(startAt100Queued)
+
+			chans := runQueue(q, &lock)
+
+			<-chans.readyForShutdown
+			<-stopAt20Shutdown
+			close(chans.signalShutdown)
+			<-chans.doneShutdown
+			_ = q.Push("final")
+
+			// check which tasks are still in the queue
+			for i := 0; i < 100; i++ {
+				if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
+					mapLock.Lock()
+					hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i))
+					mapLock.Unlock()
+				}
+			}
+			if has, _ := q.(UniqueQueue).Has("final"); has {
+				mapLock.Lock()
+				hasInitial[name] = append(hasInitial[name], "final")
+				mapLock.Unlock()
+			} else {
+				assert.Fail(t, "UnqueQueue %s should have \"final\"", name)
+			}
+			doTerminate(chans, &lock)
+			mapLock.Lock()
+			assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
+			mapLock.Unlock()
+		})
+		close(done)
+	}
+
+	doneA := make(chan struct{})
+	doneB := make(chan struct{})
+
+	go fillQueue("QueueA", doneA)
+	go fillQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	executedEmpty := map[string][]string{}
+	hasEmpty := map[string][]string{}
+	emptyQueue := func(name string, done chan struct{}) {
+		t.Run("Empty Queue: "+name, func(t *testing.T) {
+			lock := sync.Mutex{}
+			stop := make(chan struct{})
+
+			// collect the tasks that have been executed
+			handle := func(data ...Data) []Data {
+				lock.Lock()
+				for _, datum := range data {
+					mapLock.Lock()
+					executedEmpty[name] = append(executedEmpty[name], datum.(string))
+					mapLock.Unlock()
+					if datum.(string) == "final" {
+						close(stop)
+					}
+				}
+				lock.Unlock()
+				return nil
+			}
+
+			q := newQueue(name, handle)
+			chans := runQueue(q, &lock)
+
+			<-chans.readyForShutdown
+			<-stop
+			close(chans.signalShutdown)
+			<-chans.doneShutdown
+
+			// check which tasks are still in the queue
+			for i := 0; i < 100; i++ {
+				if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
+					mapLock.Lock()
+					hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i))
+					mapLock.Unlock()
+				}
+			}
+			doTerminate(chans, &lock)
+
+			mapLock.Lock()
+			assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name]))
+			assert.Equal(t, 0, len(hasEmpty[name]))
+			mapLock.Unlock()
+		})
+		close(done)
+	}
+
+	doneA = make(chan struct{})
+	doneB = make(chan struct{})
+
+	go emptyQueue("QueueA", doneA)
+	go emptyQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	mapLock.Lock()
+	t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
+		len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
+
+	// reset and rerun
+	executedInitial = map[string][]string{}
+	hasInitial = map[string][]string{}
+	executedEmpty = map[string][]string{}
+	hasEmpty = map[string][]string{}
+	mapLock.Unlock()
+
+	doneA = make(chan struct{})
+	doneB = make(chan struct{})
+
+	go fillQueue("QueueA", doneA)
+	go fillQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	doneA = make(chan struct{})
+	doneB = make(chan struct{})
+
+	go emptyQueue("QueueA", doneA)
+	go emptyQueue("QueueB", doneB)
+
+	<-doneA
+	<-doneB
+
+	mapLock.Lock()
+	t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
+		len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
+	mapLock.Unlock()
+}
