Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/issue_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ about: Describe this issue template's purpose here.

---

The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one.
The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one.

Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements.

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ It offers a variety of features:
* Home: <https://rocketmq.apache.org>
* Docs: <https://rocketmq.apache.org/docs/quick-start/>
* Issues: <https://github.com/apache/rocketmq/issues>
* Rips: <https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal>
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
* Slack: <https://rocketmq-invite-automation.herokuapp.com/>

Expand All @@ -43,7 +44,7 @@ It offers a variety of features:
----------

## Contributing
We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).

----------
## License
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,15 @@ public SocketAddress getStoreHost() {
}

private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
double physicRatio = -1;
if (this.brokerController.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
for (String storePathPhysic : this.brokerController.getMessageStoreConfig().getCommitLogStorePaths()) {
physicRatio = Math.max(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}
} else {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
}

String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
Expand Down
12 changes: 10 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,16 @@ public class CommitLog {
private final PutMessageLock putMessageLock;

public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
if (defaultMessageStore.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
} else {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}

this.defaultMessageStore = defaultMessageStore;

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand Down Expand Up @@ -693,11 +694,18 @@ public String getRunningDataInfo() {
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();

{
if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
double maxValue = Double.MIN_VALUE;
for (String clPath : DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths()) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath);
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
maxValue = Math.max(maxValue, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue));
} else {
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));

}

{
Expand Down Expand Up @@ -1537,27 +1545,35 @@ private boolean isSpaceToDelete() {
cleanImmediately = false;

{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (physicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
}

cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
List<String> storePaths;
if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
storePaths = DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths();
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
}
storePaths = Collections.singletonList(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
}

if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
return true;
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (physicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full, storePathPhysic=" + storePathPhysic);
}

cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok, storePathPhysic=" + storePathPhysic);
}
}

if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio + ", storePathPhysic=" + storePathPhysic);
return true;
}
}
}

Expand Down
111 changes: 62 additions & 49 deletions store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
Expand All @@ -37,13 +38,13 @@ public class MappedFileQueue {

private final String storePath;

private final int mappedFileSize;
protected final int mappedFileSize;

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

private final AllocateMappedFileService allocateMappedFileService;

private long flushedWhere = 0;
protected long flushedWhere = 0;
private long committedWhere = 0;

private volatile long storeTimestamp = 0;
Expand Down Expand Up @@ -144,35 +145,39 @@ void deleteExpiredFile(List<MappedFile> files) {
}
}


public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {

if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}

public boolean doLoad(List<File> files) {
// ascending order
Collections.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
return true;
}

try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}

return true;
}

Expand Down Expand Up @@ -204,33 +209,41 @@ public MappedFile getLastMappedFile(final long startOffset, boolean needCreate)
}

if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
return tryCreateMappedFile(createOffset);
}

return mappedFileLast;
}

protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;

if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}

if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}

return mappedFile;
this.mappedFiles.add(mappedFile);
}

return mappedFileLast;
return mappedFile;
}

public MappedFile getLastMappedFile(final long startOffset) {
Expand Down Expand Up @@ -398,7 +411,7 @@ public int deleteExpiredFileByOffset(long offset, int unitSize) {
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
Expand Down Expand Up @@ -466,7 +479,7 @@ public MappedFile findMappedFileByOffset(final long offset, final boolean return
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
Expand All @@ -480,7 +493,7 @@ public MappedFile findMappedFileByOffset(final long offset, final boolean return
}

if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}

Expand Down
Loading