Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class MixAll {
public static final long FIRST_SLAVE_ID = 1L;
public static final long CURRENT_JVM_PID = getPID();
public final static int UNIT_PRE_SIZE_FOR_MSG = 28;
public final static int ALL_ACK_IN_SYNC_STATE_SET = -1;

public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
Expand Down
11 changes: 4 additions & 7 deletions docs/cn/controller/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

当前 RocketMQ Raft 模式主要是利用 DLedger Commitlog 替换原来的 Commitlog,使 Commitlog 拥有选举复制能力,但这也造成了一些问题:

- Raft 模式下,Broker组内副本数必须是三副本及以上。
- Raft 模式下,Broker组内副本数必须是三副本及以上,副本的ACK也必须遵循多数派协议
- RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。
- Raft 模式下, 日志复制性能并不高效。

因此我们希望利用 DLedger 实现一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件, 支持独立部署, 也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举, 从而解决上述问题, 我们将该新模式称为 Controller 模式。

Expand All @@ -27,7 +26,7 @@

![image-20220605213143645](../image/controller/quick-start/controller.png)

如果, 是 DledgerController 的核心设计:
如图是 DledgerController 的核心设计:

- DLedgerController 可以内嵌在 Namesrv 中, 也可以独立的部署。
- Active DLedgerController 是 DLedger 选举出来的 Leader, 其会接受来自客户端的事件请求, 并通过 DLedger 发起共识, 最后应用到内存元数据状态机中。
Expand Down Expand Up @@ -195,11 +194,9 @@ Shrink SyncStateSet ,指把 SyncStateSet 副本集合中那些与Master差距

- ReadSocketService 接收到 slaveAckOffset 时若 slaveAckOffset >= lastMasterMaxOffset 则将lastCaughtUpTimeMs 更新为 lastTransferTimeMs。

- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time - connection.lastCaughtUpTimeMs) >
- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time - connection.lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchUp,则该 Slave 是 Out-of-sync 的。

haMaxTimeSlaveNotCatchUp,则该 Slave 是 Out-of-sync 的 。

- 如果检测到 Slave out of sync , master 会立刻和 Controller 上报, 从而 Shrink SyncStateSet 。
- 如果检测到 Slave out of sync , master 会立刻和 Controller 上报, 从而 Shrink SyncStateSet。

#### Expand

Expand Down
6 changes: 3 additions & 3 deletions docs/cn/controller/quick_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 前言

该文档主要介绍如何快速构建和部署基于 Controller 的可以自动容灾切换的 RocketMQ 集群。
该文档主要介绍如何快速构建和部署基于 Controller 的可以自动切换的 RocketMQ 集群。

详细的新集群部署和旧集群升级指南请参考 [部署指南](deploy_guide.md)。

Expand All @@ -26,7 +26,7 @@

如果上面的步骤执行成功,可以通过运维命令查看集群状态。

至此, 启动成功,现在可以向集群收发消息,并进行容灾切换测试了
至此, 启动成功,现在可以向集群收发消息,并进行切换测试了

如果需要关闭快速集群,可以执行:

Expand Down Expand Up @@ -58,7 +58,7 @@

![image-20220605205247476](../image/controller/quick-start/epoch.png)

## 容灾切换
## 切换

部署成功后,现在尝试进行 Master 切换。

Expand Down
54 changes: 26 additions & 28 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ else if (!dispatchRequest.isSuccess()) {
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}


} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
Expand Down Expand Up @@ -803,13 +802,18 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
boolean needHandleHA = needHandleHA(msg);
int needAckNums = 1;

if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
if (needAckNums > inSyncReplicas) {
// Tell the producer, don't have enough slaves to handle the send request
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
if (needHandleHA) {
if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
// -1 means all ack in SyncStateSet
needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
} else {
int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
if (needAckNums > inSyncReplicas) {
// Tell the producer, don't have enough slaves to handle the send request
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
}
}
}

Expand Down Expand Up @@ -950,13 +954,18 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
int needAckNums = 1;
boolean needHandleHA = needHandleHA(messageExtBatch);

