Skip to content

Commit ddabb51

Browse files
authored
Revert "Fix Worker Request flow to properly use batching (#775)" (#785)
This reverts commit 3b0c92f.
1 parent bcfba90 commit ddabb51

File tree

3 files changed

+2
-11
lines changed

3 files changed

+2
-11
lines changed

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2502,7 +2502,6 @@ public int scaleStage(
25022502
getJobId(), getJobState()));
25032503
throw new RuntimeException(error);
25042504
}
2505-
List<IMantisWorkerMetadata> workerRequests = new ArrayList<>();
25062505
if (newNumWorkerCount > oldNumWorkers) {
25072506
for (int i = 0; i < newNumWorkerCount - oldNumWorkers; i++) {
25082507
try {
@@ -2511,17 +2510,14 @@ public int scaleStage(
25112510
IMantisWorkerMetadata workerRequest = addWorker(schedInfo, stageMetaData.getStageNum(),
25122511
newWorkerIndex);
25132512
jobStore.storeNewWorker(workerRequest);
2514-
markStageAssignmentsChanged(true);
2515-
workerRequests.add(workerRequest);
2513+
queueTask(workerRequest);
25162514
} catch (Exception e) {
25172515
// creating a worker failed but expected no of workers was set successfully,
25182516
// during heartbeat check we will
25192517
// retry launching this worker
25202518
LOGGER.warn("Exception adding new worker for {}", stageMetaData.getJobId().getId(), e);
25212519
}
25222520
}
2523-
//one request to provision all new workers
2524-
queueTasks(workerRequests, empty());
25252521
} else {
25262522
// potential bulk removal opportunity?
25272523
for (int i = 0; i < oldNumWorkers - newNumWorkerCount; i++) {

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,6 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
502502
}
503503
});
504504

505-
log.debug("Pending TE count by group key: {}" ,pendingCountByGroupKey);
506505
// remove jobs from pending set which have all pending workers
507506
jobIdToMachineDef.forEach((jobId, workers) -> {
508507
final JobRequirements jobStats = pendingJobRequests.getIfPresent(jobId);
@@ -567,9 +566,6 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(Ta
567566
pendingJobRequests.put(request.getJobId(), new JobRequirements(request.getGroupedBySchedulingConstraints()));
568567
}
569568
}
570-
else {
571-
log.info("Not adding job {} to pending requests for {} scheduling constraints {} because there were less than 2 TE allocation requests", request.getJobId(), allocationRequests.size(), schedulingConstraints);
572-
}
573569
return Optional.empty();
574570
}
575571
}

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,7 @@ public void testJobScaleUp() throws Exception, InvalidJobException, io.mantisrx.
145145
verify(jobStoreMock, times(3)).updateJob(any());
146146

147147
// initial worker + job master and scale up worker
148-
//should be twice because it is an initial request + scale up request
149-
verify(schedulerMock, times(2)).scheduleWorkers(any());
148+
verify(schedulerMock, times(3)).scheduleWorkers(any());
150149

151150
}
152151

0 commit comments

Comments
 (0)