Skip to content

Commit 1065a1b

Browse files
committed
feat: [CI-4540]: Added step group support in CI
1 parent 6e0b86c commit 1065a1b

File tree

6 files changed

+197
-21
lines changed

6 files changed

+197
-21
lines changed

320-ci-execution/src/main/java/io/harness/ci/integrationstage/IntegrationStageUtils.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import io.harness.plancreator.stages.stage.StageElementConfig;
7070
import io.harness.plancreator.steps.ParallelStepElementConfig;
7171
import io.harness.plancreator.steps.StepElementConfig;
72+
import io.harness.plancreator.steps.StepGroupElementConfig;
7273
import io.harness.pms.contracts.plan.ExecutionTriggerInfo;
7374
import io.harness.pms.contracts.plan.PlanCreationContextValue;
7475
import io.harness.pms.contracts.plan.TriggerType;
@@ -123,6 +124,14 @@ public StepElementConfig getStepElementConfig(ExecutionWrapperConfig executionWr
123124
}
124125
}
125126

127+
public StepGroupElementConfig getStepGroupElementConfig(ExecutionWrapperConfig executionWrapperConfig) {
128+
try {
129+
return YamlUtils.read(executionWrapperConfig.getStepGroup().toString(), StepGroupElementConfig.class);
130+
} catch (Exception ex) {
131+
throw new CIStageExecutionException("Failed to deserialize ExecutionWrapperConfig step node", ex);
132+
}
133+
}
134+
126135
public CodeBase getCiCodeBase(YamlNode ciCodeBase) {
127136
try {
128137
return YamlUtils.read(ciCodeBase.toString(), CodeBase.class);
@@ -464,9 +473,9 @@ public OSType getK8OS(Infrastructure infrastructure) {
464473
return resolveOSType(k8sDirectInfraYaml.getSpec().getOs());
465474
}
466475

467-
public List<String> getStageConnectorRefs(IntegrationStageConfig integrationStageConfig) {
468-
ArrayList<String> connectorIdentifiers = new ArrayList<>();
469-
for (ExecutionWrapperConfig executionWrapper : integrationStageConfig.getExecution().getSteps()) {
476+
public void populateConnectorIdentifiers(
477+
List<ExecutionWrapperConfig> wrappers, ArrayList<String> connectorIdentifiers) {
478+
for (ExecutionWrapperConfig executionWrapper : wrappers) {
470479
if (executionWrapper.getStep() != null && !executionWrapper.getStep().isNull()) {
471480
StepElementConfig stepElementConfig = IntegrationStageUtils.getStepElementConfig(executionWrapper);
472481
String identifier = getConnectorIdentifier(stepElementConfig);
@@ -477,20 +486,19 @@ public List<String> getStageConnectorRefs(IntegrationStageConfig integrationStag
477486
ParallelStepElementConfig parallelStepElementConfig =
478487
IntegrationStageUtils.getParallelStepElementConfig(executionWrapper);
479488
if (isNotEmpty(parallelStepElementConfig.getSections())) {
480-
for (ExecutionWrapperConfig executionWrapperInParallel : parallelStepElementConfig.getSections()) {
481-
if (executionWrapperInParallel.getStep() == null || executionWrapperInParallel.getStep().isNull()) {
482-
continue;
483-
}
484-
StepElementConfig stepElementConfig =
485-
IntegrationStageUtils.getStepElementConfig(executionWrapperInParallel);
486-
String identifier = getConnectorIdentifier(stepElementConfig);
487-
if (identifier != null) {
488-
connectorIdentifiers.add(identifier);
489-
}
490-
}
489+
populateConnectorIdentifiers(parallelStepElementConfig.getSections(), connectorIdentifiers);
491490
}
491+
} else {
492+
StepGroupElementConfig stepGroupElementConfig =
493+
IntegrationStageUtils.getStepGroupElementConfig(executionWrapper);
494+
populateConnectorIdentifiers(stepGroupElementConfig.getSteps(), connectorIdentifiers);
492495
}
493496
}
497+
}
498+
499+
public List<String> getStageConnectorRefs(IntegrationStageConfig integrationStageConfig) {
500+
ArrayList<String> connectorIdentifiers = new ArrayList<>();
501+
populateConnectorIdentifiers(integrationStageConfig.getExecution().getSteps(), connectorIdentifiers);
494502

495503
if (integrationStageConfig.getServiceDependencies() == null
496504
|| isEmpty(integrationStageConfig.getServiceDependencies().getValue())) {

320-ci-execution/src/main/java/io/harness/ci/integrationstage/K8InitializeStepUtils.java

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import io.harness.plancreator.stages.stage.StageElementConfig;
5656
import io.harness.plancreator.steps.ParallelStepElementConfig;
5757
import io.harness.plancreator.steps.StepElementConfig;
58+
import io.harness.plancreator.steps.StepGroupElementConfig;
5859
import io.harness.pms.contracts.ambiance.Ambiance;
5960
import io.harness.pms.execution.utils.AmbianceUtils;
6061
import io.harness.pms.yaml.YamlUtils;
@@ -88,18 +89,89 @@ public class K8InitializeStepUtils {
8889
@Inject private CIFeatureFlagService featureFlagService;
8990
@Inject private ConnectorUtils connectorUtils;
9091

92+
private List<ContainerDefinitionInfo> createStepContainerDefinitionsForStepGroup(
93+
ExecutionWrapperConfig executionWrapper, StageElementConfig integrationStage, CIExecutionArgs ciExecutionArgs,
94+
PortFinder portFinder, String accountId, OSType os, int stageMemoryRequest, int stageCpuRequest, int stepIndex) {
95+
List<ContainerDefinitionInfo> containerDefinitionInfos = new ArrayList<>();
96+
StepGroupElementConfig stepGroupElementConfig = IntegrationStageUtils.getStepGroupElementConfig(executionWrapper);
97+
98+
for (ExecutionWrapperConfig step : stepGroupElementConfig.getSteps()) {
99+
if (step.getStep() != null && !step.getStep().isNull()) {
100+
StepElementConfig stepElementConfig = IntegrationStageUtils.getStepElementConfig(step);
101+
stepIndex++;
102+
103+
ContainerDefinitionInfo containerDefinitionInfo =
104+
createStepContainerDefinition(stepElementConfig, integrationStage, ciExecutionArgs, portFinder, stepIndex,
105+
accountId, os, stageMemoryRequest, stageCpuRequest);
106+
if (containerDefinitionInfo != null) {
107+
containerDefinitionInfos.add(containerDefinitionInfo);
108+
}
109+
} else if (step.getParallel() != null && !step.getParallel().isNull()) {
110+
List<ContainerDefinitionInfo> temp = createStepContainerDefinitionsForParallel(step, integrationStage,
111+
ciExecutionArgs, portFinder, accountId, os, stageMemoryRequest, stageCpuRequest, stepIndex);
112+
if (temp != null) {
113+
stepIndex += temp.size();
114+
if (temp.size() > 0) {
115+
containerDefinitionInfos.addAll(temp);
116+
}
117+
}
118+
}
119+
}
120+
return containerDefinitionInfos;
121+
}
122+
123+
private List<ContainerDefinitionInfo> createStepContainerDefinitionsForParallel(
124+
ExecutionWrapperConfig executionWrapper, StageElementConfig integrationStage, CIExecutionArgs ciExecutionArgs,
125+
PortFinder portFinder, String accountId, OSType os, int extraMemory, int extraCPU, int stepIndex) {
126+
List<ContainerDefinitionInfo> containerDefinitionInfos = new ArrayList<>();
127+
ParallelStepElementConfig parallelStepElementConfig =
128+
IntegrationStageUtils.getParallelStepElementConfig(executionWrapper);
129+
if (isEmpty(parallelStepElementConfig.getSections())) {
130+
return containerDefinitionInfos;
131+
}
132+
133+
int steps = parallelStepElementConfig.getSections().size();
134+
Integer extraMemoryPerStep = extraMemory / steps;
135+
Integer extraCPUPerStep = extraCPU / steps;
136+
137+
for (ExecutionWrapperConfig executionWrapperInParallel : parallelStepElementConfig.getSections()) {
138+
if (executionWrapperInParallel.getStep() != null && !executionWrapperInParallel.getStep().isNull()) {
139+
StepElementConfig stepElementConfig = IntegrationStageUtils.getStepElementConfig(executionWrapperInParallel);
140+
stepIndex++;
141+
ContainerDefinitionInfo containerDefinitionInfo =
142+
createStepContainerDefinition(stepElementConfig, integrationStage, ciExecutionArgs, portFinder, stepIndex,
143+
accountId, os, extraMemoryPerStep, extraCPUPerStep);
144+
if (containerDefinitionInfo != null) {
145+
containerDefinitionInfos.add(containerDefinitionInfo);
146+
}
147+
} else if (executionWrapperInParallel.getStepGroup() != null
148+
&& !executionWrapperInParallel.getStepGroup().isNull()) {
149+
List<ContainerDefinitionInfo> temp =
150+
createStepContainerDefinitionsForStepGroup(executionWrapperInParallel, integrationStage, ciExecutionArgs,
151+
portFinder, accountId, os, extraMemoryPerStep, extraCPUPerStep, stepIndex);
152+
if (temp != null) {
153+
stepIndex += temp.size();
154+
if (temp.size() > 0) {
155+
containerDefinitionInfos.addAll(temp);
156+
}
157+
}
158+
}
159+
}
160+
161+
return containerDefinitionInfos;
162+
}
163+
91164
public List<ContainerDefinitionInfo> createStepContainerDefinitions(List<ExecutionWrapperConfig> steps,
92165
StageElementConfig integrationStage, CIExecutionArgs ciExecutionArgs, PortFinder portFinder, String accountId,
93166
OSType os) {
94167
List<ContainerDefinitionInfo> containerDefinitionInfos = new ArrayList<>();
95168
if (steps == null) {
96169
return containerDefinitionInfos;
97170
}
98-
171+
int stepIndex = 0;
99172
Integer stageMemoryRequest = getStageMemoryRequest(steps, accountId);
100173
Integer stageCpuRequest = getStageCpuRequest(steps, accountId);
101174

102-
int stepIndex = 0;
103175
for (ExecutionWrapperConfig executionWrapper : steps) {
104176
if (executionWrapper.getStep() != null && !executionWrapper.getStep().isNull()) {
105177
StepElementConfig stepElementConfig = IntegrationStageUtils.getStepElementConfig(executionWrapper);
@@ -149,6 +221,58 @@ public List<ContainerDefinitionInfo> createStepContainerDefinitions(List<Executi
149221
return containerDefinitionInfos;
150222
}
151223

224+
public List<ContainerDefinitionInfo> createStepContainerDefinitions_feature(List<ExecutionWrapperConfig> steps,
225+
StageElementConfig integrationStage, CIExecutionArgs ciExecutionArgs, PortFinder portFinder, String accountId,
226+
OSType os, int stepIndex) {
227+
List<ContainerDefinitionInfo> containerDefinitionInfos = new ArrayList<>();
228+
if (steps == null) {
229+
return containerDefinitionInfos;
230+
}
231+
232+
Integer stageMemoryRequest = getStageMemoryRequest(steps, accountId);
233+
Integer stageCpuRequest = getStageCpuRequest(steps, accountId);
234+
235+
for (ExecutionWrapperConfig executionWrapper : steps) {
236+
if (executionWrapper.getStep() != null && !executionWrapper.getStep().isNull()) {
237+
StepElementConfig stepElementConfig = IntegrationStageUtils.getStepElementConfig(executionWrapper);
238+
stepIndex++;
239+
Integer extraMemoryPerStep = calculateExtraMemory(executionWrapper, accountId, stageMemoryRequest);
240+
Integer extraCPUPerStep = calculateExtraCPU(executionWrapper, accountId, stageCpuRequest);
241+
ContainerDefinitionInfo containerDefinitionInfo =
242+
createStepContainerDefinition(stepElementConfig, integrationStage, ciExecutionArgs, portFinder, stepIndex,
243+
accountId, os, extraMemoryPerStep, extraCPUPerStep);
244+
if (containerDefinitionInfo != null) {
245+
containerDefinitionInfos.add(containerDefinitionInfo);
246+
}
247+
} else if (executionWrapper.getParallel() != null && !executionWrapper.getParallel().isNull()) {
248+
ParallelStepElementConfig parallelStepElementConfig =
249+
IntegrationStageUtils.getParallelStepElementConfig(executionWrapper);
250+
Integer extraMemory = calculateExtraMemory(executionWrapper, accountId, stageMemoryRequest);
251+
Integer extraCPU = calculateExtraCPU(executionWrapper, accountId, stageCpuRequest);
252+
List<ContainerDefinitionInfo> temp = createStepContainerDefinitionsForParallel(executionWrapper,
253+
integrationStage, ciExecutionArgs, portFinder, accountId, os, extraMemory, extraCPU, stepIndex);
254+
if (temp != null) {
255+
stepIndex += temp.size();
256+
if (temp.size() > 0) {
257+
containerDefinitionInfos.addAll(temp);
258+
}
259+
}
260+
261+
} else if (executionWrapper.getStepGroup() != null && !executionWrapper.getStepGroup().isNull()) {
262+
List<ContainerDefinitionInfo> temp =
263+
createStepContainerDefinitionsForStepGroup(executionWrapper, integrationStage, ciExecutionArgs, portFinder,
264+
accountId, os, stageMemoryRequest, stageCpuRequest, stepIndex);
265+
if (temp != null) {
266+
stepIndex += temp.size();
267+
if (temp.size() > 0) {
268+
containerDefinitionInfos.addAll(temp);
269+
}
270+
}
271+
}
272+
}
273+
return containerDefinitionInfos;
274+
}
275+
152276
private ContainerDefinitionInfo createStepContainerDefinition(StepElementConfig stepElement,
153277
StageElementConfig integrationStage, CIExecutionArgs ciExecutionArgs, PortFinder portFinder, int stepIndex,
154278
String accountId, OSType os, Integer extraMemoryPerStep, Integer extraCPUPerStep) {
@@ -450,7 +574,14 @@ private Integer getExecutionWrapperMemoryRequest(ExecutionWrapperConfig executio
450574
executionWrapperMemoryRequest += getExecutionWrapperMemoryRequest(wrapper, accountId);
451575
}
452576
}
577+
} else {
578+
StepGroupElementConfig stepGroupElementConfig = IntegrationStageUtils.getStepGroupElementConfig(executionWrapper);
579+
for (ExecutionWrapperConfig wrapper : stepGroupElementConfig.getSteps()) {
580+
Integer temp = getExecutionWrapperMemoryRequest(wrapper, accountId);
581+
executionWrapperMemoryRequest = Math.max(executionWrapperMemoryRequest, temp);
582+
}
453583
}
584+
454585
return executionWrapperMemoryRequest;
455586
}
456587

@@ -509,6 +640,12 @@ private Integer getExecutionWrapperCpuRequest(ExecutionWrapperConfig executionWr
509640
executionWrapperCpuRequest += getExecutionWrapperCpuRequest(wrapper, accountId);
510641
}
511642
}
643+
} else {
644+
StepGroupElementConfig stepGroupElementConfig = IntegrationStageUtils.getStepGroupElementConfig(executionWrapper);
645+
for (ExecutionWrapperConfig wrapper : stepGroupElementConfig.getSteps()) {
646+
Integer temp = getExecutionWrapperCpuRequest(wrapper, accountId);
647+
executionWrapperCpuRequest = Math.max(executionWrapperCpuRequest, temp);
648+
}
512649
}
513650
return executionWrapperCpuRequest;
514651
}
@@ -691,6 +828,12 @@ private Integer calculateExtraCPUForParallelStep(
691828
return extraCPUPerStep;
692829
}
693830

831+
private Integer calculateExtraCPU(
832+
ExecutionWrapperConfig executionWrapper, String accountId, Integer stageCpuRequest) {
833+
Integer executionWrapperCPURequest = getExecutionWrapperCpuRequest(executionWrapper, accountId);
834+
return Math.max(0, stageCpuRequest - executionWrapperCPURequest);
835+
}
836+
694837
private Integer calculateExtraMemoryForParallelStep(
695838
ParallelStepElementConfig parallelStepElementConfig, String accountId, Integer stageMemoryRequest) {
696839
Integer executionWrapperMemoryRequest = 0;
@@ -707,6 +850,12 @@ private Integer calculateExtraMemoryForParallelStep(
707850
return extraMemoryPerStep;
708851
}
709852

853+
private Integer calculateExtraMemory(
854+
ExecutionWrapperConfig executionWrapper, String accountId, Integer stageMemoryRequest) {
855+
Integer executionWrapperMemoryRequest = getExecutionWrapperMemoryRequest(executionWrapper, accountId);
856+
return Math.max(0, stageMemoryRequest - executionWrapperMemoryRequest);
857+
}
858+
710859
private StepElementConfig getStepElementConfig(ExecutionWrapperConfig executionWrapperConfig) {
711860
try {
712861
return YamlUtils.read(executionWrapperConfig.getStep().toString(), StepElementConfig.class);

320-ci-execution/src/main/java/io/harness/ci/integrationstage/K8InitializeTaskParamsBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,9 +398,9 @@ private List<ContainerDefinitionInfo> getStageContainerDefinitions(
398398
.build();
399399
List<ContainerDefinitionInfo> serviceCtrDefinitionInfos =
400400
k8InitializeServiceUtils.createServiceContainerDefinitions(stageElementConfig, portFinder, os);
401-
List<ContainerDefinitionInfo> stepCtrDefinitionInfos =
402-
k8InitializeStepUtils.createStepContainerDefinitions(initializeStepInfo.getExecutionElementConfig().getSteps(),
403-
stageElementConfig, ciExecutionArgs, portFinder, AmbianceUtils.getAccountId(ambiance), os);
401+
List<ContainerDefinitionInfo> stepCtrDefinitionInfos = k8InitializeStepUtils.createStepContainerDefinitions_feature(
402+
initializeStepInfo.getExecutionElementConfig().getSteps(), stageElementConfig, ciExecutionArgs, portFinder,
403+
AmbianceUtils.getAccountId(ambiance), os, 0);
404404

405405
List<ContainerDefinitionInfo> containerDefinitionInfos = new ArrayList<>();
406406
containerDefinitionInfos.addAll(serviceCtrDefinitionInfos);

320-ci-execution/src/main/java/io/harness/states/InitializeTaskStep.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.harness.plancreator.execution.ExecutionWrapperConfig;
5050
import io.harness.plancreator.steps.ParallelStepElementConfig;
5151
import io.harness.plancreator.steps.StepElementConfig;
52+
import io.harness.plancreator.steps.StepGroupElementConfig;
5253
import io.harness.plancreator.steps.common.StepElementParameters;
5354
import io.harness.pms.contracts.ambiance.Ambiance;
5455
import io.harness.pms.contracts.execution.Status;
@@ -370,7 +371,11 @@ private void addLogKey(
370371
IntegrationStageUtils.getParallelStepElementConfig(executionWrapper);
371372
parallelStepElementConfig.getSections().forEach(section -> addLogKey(section, logPrefix, logKeyByStepId));
372373
} else {
373-
throw new InvalidRequestException("Only Parallel or StepElement is supported");
374+
StepGroupElementConfig stepGroupElementConfig =
375+
IntegrationStageUtils.getStepGroupElementConfig(executionWrapper);
376+
for (ExecutionWrapperConfig wrapper : stepGroupElementConfig.getSteps()) {
377+
addLogKey(wrapper, logPrefix, logKeyByStepId);
378+
}
374379
}
375380
}
376381
}

320-ci-execution/src/test/java/io/harness/ci/integrationstage/K8InitializeStepUtilsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.List;
3131
import org.junit.Test;
3232
import org.junit.experimental.categories.Category;
33+
import org.junit.runner.RunWith;
34+
import org.mockito.runners.MockitoJUnitRunner;
3335

3436
public class K8InitializeStepUtilsTest extends CIExecutionTestBase {
3537
private static Integer PORT_STARTING_RANGE = 20002;

330-ci-beans/src/main/java/io/harness/beans/stages/IntegrationStageStepParametersPMS.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.harness.plancreator.stages.stage.StageElementConfig;
2222
import io.harness.plancreator.steps.ParallelStepElementConfig;
2323
import io.harness.plancreator.steps.StepElementConfig;
24+
import io.harness.plancreator.steps.StepGroupElementConfig;
2425
import io.harness.plancreator.steps.common.SpecParameters;
2526
import io.harness.pms.plan.creation.PlanCreatorUtils;
2627
import io.harness.pms.sdk.core.plan.creation.beans.PlanCreationContext;
@@ -120,11 +121,22 @@ private static void addStepIdentifier(ExecutionWrapperConfig executionWrapper, L
120121
ParallelStepElementConfig parallelStepElementConfig = getParallelStepElementConfig(executionWrapper);
121122
parallelStepElementConfig.getSections().forEach(section -> addStepIdentifier(section, stepIdentifiers));
122123
} else {
123-
throw new InvalidRequestException("Only Parallel or StepElement is supported");
124+
StepGroupElementConfig stepGroupElementConfig = getStepGroupElementConfig(executionWrapper);
125+
for (ExecutionWrapperConfig wrapper : stepGroupElementConfig.getSteps()) {
126+
addStepIdentifier(wrapper, stepIdentifiers);
127+
}
124128
}
125129
}
126130
}
127131

132+
public static StepGroupElementConfig getStepGroupElementConfig(ExecutionWrapperConfig executionWrapperConfig) {
133+
try {
134+
return YamlUtils.read(executionWrapperConfig.getStepGroup().toString(), StepGroupElementConfig.class);
135+
} catch (Exception ex) {
136+
throw new CIStageExecutionException("Failed to deserialize ExecutionWrapperConfig step node", ex);
137+
}
138+
}
139+
128140
private static StepElementConfig getStepElementConfig(ExecutionWrapperConfig executionWrapperConfig) {
129141
try {
130142
return YamlUtils.read(executionWrapperConfig.getStep().toString(), StepElementConfig.class);

0 commit comments

Comments
 (0)