if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
if (needAckNums > inSyncReplicas) {
// Tell the producer, don't have enough slaves to handle the send request
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
if (needHandleHA) {
if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
// -1 means all ack in SyncStateSet
needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
} else {
int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
needAckNums = calcNeedAckNums(inSyncReplicas);
if (needAckNums > inSyncReplicas) {
// Tell the producer, don't have enough slaves to handle the send request
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
}
}
}

Expand Down Expand Up @@ -1101,8 +1110,7 @@ private CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult

private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
int needAckNums) {
final boolean allAckInSyncStateSet = this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet();
if (needAckNums <= 1 && !allAckInSyncStateSet) {
if (needAckNums >= 0 && needAckNums <= 1) {
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}

Expand All @@ -1122,7 +1130,7 @@ private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result,
// }

// Wait enough acks from different slaves
GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums - 1, allAckInSyncStateSet);
GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
haService.putRequest(request);
haService.getWaitNotifyObject().wakeupAll();
return request.future();
Expand Down Expand Up @@ -1389,7 +1397,6 @@ public static class GroupCommitRequest {
private final CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
private volatile int ackNums = 1;
private final long deadLine;
private boolean allAckInSyncStateSet;

public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
Expand All @@ -1401,11 +1408,6 @@ public GroupCommitRequest(long nextOffset, long timeoutMillis, int ackNums) {
this.ackNums = ackNums;
}

public GroupCommitRequest(long nextOffset, long timeoutMillis, int ackNums, boolean allAckInSyncStateSet) {
this(nextOffset, timeoutMillis, ackNums);
this.allAckInSyncStateSet = allAckInSyncStateSet;
}

public long getNextOffset() {
return nextOffset;
}
Expand All @@ -1418,10 +1420,6 @@ public long getDeadLine() {
return deadLine;
}

public boolean isAllAckInSyncStateSet() {
return allAckInSyncStateSet;
}

public void wakeupCustomer(final PutMessageStatus status) {
this.flushOKFuture.complete(status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
Expand Down Expand Up @@ -74,14 +75,14 @@ private void doWaitTransfer() {
boolean transferOK = false;

long deadLine = req.getDeadLine();
final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet();
final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;

for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) {
if (i > 0) {
this.notifyTransferObject.waitForRunning(1000);
}

if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
continue;
}
Expand All @@ -95,20 +96,21 @@ private void doWaitTransfer() {
transferOK = true;
break;
}
// Include master.
// Include master
int ackNums = 1;
for (HAConnection conn : haService.getConnectionList()) {
final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
ackNums ++;
ackNums++;
}
if (ackNums >= syncStateSet.size()) {
transferOK = true;
break;
}
}
} else {
int ackNums = 0;
// Include master
int ackNums = 1;
for (HAConnection conn : haService.getConnectionList()) {
// TODO: We must ensure every HAConnection represents a different slave
// Solution: Consider assign a unique and fixed IP:ADDR for each different slave
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Boolean call() throws Exception {

@Test
public void putRequest_SingleAck() throws IOException, ExecutionException, InterruptedException, TimeoutException {
CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000,1);
CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000, 1);
this.haService.putRequest(request);

assertThat(request.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
Expand All @@ -186,17 +186,17 @@ public void putRequest_SingleAck() throws IOException, ExecutionException, Inter
doReturn(124L).when(messageStore).getMasterFlushedOffset();
setUpOneHAClient(messageStore);

request = new CommitLog.GroupCommitRequest(124, 4000,1);
request = new CommitLog.GroupCommitRequest(124, 4000, 1);
this.haService.putRequest(request);
assertThat(request.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
}

@Test
public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionException, InterruptedException {
CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000,1);
CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000, 2);
this.haService.putRequest(oneAck);

CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124,4000, 2);
CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3);
this.haService.putRequest(twoAck);

DefaultMessageStore messageStore = mockMessageStore();
Expand All @@ -207,13 +207,12 @@ public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionExc
assertThat(oneAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);


messageStore = mockMessageStore();
doReturn(128L).when(messageStore).getMaxPhyOffset();
doReturn(128L).when(messageStore).getMasterFlushedOffset();
setUpOneHAClient(messageStore);

twoAck = new CommitLog.GroupCommitRequest(124, 4000,2);
twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3);
this.haService.putRequest(twoAck);
assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
}
Expand Down