Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,26 +163,31 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch);

// Change record
this.masterAddress = this.localAddress;
this.masterEpoch = newMasterEpoch;

// Change sync state set
final HashSet<String> newSyncStateSet = new HashSet<>();
newSyncStateSet.add(this.localAddress);
changeSyncStateSet(newSyncStateSet, syncStateSetEpoch);

// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);
if (this.localAddress.equals(masterAddress) && brokerController.getBrokerConfig().getBrokerId() == MixAll.MASTER_ID) {
LOGGER.warn("The broker role is already master");
} else {
// Change record
this.masterAddress = this.localAddress;

// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);
// Handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SYNC_MASTER);

this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
this.brokerController.changeSpecialServiceStatus(true);
// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);

schedulingCheckSyncStateSet();
this.brokerController.getBrokerConfig().setBrokerId(MixAll.MASTER_ID);
this.brokerController.getMessageStoreConfig().setBrokerRole(BrokerRole.SYNC_MASTER);
this.brokerController.changeSpecialServiceStatus(true);

schedulingCheckSyncStateSet();
}

this.executorService.submit(() -> {
// Register broker to name-srv
Expand Down