Skip to content

Commit 63a014b

Browse files
authored
[ISSUE#6342] Local SyncStatSet sync to remote value when changeToMaster (#6352)
* fix ISSUE#6342 * refactor * correct the syncStateSet in elect process. * move syncStateSet into request/response's body. * optimize the broker electing switch's branch.
1 parent 3db8d03 commit 63a014b

File tree

11 files changed

+191
-39
lines changed

11 files changed

+191
-39
lines changed

broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -227,26 +227,25 @@ public void shutdown() {
227227
}
228228

229229
public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch,
230-
final Integer syncStateSetEpoch) {
230+
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
231231
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
232232
if (newMasterBrokerId.equals(this.brokerControllerId)) {
233-
changeToMaster(newMasterEpoch, syncStateSetEpoch);
233+
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
234234
} else {
235235
changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId);
236236
}
237237
}
238238
}
239239

240-
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
240+
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
241241
synchronized (this) {
242242
if (newMasterEpoch > this.masterEpoch) {
243243
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);
244244

245245
this.masterEpoch = newMasterEpoch;
246246

247247
// Change SyncStateSet
248-
final HashSet<Long> newSyncStateSet = new HashSet<>();
249-
newSyncStateSet.add(this.brokerControllerId);
248+
final HashSet<Long> newSyncStateSet = new HashSet<>(syncStateSet);
250249
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);
251250

252251
// Change record
@@ -365,8 +364,10 @@ private void handleSlaveSynchronize(final BrokerRole role) {
365364
private boolean brokerElect() {
366365
// Broker try to elect itself as a master in broker set.
367366
try {
368-
ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
367+
Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
369368
this.brokerConfig.getBrokerName(), this.brokerControllerId);
369+
ElectMasterResponseHeader tryElectResponse = tryElectResponsePair.getObject1();
370+
Set<Long> syncStateSet = tryElectResponsePair.getObject2();
370371
final String masterAddress = tryElectResponse.getMasterAddress();
371372
final Long masterBrokerId = tryElectResponse.getMasterBrokerId();
372373
if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) {
@@ -375,7 +376,7 @@ private boolean brokerElect() {
375376
}
376377

377378
if (masterBrokerId.equals(this.brokerControllerId)) {
378-
changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
379+
changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), syncStateSet);
379380
} else {
380381
changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId());
381382
}
@@ -544,15 +545,17 @@ private boolean createMetadataFileAndDeleteTemp() {
544545
*/
545546
private boolean registerBrokerToController() {
546547
try {
547-
RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
548-
if (response == null) return false;
548+
Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> responsePair = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress);
549+
if (responsePair == null) return false;
550+
RegisterBrokerToControllerResponseHeader response = responsePair.getObject1();
551+
Set<Long> syncStateSet = responsePair.getObject2();
549552
final Long masterBrokerId = response.getMasterBrokerId();
550553
final String masterAddress = response.getMasterAddress();
551554
if (masterBrokerId == null) {
552555
return true;
553556
}
554557
if (this.brokerControllerId.equals(masterBrokerId)) {
555-
changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch());
558+
changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), syncStateSet);
556559
} else {
557560
changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId);
558561
}
@@ -635,7 +638,7 @@ private void schedulingSyncBrokerMetadata() {
635638
if (StringUtils.isNoneEmpty(newMasterAddress) && masterBrokerId != null) {
636639
if (masterBrokerId.equals(this.brokerControllerId)) {
637640
// If this broker is now the master
638-
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch());
641+
changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet());
639642
} else {
640643
// If this broker is now the slave, and master has been changed
641644
changeToSlave(newMasterAddress, newMasterEpoch, masterBrokerId);

broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
8181
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
8282
import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
83+
import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody;
8384
import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody;
8485
import org.apache.rocketmq.remoting.protocol.body.KVTable;
8586
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
@@ -1172,7 +1173,7 @@ public SyncStateSet alterSyncStateSet(
11721173
/**
11731174
* Broker try to elect itself as a master in broker set
11741175
*/
1175-
public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName,
1176+
public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName, String brokerName,
11761177
Long brokerId) throws Exception {
11771178

11781179
final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
@@ -1184,12 +1185,15 @@ public ElectMasterResponseHeader brokerElect(String controllerAddress, String cl
11841185
throw new MQBrokerException(response.getCode(), "Controller leader was changed");
11851186
}
11861187
case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
1187-
throw new MQBrokerException(response.getCode(), response.getRemark());
11881188
case CONTROLLER_ELECT_MASTER_FAILED:
1189+
throw new MQBrokerException(response.getCode(), response.getRemark());
11891190
case CONTROLLER_MASTER_STILL_EXIST:
11901191
case SUCCESS:
1191-
return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
1192+
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
1193+
final ElectMasterResponseBody responseBody = RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class);
1194+
return new Pair<>(responseHeader, responseBody.getSyncStateSet());
11921195
}
1196+
11931197
throw new MQBrokerException(response.getCode(), response.getRemark());
11941198
}
11951199

@@ -1215,13 +1219,15 @@ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final
12151219
throw new MQBrokerException(response.getCode(), response.getRemark());
12161220
}
12171221

1218-
public RegisterBrokerToControllerResponseHeader registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
1222+
public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception {
12191223
final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress);
12201224
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
12211225
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
12221226
assert response != null;
12231227
if (response.getCode() == SUCCESS) {
1224-
return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
1228+
RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
1229+
Set<Long> syncStateSet = RemotingSerializable.decode(response.getBody(), SyncStateSet.class).getSyncStateSet();
1230+
return new Pair<>(responseHeader, syncStateSet);
12251231
}
12261232
throw new MQBrokerException(response.getCode(), response.getRemark());
12271233
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
112112
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
113113
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
114+
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
114115
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
115116
import org.apache.rocketmq.remoting.protocol.body.TopicList;
116117
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
@@ -2628,14 +2629,15 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
26282629
private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
26292630
RemotingCommand request) throws RemotingCommandException {
26302631
NotifyBrokerRoleChangedRequestHeader requestHeader = (NotifyBrokerRoleChangedRequestHeader) request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
2632+
SyncStateSet syncStateSetInfo = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
26312633

26322634
RemotingCommand response = RemotingCommand.createResponseCommand(null);
26332635

26342636
LOGGER.info("Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", requestHeader);
26352637

26362638
final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
26372639
if (replicasManager != null) {
2638-
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch());
2640+
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
26392641
}
26402642
response.setCode(ResponseCode.SUCCESS);
26412643
response.setRemark(null);

0 commit comments

Comments
 (0)