Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.localAddress, newMasterEpoch);

brokerController.getMessageStore().disableWrite();

// Change record
this.masterAddress = this.localAddress;
this.masterEpoch = newMasterEpoch;
Expand All @@ -172,6 +174,8 @@ public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch
// Notify ha service, change to master
this.haService.changeToMaster(newMasterEpoch);

brokerController.getMessageStore().enableWrite();

this.executorService.submit(() -> {
// Register broker to name-srv
try {
Expand All @@ -191,6 +195,8 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());

brokerController.getMessageStore().disableWrite();

// Change record
this.masterAddress = newMasterAddress;
this.masterEpoch = newMasterEpoch;
Expand All @@ -208,6 +214,8 @@ public void changeToSlave(final String newMasterAddress, final int newMasterEpoc
// Notify ha service, change to slave
this.haService.changeToSlave(newMasterAddress, newMasterEpoch, this.brokerConfig.getBrokerId());

brokerController.getMessageStore().enableWrite();

this.executorService.submit(() -> {
// Register broker to name-srv
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,12 @@ public void setPhysicalOffset(long phyOffset) {
@Override public boolean isShutdown() {
return next.isShutdown();
}

@Override public void disableWrite() {
next.disableWrite();
}

@Override public void enableWrite() {
next.enableWrite();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,16 @@ public RunningFlags getRunningFlags() {
return runningFlags;
}

@Override
public void disableWrite() {
runningFlags.getAndMakeNotWriteable();
}

@Override
public void enableWrite() {
runningFlags.getAndMakeWriteable();
}

public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
Expand Down
10 changes: 10 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -796,4 +796,14 @@ DispatchRequest checkMessageAndReturnSize(final ByteBuffer byteBuffer, final boo
* @return whether shutdown
*/
boolean isShutdown();

/*
* Make MessageStore not writeable, default is writeable
*/
void disableWrite();

/*
* Make MessageStore not writeable, default is writeable
*/
void enableWrite();
}