Skip to content

Commit 58a3ad2

Browse files
authored
Add taskList throttling to allow users to limit activities executed per second (#432)
1 parent aa29ebb commit 58a3ad2

File tree

6 files changed

+370
-83
lines changed

6 files changed

+370
-83
lines changed

common/metrics/defs.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,12 +591,13 @@ const (
591591
MatchingFailures = iota + NumCommonMetrics
592592
PollSuccessCounter
593593
PollTimeoutCounter
594-
PollErrorsCounter
595594
PollSuccessWithSyncCounter
596595
LeaseRequestCounter
597596
LeaseFailureCounter
598597
ConditionFailedErrorCounter
599598
RespondQueryTaskFailedCounter
599+
SyncThrottleCounter
600+
BufferThrottleCounter
600601
)
601602

602603
// MetricDefs record the metrics for all services
@@ -673,12 +674,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
673674
MatchingFailures: {metricName: "matching.errors", metricType: Counter},
674675
PollSuccessCounter: {metricName: "poll.success"},
675676
PollTimeoutCounter: {metricName: "poll.timeouts"},
676-
PollErrorsCounter: {metricName: "poll.errors"},
677677
PollSuccessWithSyncCounter: {metricName: "poll.success.sync"},
678678
LeaseRequestCounter: {metricName: "lease.requests"},
679679
LeaseFailureCounter: {metricName: "lease.failures"},
680680
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
681681
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
682+
SyncThrottleCounter: {metricName: "sync.throttle.count"},
683+
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
682684
},
683685
}
684686

service/matching/matchingEngine.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ func (e *matchingEngineImpl) Stop() {
129129
}
130130

131131
func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager) {
132-
e.taskListsLock.Lock()
132+
e.taskListsLock.RLock()
133+
defer e.taskListsLock.RUnlock()
133134
lists = make([]taskListManager, 0, len(e.taskLists))
134135
count := 0
135136
for _, tlMgr := range e.taskLists {
@@ -139,7 +140,6 @@ func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager
139140
break
140141
}
141142
}
142-
e.taskListsLock.Unlock()
143143
return
144144
}
145145

@@ -153,20 +153,24 @@ func (e *matchingEngineImpl) String() string {
153153
return r
154154
}
155155

156-
// Returns taskListManager for a task list. If not already cached gets new range from DB and if successful creates one.
156+
// Returns taskListManager for a task list. If not already cached gets new range from DB and
157+
// if successful creates one.
157158
func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListManager, error) {
159+
// The first check is an optimization so almost all requests will have a task list manager
160+
// and return avoiding the write lock
158161
e.taskListsLock.RLock()
159162
if result, ok := e.taskLists[*taskList]; ok {
160163
e.taskListsLock.RUnlock()
161164
return result, nil
162165
}
163166
e.taskListsLock.RUnlock()
164-
mgr := newTaskListManager(e, taskList, e.config)
167+
// If it gets here, write lock and check again in case a task list is created between the two locks
165168
e.taskListsLock.Lock()
166169
if result, ok := e.taskLists[*taskList]; ok {
167170
e.taskListsLock.Unlock()
168171
return result, nil
169172
}
173+
mgr := newTaskListManager(e, taskList, e.config)
170174
e.taskLists[*taskList] = mgr
171175
e.taskListsLock.Unlock()
172176
logging.LogTaskListLoadingEvent(e.logger, taskList.taskListName, taskList.taskType)
@@ -179,6 +183,13 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListM
179183
return mgr, nil
180184
}
181185

186+
// For use in tests
187+
func (e *matchingEngineImpl) updateTaskList(taskList *taskListID, mgr taskListManager) {
188+
e.taskListsLock.Lock()
189+
defer e.taskListsLock.Unlock()
190+
e.taskLists[*taskList] = mgr
191+
}
192+
182193
func (e *matchingEngineImpl) removeTaskListManager(id *taskListID) {
183194
e.taskListsLock.Lock()
184195
defer e.taskListsLock.Unlock()
@@ -247,7 +258,7 @@ pollLoop:
247258
// long-poll when frontend calls CancelOutstandingPoll API
248259
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
249260
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
250-
tCtx, err := e.getTask(pollerCtx, taskList)
261+
tCtx, err := e.getTask(pollerCtx, taskList, nil)
251262
if err != nil {
252263
// TODO: Is empty poll the best reply for errPumpClosed?
253264
if err == ErrNoTasks || err == errPumpClosed {
@@ -341,10 +352,14 @@ pollLoop:
341352
}
342353

343354
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
355+
var maxDispatch *float64
356+
if request.TaskListMetadata != nil {
357+
maxDispatch = request.TaskListMetadata.MaxTasksPerSecond
358+
}
344359
// Add frontend generated pollerID to context so tasklistMgr can support cancellation of
345360
// long-poll when frontend calls CancelOutstandingPoll API
346361
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
347-
tCtx, err := e.getTask(pollerCtx, taskList)
362+
tCtx, err := e.getTask(pollerCtx, taskList, maxDispatch)
348363
if err != nil {
349364
// TODO: Is empty poll the best reply for errPumpClosed?
350365
if err == ErrNoTasks || err == errPumpClosed {
@@ -450,12 +465,14 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(ctx context.Context, request
450465
}
451466

452467
// Loads a task from persistence and wraps it in a task context
453-
func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID) (*taskContext, error) {
468+
func (e *matchingEngineImpl) getTask(
469+
ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64,
470+
) (*taskContext, error) {
454471
tlMgr, err := e.getTaskListManager(taskList)
455472
if err != nil {
456473
return nil, err
457474
}
458-
return tlMgr.GetTaskContext(ctx)
475+
return tlMgr.GetTaskContext(ctx, maxDispatchPerSecond)
459476
}
460477

461478
func (e *matchingEngineImpl) unloadTaskList(id *taskListID) {

0 commit comments

Comments
 (0)