diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java index a25924e842b..bb88b6c8404 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java @@ -152,6 +152,8 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch if (newMasterEpoch > this.masterEpoch) { LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch); + brokerController.getMessageStore().disableWrite(); + // Change record this.masterAddress = this.localAddress; this.masterEpoch = newMasterEpoch; @@ -172,6 +174,8 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch // Notify ha service, change to master this.haService.changeToMaster(newMasterEpoch); + brokerController.getMessageStore().enableWrite(); + this.executorService.submit(() -> { // Register broker to name-srv try { @@ -191,6 +195,8 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc if (newMasterEpoch > this.masterEpoch) { LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId()); + brokerController.getMessageStore().disableWrite(); + // Change record this.masterAddress = newMasterAddress; this.masterEpoch = newMasterEpoch; @@ -208,6 +214,8 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc // Notify ha service, change to slave this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId()); + brokerController.getMessageStore().enableWrite(); + this.executorService.submit(() -> { // Register broker to name-srv try { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 42542210e54..74680979c30 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -531,4 +531,12 @@ public void setPhysicalOffset(long phyOffset) { @Override public boolean isShutdown() { return next.isShutdown(); } + + @Override public void disableWrite() { + next.disableWrite(); + } + + @Override public void enableWrite() { + next.enableWrite(); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 17561acb375..b08bc757438 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1675,6 +1675,16 @@ public RunningFlags getRunningFlags() { return runningFlags; } + @Override + public void disableWrite() { + runningFlags.getAndMakeNotWriteable(); + } + + @Override + public void enableWrite() { + runningFlags.getAndMakeWriteable(); + } + public void doDispatch(DispatchRequest req) { for (CommitLogDispatcher dispatcher : this.dispatcherList) { dispatcher.dispatch(req); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 9dc27faff23..b0ca83979b6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -796,4 +796,14 @@ DispatchRequest checkMessageAndReturnSize(final ByteBuffer byteBuffer, final boo * @return whether shutdown */ boolean isShutdown(); + + /* + * Make MessageStore not writeable, default is writeable + */ + void disableWrite(); + + /* + * Make MessageStore not writeable, default is writeable + */ + void enableWrite(); }