diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 068187e4013..b638f1bdb9c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -227,17 +227,17 @@ public void shutdown() { } public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress, final Integer newMasterEpoch, - final Integer syncStateSetEpoch) { + final Integer syncStateSetEpoch, final Set syncStateSet) { if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) { if (newMasterBrokerId.equals(this.brokerControllerId)) { - changeToMaster(newMasterEpoch, syncStateSetEpoch); + changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet); } else { changeToSlave(newMasterAddress, newMasterEpoch, newMasterBrokerId); } } } - public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) { + public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set syncStateSet) { synchronized (this) { if (newMasterEpoch > this.masterEpoch) { LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch); @@ -245,8 +245,7 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch this.masterEpoch = newMasterEpoch; // Change SyncStateSet - final HashSet newSyncStateSet = new HashSet<>(); - newSyncStateSet.add(this.brokerControllerId); + final HashSet newSyncStateSet = new HashSet<>(syncStateSet); changeSyncStateSet(newSyncStateSet, syncStateSetEpoch); // Change record @@ -365,8 +364,10 @@ private void handleSlaveSynchronize(final BrokerRole role) { private boolean brokerElect() { // Broker try to elect itself as a master in broker set. try { - ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), + Pair> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerControllerId); + ElectMasterResponseHeader tryElectResponse = tryElectResponsePair.getObject1(); + Set syncStateSet = tryElectResponsePair.getObject2(); final String masterAddress = tryElectResponse.getMasterAddress(); final Long masterBrokerId = tryElectResponse.getMasterBrokerId(); if (StringUtils.isEmpty(masterAddress) || masterBrokerId == null) { @@ -375,7 +376,7 @@ private boolean brokerElect() { } if (masterBrokerId.equals(this.brokerControllerId)) { - changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch()); + changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch(), syncStateSet); } else { changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getMasterBrokerId()); } @@ -544,15 +545,17 @@ private boolean createMetadataFileAndDeleteTemp() { */ private boolean registerBrokerToController() { try { - RegisterBrokerToControllerResponseHeader response = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress); - if (response == null) return false; + Pair> responsePair = this.brokerOuterAPI.registerBrokerToController(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerControllerId, brokerAddress, controllerLeaderAddress); + if (responsePair == null) return false; + RegisterBrokerToControllerResponseHeader response = responsePair.getObject1(); + Set syncStateSet = responsePair.getObject2(); final Long masterBrokerId = response.getMasterBrokerId(); final String masterAddress = response.getMasterAddress(); if (masterBrokerId == null) { return true; } if (this.brokerControllerId.equals(masterBrokerId)) { - changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch()); + changeToMaster(response.getMasterEpoch(), response.getSyncStateSetEpoch(), syncStateSet); } else { changeToSlave(masterAddress, response.getMasterEpoch(), masterBrokerId); } @@ -635,7 +638,7 @@ private void schedulingSyncBrokerMetadata() { if (StringUtils.isNoneEmpty(newMasterAddress) && masterBrokerId != null) { if (masterBrokerId.equals(this.brokerControllerId)) { // If this broker is now the master - changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch()); + changeToMaster(newMasterEpoch, syncStateSet.getSyncStateSetEpoch(), syncStateSet.getSyncStateSet()); } else { // If this broker is now the slave, and master has been changed changeToSlave(newMasterAddress, newMasterEpoch, masterBrokerId); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 4f298e1aaec..144f05016b9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -80,6 +80,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody; import org.apache.rocketmq.remoting.protocol.body.GetBrokerMemberGroupResponseBody; import org.apache.rocketmq.remoting.protocol.body.KVTable; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; @@ -1172,7 +1173,7 @@ public SyncStateSet alterSyncStateSet( /** * Broker try to elect itself as a master in broker set */ - public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName, + public Pair> brokerElect(String controllerAddress, String clusterName, String brokerName, Long brokerId) throws Exception { final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); @@ -1184,12 +1185,15 @@ public ElectMasterResponseHeader brokerElect(String controllerAddress, String cl throw new MQBrokerException(response.getCode(), "Controller leader was changed"); } case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED: - throw new MQBrokerException(response.getCode(), response.getRemark()); case CONTROLLER_ELECT_MASTER_FAILED: + throw new MQBrokerException(response.getCode(), response.getRemark()); case CONTROLLER_MASTER_STILL_EXIST: case SUCCESS: - return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class); + final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class); + final ElectMasterResponseBody responseBody = RemotingSerializable.decode(response.getBody(), ElectMasterResponseBody.class); + return new Pair<>(responseHeader, responseBody.getSyncStateSet()); } + throw new MQBrokerException(response.getCode(), response.getRemark()); } @@ -1215,13 +1219,15 @@ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final throw new MQBrokerException(response.getCode(), response.getRemark()); } - public RegisterBrokerToControllerResponseHeader registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception { + public Pair> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception { final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); assert response != null; if (response.getCode() == SUCCESS) { - return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class); + RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class); + Set syncStateSet = RemotingSerializable.decode(response.getBody(), SyncStateSet.class).getSyncStateSet(); + return new Pair<>(responseHeader, syncStateSet); } throw new MQBrokerException(response.getCode(), response.getRemark()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f3e446a652e..53729a43bca 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -111,6 +111,7 @@ import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody; import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; @@ -2628,6 +2629,7 @@ private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx, private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { NotifyBrokerRoleChangedRequestHeader requestHeader = (NotifyBrokerRoleChangedRequestHeader) request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class); + SyncStateSet syncStateSetInfo = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -2635,7 +2637,7 @@ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx, final ReplicasManager replicasManager = this.brokerController.getReplicasManager(); if (replicasManager != null) { - replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch()); + replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet()); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java index f0e82b6388e..7fb9d9aeb59 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; @@ -45,6 +46,8 @@ import java.io.File; import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; import java.util.UUID; import static org.awaitility.Awaitility.await; @@ -70,6 +73,7 @@ public class ReplicasManagerRegisterTest { public static final String CONTROLLER_ADDR = "127.0.0.1:8888"; public static final BrokerConfig BROKER_CONFIG; + private final HashSet syncStateSet = new HashSet<>(Arrays.asList(1L)); static { BROKER_CONFIG = new BrokerConfig(); @@ -122,10 +126,11 @@ public void setUp() throws Exception { @Test public void testBrokerRegisterSuccess() throws Exception { + when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); replicasManager0.start(); @@ -144,8 +149,8 @@ public void testBrokerRegisterSuccess() throws Exception { public void testBrokerRegisterSuccessAndRestartWithChangedBrokerConfig() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController); replicasManager0.start(); @@ -196,8 +201,8 @@ public void testRegisterFailedAtCreateTempFile() throws Exception { ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createTempMetadataFile", anyLong()); @@ -216,8 +221,8 @@ public void testRegisterFailedAtApplyBrokerIdFailed() throws Exception { ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); replicasManager.start(); @@ -236,8 +241,8 @@ public void testRegisterFailedAtCreateMetadataFileAndDeleteTemp() throws Excepti ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController); when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager); PowerMockito.doReturn(false).when(spyReplicasManager, "createMetadataFileAndDeleteTemp"); @@ -280,7 +285,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L)); when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader()); when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); replicasManager.start(); @@ -299,7 +304,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { replicasManager.shutdown(); Mockito.reset(mockedBrokerOuterAPI); - when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1)); + when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new Pair<>(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1), syncStateSet)); when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn( new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR)); when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true); @@ -310,7 +315,7 @@ public void testRegisterFailedAtRegisterSuccess() throws Exception { when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 2L)); // because apply brokerId: 1 has succeeded, so next request which try to apply brokerId: 1 will be failed when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), eq(1L), any(), any())).thenThrow(new RuntimeException()); - when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterBrokerToControllerResponseHeader()); + when(mockedBrokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), syncStateSet)); replicasManagerNew.start(); Assert.assertEquals(ReplicasManager.State.RUNNING, replicasManagerNew.getState()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java index a5cf63d371c..e03828cff40 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.broker.controller; import java.io.File; +import java.util.Arrays; +import java.util.HashSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -116,6 +118,10 @@ public class ReplicasManagerTest { private static final Long SYNC_STATE = 1L; + private static final HashSet SYNC_STATE_SET_1 = new HashSet(Arrays.asList(BROKER_ID_1)); + + private static final HashSet SYNC_STATE_SET_2 = new HashSet(Arrays.asList(BROKER_ID_2)); + @Before public void before() throws Exception { UtilAll.deleteFile(new File(STORE_BASE_PATH)); @@ -154,9 +160,9 @@ public void before() throws Exception { when(brokerOuterAPI.checkAddressReachable(any())).thenReturn(true); when(brokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(getNextBrokerIdResponseHeader); when(brokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(applyBrokerIdResponseHeader); - when(brokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(registerBrokerToControllerResponseHeader); + when(brokerOuterAPI.registerBrokerToController(any(), any(), anyLong(), any(), any())).thenReturn(new Pair<>(new RegisterBrokerToControllerResponseHeader(), SYNC_STATE_SET_1)); when(brokerOuterAPI.getReplicaInfo(any(), any())).thenReturn(result); - when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader); + when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(new Pair<>(brokerTryElectResponseHeader, SYNC_STATE_SET_1)); replicasManager = new ReplicasManager(brokerController); autoSwitchHAService.init(defaultMessageStore); replicasManager.start(); @@ -173,18 +179,24 @@ public void after() { @Test public void changeBrokerRoleTest() { + HashSet syncStateSetA = new HashSet<>(); + syncStateSetA.add(BROKER_ID_1); + HashSet syncStateSetB = new HashSet<>(); + syncStateSetA.add(BROKER_ID_2); // not equal to localAddress - Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)) + Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_2, NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetB)) .doesNotThrowAnyException(); // equal to localAddress - Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)) + Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(BROKER_ID_1, OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSetA)) .doesNotThrowAnyException(); } @Test public void changeToMasterTest() { - Assertions.assertThatCode(() -> replicasManager.changeToMaster(NEW_MASTER_EPOCH, OLD_MASTER_EPOCH)).doesNotThrowAnyException(); + HashSet syncStateSet = new HashSet<>(); + syncStateSet.add(BROKER_ID_1); + Assertions.assertThatCode(() -> replicasManager.changeToMaster(NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, syncStateSet)).doesNotThrowAnyException(); } @Test diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java index 18e9992d31c..3da861c99a7 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry; +import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; @@ -179,6 +180,7 @@ public void doNotifyBrokerRoleChanged(final String brokerAddr, final RoleChangeN final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(), entry.getMasterBrokerId(), entry.getMasterEpoch(), entry.getSyncStateSetEpoch()); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader); + request.setBody(new SyncStateSet(entry.getSyncStateSet(), entry.getSyncStateSetEpoch()).encode()); try { this.remotingClient.invokeOneway(brokerAddr, request, 3000); } catch (final Exception e) { diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 728cf87e211..103cb68e249 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup; import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; +import org.apache.rocketmq.remoting.protocol.body.ElectMasterResponseBody; import org.apache.rocketmq.remoting.protocol.body.SyncStateSet; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader; @@ -201,6 +202,8 @@ public ControllerResult electMaster(final ElectMaster response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); response.setMasterBrokerId(oldMaster); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(oldMaster)); + + result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } @@ -209,14 +212,21 @@ public ControllerResult electMaster(final ElectMaster if (newMaster != null) { final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); + final HashSet newSyncStateSet = new HashSet<>(); + newSyncStateSet.add(newMaster); + response.setMasterBrokerId(newMaster); response.setMasterAddress(brokerReplicaInfo.getBrokerAddress(newMaster)); response.setMasterEpoch(masterEpoch + 1); response.setSyncStateSetEpoch(syncStateSetEpoch + 1); + ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); + BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName); if (null != brokerMemberGroup) { - result.setBody(brokerMemberGroup.encode()); + responseBody.setBrokerMemberGroup(brokerMemberGroup); } + + result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; @@ -315,6 +325,7 @@ public ControllerResult registerBroker response.setMasterEpoch(syncStateInfo.getMasterEpoch()); response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch()); } + result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode()); // if this broker's address has been changed, we need to update it if (!brokerAddress.equals(brokerReplicaInfo.getBrokerAddress(brokerId))) { final UpdateBrokerAddressEvent event = new UpdateBrokerAddressEvent(clusterName, brokerName, brokerAddress, brokerId); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java new file mode 100644 index 00000000000..8aef636fa4c --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ElectMasterResponseBody.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.body; + +import com.google.common.base.Objects; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import java.util.HashSet; +import java.util.Set; + +public class ElectMasterResponseBody extends RemotingSerializable { + private BrokerMemberGroup brokerMemberGroup; + private Set syncStateSet; + + // Provide default constructor for serializer + public ElectMasterResponseBody() { + this.syncStateSet = new HashSet(); + this.brokerMemberGroup = null; + } + + public ElectMasterResponseBody(final Set syncStateSet) { + this.syncStateSet = syncStateSet; + this.brokerMemberGroup = null; + } + + public ElectMasterResponseBody(final BrokerMemberGroup brokerMemberGroup, final Set syncStateSet) { + this.brokerMemberGroup = brokerMemberGroup; + this.syncStateSet = syncStateSet; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ElectMasterResponseBody that = (ElectMasterResponseBody) o; + return Objects.equal(brokerMemberGroup, that.brokerMemberGroup) && + Objects.equal(syncStateSet, that.syncStateSet); + } + + @Override + public int hashCode() { + return Objects.hashCode(brokerMemberGroup, syncStateSet); + } + + @Override + public String toString() { + return "BrokerMemberGroup{" + + "brokerMemberGroup='" + brokerMemberGroup.toString() + '\'' + + ", syncStateSet='" + syncStateSet.toString() + + '}'; + } + + public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) { + this.brokerMemberGroup = brokerMemberGroup; + } + + public BrokerMemberGroup getBrokerMemberGroup() { + return brokerMemberGroup; + } + + public void setSyncStateSet(Set syncStateSet) { + this.syncStateSet = syncStateSet; + } + + public Set getSyncStateSet() { + return syncStateSet; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java index 4f3f31218f3..ab25df0d1d0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java @@ -22,6 +22,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; +import java.util.Set; + public class RoleChangeNotifyEntry { private final BrokerMemberGroup brokerMemberGroup; @@ -34,21 +36,29 @@ public class RoleChangeNotifyEntry { private final int syncStateSetEpoch; - public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch) { + private final Set syncStateSet; + + public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, Long masterBrokerId, int masterEpoch, int syncStateSetEpoch, Set syncStateSet) { this.brokerMemberGroup = brokerMemberGroup; this.masterAddress = masterAddress; this.masterEpoch = masterEpoch; this.syncStateSetEpoch = syncStateSetEpoch; this.masterBrokerId = masterBrokerId; + this.syncStateSet = syncStateSet; } public static RoleChangeNotifyEntry convert(RemotingCommand electMasterResponse) { final ElectMasterResponseHeader header = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader(); BrokerMemberGroup brokerMemberGroup = null; + Set syncStateSet = null; + if (electMasterResponse.getBody() != null && electMasterResponse.getBody().length > 0) { - brokerMemberGroup = RemotingSerializable.decode(electMasterResponse.getBody(), BrokerMemberGroup.class); + ElectMasterResponseBody body = RemotingSerializable.decode(electMasterResponse.getBody(), ElectMasterResponseBody.class); + brokerMemberGroup = body.getBrokerMemberGroup(); + syncStateSet = body.getSyncStateSet(); } - return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch()); + + return new RoleChangeNotifyEntry(brokerMemberGroup, header.getMasterAddress(), header.getMasterBrokerId(), header.getMasterEpoch(), header.getSyncStateSetEpoch(), syncStateSet); } @@ -71,4 +81,8 @@ public int getSyncStateSetEpoch() { public Long getMasterBrokerId() { return masterBrokerId; } + + public Set getSyncStateSet() { + return syncStateSet; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java index d3c8975383f..aaf3b10b829 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; + public class ElectMasterResponseHeader implements CommandCustomHeader { private Long masterBrokerId; diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 42586145521..683018b67ca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -246,6 +246,16 @@ public Set maybeShrinkSyncStateSet() { } } } + + // If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable, + // it means that the broker has not connected. + for (Long slaveBrokerId : newSyncStateSet) { + if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) { + newSyncStateSet.remove(slaveBrokerId); + isSyncStateSetChanged = true; + } + } + if (isSyncStateSetChanged) { markSynchronizingSyncStateSet(newSyncStateSet); }