Skip to content

Commit 86535d7

Browse files
committed
style(store): optimize some typos in store module
1. optimize some typos in store module
1 parent 0f878b5 commit 86535d7

File tree

4 files changed

+48
-19
lines changed

4 files changed

+48
-19
lines changed

store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,26 @@
3535

3636
public class DefaultHAClient extends ServiceThread implements HAClient {
3737

38+
/**
39+
* Report header buffer size. Schema: slaveMaxOffset. Format:
40+
*
41+
* <pre>
42+
* ┌───────────────────────┬───────────────────────┐
43+
* │ slaveMaxOffset │
44+
* │ (8bytes) │
45+
* ├───────────────────────┴───────────────────────┤
46+
* │ │
47+
* │ Report Header │
48+
* </pre>
49+
* <p>
50+
*/
51+
public static final int REPORT_HEADER = 8;
52+
3853
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
3954
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
4055
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
4156
private final AtomicReference<String> masterAddress = new AtomicReference<>();
42-
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
57+
private final ByteBuffer reportOffset = ByteBuffer.allocate(REPORT_HEADER);
4358
private SocketChannel socketChannel;
4459
private Selector selector;
4560
/**
@@ -94,10 +109,10 @@ private boolean isTimeToReportOffset() {
94109

95110
private boolean reportSlaveMaxOffset(final long maxOffset) {
96111
this.reportOffset.position(0);
97-
this.reportOffset.limit(8);
112+
this.reportOffset.limit(REPORT_HEADER);
98113
this.reportOffset.putLong(maxOffset);
99114
this.reportOffset.position(0);
100-
this.reportOffset.limit(8);
115+
this.reportOffset.limit(REPORT_HEADER);
101116

102117
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
103118
try {
@@ -167,12 +182,11 @@ private boolean processReadEvent() {
167182
}
168183

169184
private boolean dispatchReadRequest() {
170-
final int msgHeaderSize = 8 + 4; // phyoffset + size
171185
int readSocketPos = this.byteBufferRead.position();
172186

173187
while (true) {
174188
int diff = this.byteBufferRead.position() - this.dispatchPosition;
175-
if (diff >= msgHeaderSize) {
189+
if (diff >= DefaultHAConnection.TRANSFER_HEADER_SIZE) {
176190
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
177191
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
178192

@@ -186,15 +200,15 @@ private boolean dispatchReadRequest() {
186200
}
187201
}
188202

189-
if (diff >= (msgHeaderSize + bodySize)) {
203+
if (diff >= (DefaultHAConnection.TRANSFER_HEADER_SIZE + bodySize)) {
190204
byte[] bodyData = byteBufferRead.array();
191-
int dataStart = this.dispatchPosition + msgHeaderSize;
205+
int dataStart = this.dispatchPosition + DefaultHAConnection.TRANSFER_HEADER_SIZE;
192206

193207
this.defaultMessageStore.appendToCommitLog(
194208
masterPhyOffset, bodyData, dataStart, bodySize);
195209

196210
this.byteBufferRead.position(readSocketPos);
197-
this.dispatchPosition += msgHeaderSize + bodySize;
211+
this.dispatchPosition += DefaultHAConnection.TRANSFER_HEADER_SIZE + bodySize;
198212

199213
if (!reportSlaveMaxOffsetPlus()) {
200214
return false;

store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,22 @@
3131
import org.apache.rocketmq.store.SelectMappedBufferResult;
3232

3333
public class DefaultHAConnection implements HAConnection {
34+
35+
/**
36+
* Transfer Header buffer size. Schema: physic offset and body size. Format:
37+
*
38+
* <pre>
39+
* ┌───────────────────────┬───────────────────────┬───────────────────────┐
40+
* │ physicOffset │ bodySize │
41+
* │ (8bytes) │ (4bytes) │
42+
* ├───────────────────────┴───────────────────────┴───────────────────────┤
43+
* │ │
44+
* │ Transfer Header │
45+
* </pre>
46+
* <p>
47+
*/
48+
public static final int TRANSFER_HEADER_SIZE = 8 + 4;
49+
3450
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
3551
private final DefaultHAService haService;
3652
private final SocketChannel socketChannel;
@@ -204,8 +220,8 @@ private boolean processReadEvent() {
204220
if (readSize > 0) {
205221
readSizeZeroTimes = 0;
206222
this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
207-
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
208-
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
223+
if ((this.byteBufferRead.position() - this.processPosition) >= DefaultHAClient.REPORT_HEADER) {
224+
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % DefaultHAClient.REPORT_HEADER);
209225
long readOffset = this.byteBufferRead.getLong(pos - 8);
210226
this.processPosition = pos;
211227

@@ -239,8 +255,7 @@ class WriteSocketService extends ServiceThread {
239255
private final Selector selector;
240256
private final SocketChannel socketChannel;
241257

242-
private final int headerSize = 8 + 4;
243-
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
258+
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
244259
private long nextTransferFromWhere = -1;
245260
private SelectMappedBufferResult selectMappedBufferResult;
246261
private boolean lastWriteOver = true;
@@ -298,7 +313,7 @@ public void run() {
298313

299314
// Build Header
300315
this.byteBufferHeader.position(0);
301-
this.byteBufferHeader.limit(headerSize);
316+
this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
302317
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
303318
this.byteBufferHeader.putInt(0);
304319
this.byteBufferHeader.flip();
@@ -340,7 +355,7 @@ public void run() {
340355

341356
// Build Header
342357
this.byteBufferHeader.position(0);
343-
this.byteBufferHeader.limit(headerSize);
358+
this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
344359
this.byteBufferHeader.putLong(thisOffset);
345360
this.byteBufferHeader.putInt(size);
346361
this.byteBufferHeader.flip();

store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void calculateSpeed() {
4545
}
4646

4747
public int canTransferMaxByteNum() {
48-
//Flow control is not started at present
48+
// Flow control is not started at present
4949
if (this.isFlowControlEnable()) {
5050
long res = Math.max(this.maxTransferByteInSecond() - this.transferredByte.get(), 0);
5151
return res > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) res;

store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
495495
int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 4);
496496
long masterEpochStartOffset = 0;
497497
long confirmOffset = 0;
498-
// if master send transfer header data, set masterEpochStartOffset and confirmOffset value.
498+
// If master send transfer header data, set masterEpochStartOffset and confirmOffset value.
499499
if (masterState == HAConnectionState.TRANSFER.ordinal() && diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE) {
500500
masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 16);
501501
confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 8);
@@ -509,12 +509,12 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
509509
return false;
510510
}
511511

512-
//flag whether the received data is complete
512+
// Flag whether the received data is complete
513513
boolean isComplete = true;
514514
switch (AutoSwitchHAClient.this.currentState) {
515515
case HANDSHAKE: {
516516
if (diff < AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
517-
//The received HANDSHAKE data is not complete
517+
// The received HANDSHAKE data is not complete
518518
isComplete = false;
519519
break;
520520
}
@@ -540,7 +540,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
540540
break;
541541
case TRANSFER: {
542542
if (diff < AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize) {
543-
//The received TRANSFER data is not complete
543+
// The received TRANSFER data is not complete
544544
isComplete = false;
545545
break;
546546
}

0 commit comments

Comments
 (0)