Skip to content

Commit cd087d4

Browse files
committed
sweepbatcher/test: fix races in require.Eventually
The code inside require.Eventually runs in parallel with the event loops of the batcher and its batches. Accessing fields of the batcher and batches must be done within an event loop. To address this, testRunInEventLoop methods were added to the Batcher and batch types. Unit tests were then rewritten to use this approach when accessing batcher and batch fields. Additionally, in many cases, receive operations from RegisterSpendChannel were moved before require.Eventually. This prevents testRunInEventLoop from getting stuck in an event loop while blocked on a RegisterSpendChannel send operation.
1 parent 34d70f9 commit cd087d4

File tree

3 files changed

+485
-228
lines changed

3 files changed

+485
-228
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ type batch struct {
215215
// reorgChan is the channel over which reorg notifications are received.
216216
reorgChan chan struct{}
217217

218+
// testReqs is a channel where test requests are received.
219+
// This is used only in unit tests! The reason to have this is to
220+
// avoid data races in require.Eventually calls running in parallel
221+
// to the event loop. See method testRunInEventLoop().
222+
testReqs chan *testRequest
223+
218224
// errChan is the channel over which errors are received.
219225
errChan chan error
220226

@@ -352,6 +358,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
352358
spendChan: make(chan *chainntnfs.SpendDetail),
353359
confChan: make(chan *chainntnfs.TxConfirmation, 1),
354360
reorgChan: make(chan struct{}, 1),
361+
testReqs: make(chan *testRequest),
355362
errChan: make(chan error, 1),
356363
callEnter: make(chan struct{}),
357364
callLeave: make(chan struct{}),
@@ -396,6 +403,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
396403
spendChan: make(chan *chainntnfs.SpendDetail),
397404
confChan: make(chan *chainntnfs.TxConfirmation, 1),
398405
reorgChan: make(chan struct{}, 1),
406+
testReqs: make(chan *testRequest),
399407
errChan: make(chan error, 1),
400408
callEnter: make(chan struct{}),
401409
callLeave: make(chan struct{}),
@@ -737,6 +745,10 @@ func (b *batch) Run(ctx context.Context) error {
737745
return err
738746
}
739747

748+
case testReq := <-b.testReqs:
749+
testReq.handler()
750+
close(testReq.quit)
751+
740752
case err := <-blockErrChan:
741753
return err
742754

@@ -749,6 +761,36 @@ func (b *batch) Run(ctx context.Context) error {
749761
}
750762
}
751763

764+
// testRunInEventLoop runs a function in the event loop blocking until
765+
// the function returns. For unit tests only!
766+
func (b *batch) testRunInEventLoop(ctx context.Context, handler func()) {
767+
// If the event loop is finished, run the function.
768+
select {
769+
case <-b.stopping:
770+
handler()
771+
772+
return
773+
default:
774+
}
775+
776+
quit := make(chan struct{})
777+
req := &testRequest{
778+
handler: handler,
779+
quit: quit,
780+
}
781+
782+
select {
783+
case b.testReqs <- req:
784+
case <-ctx.Done():
785+
return
786+
}
787+
788+
select {
789+
case <-quit:
790+
case <-ctx.Done():
791+
}
792+
}
793+
752794
// timeout returns minimum timeout as block height among sweeps of the batch.
753795
// If the batch is empty, return -1.
754796
func (b *batch) timeout() int32 {

sweepbatcher/sweep_batcher.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,16 @@ var (
225225
ErrBatcherShuttingDown = errors.New("batcher shutting down")
226226
)
227227

228+
// testRequest is a function passed to an event loop and a channel used to
229+
// wait until the function is executed. This is used in unit tests only!
230+
type testRequest struct {
231+
// handler is the function to an event loop.
232+
handler func()
233+
234+
// quit is closed when the handler completes.
235+
quit chan struct{}
236+
}
237+
228238
// Batcher is a system that is responsible for accepting sweep requests and
229239
// placing them in appropriate batches. It will spin up new batches as needed.
230240
type Batcher struct {
@@ -234,6 +244,12 @@ type Batcher struct {
234244
// sweepReqs is a channel where sweep requests are received.
235245
sweepReqs chan SweepRequest
236246

247+
// testReqs is a channel where test requests are received.
248+
// This is used only in unit tests! The reason to have this is to
249+
// avoid data races in require.Eventually calls running in parallel
250+
// to the event loop. See method testRunInEventLoop().
251+
testReqs chan *testRequest
252+
237253
// errChan is a channel where errors are received.
238254
errChan chan error
239255

@@ -461,6 +477,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
461477
return &Batcher{
462478
batches: make(map[int32]*batch),
463479
sweepReqs: make(chan SweepRequest),
480+
testReqs: make(chan *testRequest),
464481
errChan: make(chan error, 1),
465482
quit: make(chan struct{}),
466483
initDone: make(chan struct{}),
@@ -528,6 +545,10 @@ func (b *Batcher) Run(ctx context.Context) error {
528545
return err
529546
}
530547

548+
case testReq := <-b.testReqs:
549+
testReq.handler()
550+
close(testReq.quit)
551+
531552
case err := <-b.errChan:
532553
log.Warnf("Batcher received an error: %v.", err)
533554
return err
@@ -551,6 +572,36 @@ func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
551572
}
552573
}
553574

575+
// testRunInEventLoop runs a function in the event loop blocking until
576+
// the function returns. For unit tests only!
577+
func (b *Batcher) testRunInEventLoop(ctx context.Context, handler func()) {
578+
// If the event loop is finished, run the function.
579+
select {
580+
case <-b.quit:
581+
handler()
582+
583+
return
584+
default:
585+
}
586+
587+
quit := make(chan struct{})
588+
req := &testRequest{
589+
handler: handler,
590+
quit: quit,
591+
}
592+
593+
select {
594+
case b.testReqs <- req:
595+
case <-ctx.Done():
596+
return
597+
}
598+
599+
select {
600+
case <-quit:
601+
case <-ctx.Done():
602+
}
603+
}
604+
554605
// handleSweep handles a sweep request by either placing it in an existing
555606
// batch, or by spinning up a new batch for it.
556607
func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,

0 commit comments

Comments
 (0)