@@ -456,9 +456,10 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
456
456
s .matchingEngine .config .RangeSize = rangeSize // override to low number for the test
457
457
458
458
dispatchTTL := time .Nanosecond
459
+ dPtr := _maxDispatchDefault
459
460
mgr := newTaskListManagerWithRateLimiter (
460
461
s .matchingEngine , tlID , s .matchingEngine .config ,
461
- newRateLimiter (& _maxDispatchDefault , dispatchTTL ),
462
+ newRateLimiter (& dPtr , dispatchTTL ),
462
463
)
463
464
s .matchingEngine .updateTaskList (tlID , mgr )
464
465
s .taskManager .getTaskListManager (tlID ).rangeID = initialRangeID
@@ -498,15 +499,14 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
498
499
}, nil )
499
500
500
501
zeroDispatchCt := 0
501
- var maxDispatch float64
502
502
for i := int64 (0 ); i < taskCount ; i ++ {
503
503
scheduleID := i * 3
504
504
505
505
var wg sync.WaitGroup
506
506
507
507
var result * workflow.PollForActivityTaskResponse
508
508
var pollErr error
509
- maxDispatch = float64 (i )
509
+ maxDispatch : = float64 (i )
510
510
if i % 2 == 0 {
511
511
maxDispatch = 0
512
512
}
@@ -619,9 +619,10 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
619
619
tlID := & taskListID {domainID : domainID , taskListName : tl , taskType : persistence .TaskListTypeActivity }
620
620
dispatchTTL := time .Nanosecond
621
621
s .matchingEngine .config .RangeSize = rangeSize // override to low number for the test
622
+ dPtr := _maxDispatchDefault
622
623
mgr := newTaskListManagerWithRateLimiter (
623
624
s .matchingEngine , tlID , s .matchingEngine .config ,
624
- newRateLimiter (& _maxDispatchDefault , dispatchTTL ),
625
+ newRateLimiter (& dPtr , dispatchTTL ),
625
626
)
626
627
s .matchingEngine .updateTaskList (tlID , mgr )
627
628
s .taskManager .getTaskListManager (tlID ).rangeID = initialRangeID
@@ -685,12 +686,12 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
685
686
Identity : & identity ,
686
687
})}
687
688
}, nil )
688
- var maxDispatch float64
689
+ var throttleMu sync. Mutex
689
690
throttleCt := 0
690
691
for p := 0 ; p < workerCount ; p ++ {
691
- go func () {
692
+ go func (wNum int ) {
692
693
for i := int64 (0 ); i < taskCount ; {
693
- maxDispatch = dispatchLimitFn (p , i )
694
+ maxDispatch : = dispatchLimitFn (wNum , i )
694
695
result , err := s .matchingEngine .PollForActivityTask (s .callContext , & matching.PollForActivityTaskRequest {
695
696
DomainUUID : common .StringPtr (domainID ),
696
697
PollRequest : & workflow.PollForActivityTaskRequest {
@@ -701,7 +702,9 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
701
702
})
702
703
if err != nil {
703
704
s .Contains (err .Error (), "ServiceBusyError" )
705
+ throttleMu .Lock ()
704
706
throttleCt ++
707
+ throttleMu .Unlock ()
705
708
i ++
706
709
continue
707
710
}
@@ -729,7 +732,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
729
732
i ++
730
733
}
731
734
wg .Done ()
732
- }()
735
+ }(p )
733
736
}
734
737
wg .Wait ()
735
738
totalTasks := int (taskCount ) * workerCount
0 commit comments