diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 638d0380691..1885a3cf3a1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -77,6 +77,7 @@ public class MixAll { public static final long FIRST_SLAVE_ID = 1L; public static final long CURRENT_JVM_PID = getPID(); public final static int UNIT_PRE_SIZE_FOR_MSG = 28; + public final static int ALL_ACK_IN_SYNC_STATE_SET = -1; public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; diff --git a/docs/cn/controller/design.md b/docs/cn/controller/design.md index 0080986e1f6..e100881fd74 100644 --- a/docs/cn/controller/design.md +++ b/docs/cn/controller/design.md @@ -2,9 +2,8 @@ 当前 RocketMQ Raft 模式主要是利用 DLedger Commitlog 替换原来的 Commitlog,使 Commitlog 拥有选举复制能力,但这也造成了一些问题: -- Raft 模式下,Broker组内副本数必须是三副本及以上。 +- Raft 模式下,Broker组内副本数必须是三副本及以上,副本的ACK也必须遵循多数派协议。 - RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。 -- Raft 模式下, 日志复制性能并不高效。 因此我们希望利用 DLedger 实现一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件, 支持独立部署, 也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举, 从而解决上述问题, 我们将该新模式称为 Controller 模式。 @@ -27,7 +26,7 @@ ![image-20220605213143645](../image/controller/quick-start/controller.png) -如果, 是 DledgerController 的核心设计: +如图是 DledgerController 的核心设计: - DLedgerController 可以内嵌在 Namesrv 中, 也可以独立的部署。 - Active DLedgerController 是 DLedger 选举出来的 Leader, 其会接受来自客户端的事件请求, 并通过 DLedger 发起共识, 最后应用到内存元数据状态机中。 @@ -195,11 +194,9 @@ Shrink SyncStateSet ,指把 SyncStateSet 副本集合中那些与Master差距 - ReadSocketService 接收到 slaveAckOffset 时若 slaveAckOffset >= lastMasterMaxOffset 则将lastCaughtUpTimeMs 更新为 lastTransferTimeMs。 -- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time - connection.lastCaughtUpTimeMs) > +- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time - connection.lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchUp,则该 Slave 是 Out-of-sync 的。 - haMaxTimeSlaveNotCatchUp,则该 Slave 是 Out-of-sync 的 。 - -- 如果检测到 Slave out of sync , master 会立刻和 Controller 上报, 从而 Shrink SyncStateSet 。 +- 如果检测到 Slave out of sync , master 会立刻和 Controller 上报, 从而 Shrink SyncStateSet。 #### Expand diff --git a/docs/cn/controller/quick_start.md b/docs/cn/controller/quick_start.md index c125b48ab00..b3c86d75d4b 100644 --- a/docs/cn/controller/quick_start.md +++ b/docs/cn/controller/quick_start.md @@ -2,7 +2,7 @@ ## 前言 -该文档主要介绍如何快速构建和部署基于 Controller 的可以自动容灾切换的 RocketMQ 集群。 +该文档主要介绍如何快速构建和部署基于 Controller 的可以自动切换的 RocketMQ 集群。 详细的新集群部署和旧集群升级指南请参考 [部署指南](deploy_guide.md)。 @@ -26,7 +26,7 @@ 如果上面的步骤执行成功,可以通过运维命令查看集群状态。 -至此, 启动成功,现在可以向集群收发消息,并进行容灾切换测试了。 +至此, 启动成功,现在可以向集群收发消息,并进行切换测试了。 如果需要关闭快速集群,可以执行: @@ -58,7 +58,7 @@ ![image-20220605205247476](../image/controller/quick-start/epoch.png) -## 容灾切换 +## 切换 部署成功后,现在尝试进行 Master 切换。 diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index b95804bb394..75213b83a7a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -349,7 +349,6 @@ else if (!dispatchRequest.isSuccess()) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } - } else { // Commitlog case files are deleted log.warn("The commitlog files are deleted, and delete the consume queue files"); @@ -803,13 +802,18 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke boolean needHandleHA = needHandleHA(msg); int needAckNums = 1; - if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) { - int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(), - this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1); - needAckNums = calcNeedAckNums(inSyncReplicas); - if (needAckNums > inSyncReplicas) { - // Tell the producer, don't have enough slaves to handle the send request - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null)); + if (needHandleHA) { + if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) { + // -1 means all ack in SyncStateSet + needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET; + } else { + int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(), + this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1); + needAckNums = calcNeedAckNums(inSyncReplicas); + if (needAckNums > inSyncReplicas) { + // Tell the producer, don't have enough slaves to handle the send request + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null)); + } } } @@ -950,13 +954,18 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc int needAckNums = 1; boolean needHandleHA = needHandleHA(messageExtBatch); - if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) { - int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(), - this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1); - needAckNums = calcNeedAckNums(inSyncReplicas); - if (needAckNums > inSyncReplicas) { - // Tell the producer, don't have enough slaves to handle the send request - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null)); + if (needHandleHA) { + if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) { + // -1 means all ack in SyncStateSet + needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET; + } else { + int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(), + this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1); + needAckNums = calcNeedAckNums(inSyncReplicas); + if (needAckNums > inSyncReplicas) { + // Tell the producer, don't have enough slaves to handle the send request + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null)); + } } } @@ -1101,8 +1110,7 @@ private CompletableFuture handleDiskFlush(AppendMessageResult private CompletableFuture handleHA(AppendMessageResult result, PutMessageResult putMessageResult, int needAckNums) { - final boolean allAckInSyncStateSet = this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet(); - if (needAckNums <= 1 && !allAckInSyncStateSet) { + if (needAckNums >= 0 && needAckNums <= 1) { return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } @@ -1122,7 +1130,7 @@ private CompletableFuture handleHA(AppendMessageResult result, // } // Wait enough acks from different slaves - GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums - 1, allAckInSyncStateSet); + GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums); haService.putRequest(request); haService.getWaitNotifyObject().wakeupAll(); return request.future(); @@ -1389,7 +1397,6 @@ public static class GroupCommitRequest { private final CompletableFuture flushOKFuture = new CompletableFuture<>(); private volatile int ackNums = 1; private final long deadLine; - private boolean allAckInSyncStateSet; public GroupCommitRequest(long nextOffset, long timeoutMillis) { this.nextOffset = nextOffset; @@ -1401,11 +1408,6 @@ public GroupCommitRequest(long nextOffset, long timeoutMillis, int ackNums) { this.ackNums = ackNums; } - public GroupCommitRequest(long nextOffset, long timeoutMillis, int ackNums, boolean allAckInSyncStateSet) { - this(nextOffset, timeoutMillis, ackNums); - this.allAckInSyncStateSet = allAckInSyncStateSet; - } - public long getNextOffset() { return nextOffset; } @@ -1418,10 +1420,6 @@ public long getDeadLine() { return deadLine; } - public boolean isAllAckInSyncStateSet() { - return allAckInSyncStateSet; - } - public void wakeupCustomer(final PutMessageStatus status) { this.flushOKFuture.complete(status); } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java index 02198ad05d9..f84bdbf052e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -74,14 +75,14 @@ private void doWaitTransfer() { boolean transferOK = false; long deadLine = req.getDeadLine(); - final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet(); + final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET; for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { if (i > 0) { this.notifyTransferObject.waitForRunning(1000); } - if (req.getAckNums() <= 1 && !allAckInSyncStateSet) { + if (!allAckInSyncStateSet && req.getAckNums() <= 1) { transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset(); continue; } @@ -95,12 +96,12 @@ private void doWaitTransfer() { transferOK = true; break; } - // Include master. + // Include master int ackNums = 1; for (HAConnection conn : haService.getConnectionList()) { final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn; if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) { - ackNums ++; + ackNums++; } if (ackNums >= syncStateSet.size()) { transferOK = true; @@ -108,7 +109,8 @@ private void doWaitTransfer() { } } } else { - int ackNums = 0; + // Include master + int ackNums = 1; for (HAConnection conn : haService.getConnectionList()) { // TODO: We must ensure every HAConnection represents a different slave // Solution: Consider assign a unique and fixed IP:ADDR for each different slave diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java index e55fedc9760..5304bec4673 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java @@ -176,7 +176,7 @@ public Boolean call() throws Exception { @Test public void putRequest_SingleAck() throws IOException, ExecutionException, InterruptedException, TimeoutException { - CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000,1); + CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000, 1); this.haService.putRequest(request); assertThat(request.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); @@ -186,17 +186,17 @@ public void putRequest_SingleAck() throws IOException, ExecutionException, Inter doReturn(124L).when(messageStore).getMasterFlushedOffset(); setUpOneHAClient(messageStore); - request = new CommitLog.GroupCommitRequest(124, 4000,1); + request = new CommitLog.GroupCommitRequest(124, 4000, 1); this.haService.putRequest(request); assertThat(request.future().get()).isEqualTo(PutMessageStatus.PUT_OK); } @Test public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionException, InterruptedException { - CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000,1); + CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000, 2); this.haService.putRequest(oneAck); - CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124,4000, 2); + CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3); this.haService.putRequest(twoAck); DefaultMessageStore messageStore = mockMessageStore(); @@ -207,13 +207,12 @@ public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionExc assertThat(oneAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK); assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); - messageStore = mockMessageStore(); doReturn(128L).when(messageStore).getMaxPhyOffset(); doReturn(128L).when(messageStore).getMasterFlushedOffset(); setUpOneHAClient(messageStore); - twoAck = new CommitLog.GroupCommitRequest(124, 4000,2); + twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3); this.haService.putRequest(twoAck); assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK); }