Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -332,21 +332,21 @@ private void handshakeWithMaster() throws IOException {
}
}

private boolean reportSlaveOffset(final long offsetToReport) throws IOException {
private boolean reportSlaveOffset(HAConnectionState currentState, final long offsetToReport) throws IOException {
this.transferHeaderBuffer.position(0);
this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
this.transferHeaderBuffer.putInt(this.currentState.ordinal());
this.transferHeaderBuffer.putInt(currentState.ordinal());
this.transferHeaderBuffer.putLong(offsetToReport);
this.transferHeaderBuffer.flip();
return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
}

private boolean reportSlaveMaxOffset() throws IOException {
private boolean reportSlaveMaxOffset(HAConnectionState currentState) throws IOException {
boolean result = true;
final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
if (maxPhyOffset > this.currentReportedOffset) {
this.currentReportedOffset = maxPhyOffset;
result = reportSlaveOffset(this.currentReportedOffset);
result = reportSlaveOffset(currentState, this.currentReportedOffset);
}
return result;
}
Expand All @@ -369,11 +369,11 @@ public boolean connectMaster() throws IOException {
return this.socketChannel != null;
}

private boolean transferFromMaster() throws IOException {
private boolean transferFromMaster(HAConnectionState currentState) throws IOException {
boolean result;
if (isTimeToReportOffset()) {
LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
result = reportSlaveOffset(this.currentReportedOffset);
result = reportSlaveOffset(currentState, this.currentReportedOffset);
if (!result) {
return false;
}
Expand All @@ -386,7 +386,7 @@ private boolean transferFromMaster() throws IOException {
return false;
}

return this.reportSlaveMaxOffset();
return this.reportSlaveMaxOffset(currentState);
}

@Override
Expand Down Expand Up @@ -415,7 +415,7 @@ public void run() {
handshakeWithMaster();
continue;
case TRANSFER:
if (!transferFromMaster()) {
if (!transferFromMaster(HAConnectionState.TRANSFER)) {
closeMasterAndWait();
continue;
}
Expand Down Expand Up @@ -445,7 +445,7 @@ public void run() {
/**
* Compare the master and slave's epoch file, find consistent point, do truncate.
*/
private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) throws IOException {
private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset, HAConnectionState currentState) throws IOException {
if (this.epochCache.getEntrySize() == 0) {
// If epochMap is empty, means the broker is a new replicas
LOGGER.info("Slave local epochCache is empty, skip truncate log");
Expand Down Expand Up @@ -475,7 +475,7 @@ private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOf
changeCurrentState(HAConnectionState.TRANSFER);
this.currentReportedOffset = truncateOffset;
}
if (!reportSlaveMaxOffset()) {
if (!reportSlaveMaxOffset(currentState)) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
return false;
}
Expand Down Expand Up @@ -534,7 +534,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
byteBufferRead.position(readSocketPos);
AutoSwitchHAClient.this.processPosition += bodySize;
LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries);
if (!doTruncate(epochEntries, masterOffset)) {
if (!doTruncate(epochEntries, masterOffset, HAConnectionState.HANDSHAKE)) {
waitForRunning(1000 * 2);
LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
return false;
Expand Down Expand Up @@ -573,7 +573,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {

haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));

if (!reportSlaveMaxOffset()) {
if (!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
break;
default:
LOGGER.error("Current state illegal {}", currentState);
break;
return false;
}

if (!slaveState.equals(currentState)) {
Expand Down