diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java index c9303f7eabf..ba7b00c3bbe 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -49,6 +49,18 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(true); options.addOption(opt); + opt = new Option("b", "beginTimestamp", true, "Begin timestamp(ms). default:0, eg:1676730526212"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("e", "endTimestamp", true, "End timestamp(ms). default:Long.MAX_VALUE, eg:1676730526212"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "maxNum", true, "The maximum number of messages returned by the query, default:64"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -62,7 +74,19 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t final String topic = commandLine.getOptionValue('t').trim(); final String key = commandLine.getOptionValue('k').trim(); - this.queryByKey(defaultMQAdminExt, topic, key); + long beginTimestamp = 0; + long endTimestamp = Long.MAX_VALUE; + int maxNum = 64; + if (commandLine.hasOption("b")) { + beginTimestamp = Long.parseLong(commandLine.getOptionValue("b").trim()); + } + if (commandLine.hasOption("e")) { + endTimestamp = Long.parseLong(commandLine.getOptionValue("e").trim()); + } + if (commandLine.hasOption("c")) { + maxNum = Integer.parseInt(commandLine.getOptionValue("c").trim()); + } + this.queryByKey(defaultMQAdminExt, topic, key, maxNum, beginTimestamp, endTimestamp); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { @@ -70,11 +94,12 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } } - private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) + private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException { admin.start(); - QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE); + QueryResult queryResult = admin.queryMessage(topic, key, maxNum, begin, end); System.out.printf("%-50s %4s %40s%n", "#Message ID", "#QID", diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java index 581b0fc5b4d..2b982efefdd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java @@ -47,6 +47,19 @@ public Options buildCommandlineOptions(Options options) { opt = new Option("t", "traceTopic", true, "The name value of message trace topic"); opt.setRequired(false); options.addOption(opt); + + opt = new Option("b", "beginTimestamp", true, "Begin timestamp(ms). default:0, eg:1676730526212"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("e", "endTimestamp", true, "End timestamp(ms). default:Long.MAX_VALUE, eg:1676730526212"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "maxNum", true, "The maximum number of messages returned by the query, default:64"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -78,7 +91,21 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('n')) { defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim()); } - this.queryTraceByMsgId(defaultMQAdminExt, traceTopic, msgId); + + long beginTimestamp = 0; + long endTimestamp = Long.MAX_VALUE; + int maxNum = 64; + if (commandLine.hasOption("b")) { + beginTimestamp = Long.parseLong(commandLine.getOptionValue("b").trim()); + } + if (commandLine.hasOption("e")) { + endTimestamp = Long.parseLong(commandLine.getOptionValue("e").trim()); + } + if (commandLine.hasOption("c")) { + maxNum = Integer.parseInt(commandLine.getOptionValue("c").trim()); + } + + this.queryTraceByMsgId(defaultMQAdminExt, traceTopic, msgId, maxNum, beginTimestamp, endTimestamp); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e); } finally { @@ -86,10 +113,11 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } } - private void queryTraceByMsgId(final DefaultMQAdminExt admin, String traceTopic, String msgId) + private void queryTraceByMsgId(final DefaultMQAdminExt admin, String traceTopic, String msgId, int maxNum, + long begin, long end) throws MQClientException, InterruptedException { admin.start(); - QueryResult queryResult = admin.queryMessage(traceTopic, msgId, 64, 0, System.currentTimeMillis()); + QueryResult queryResult = admin.queryMessage(traceTopic, msgId, maxNum, begin, end); List messageList = queryResult.getMessageList(); List traceViews = new ArrayList<>(); for (MessageExt message : messageList) {