Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b42acdd
[ISSUE #3148]Support metadata export
Jul 15, 2021
a729957
[ISSUE #3148]Support metadata export
Jul 15, 2021
bc6b433
[ISSUE #3148]Support metadata export
Jul 16, 2021
18251c1
[ISSUE #3148]Support metadata export
Jul 20, 2021
9f0b5cc
[ISSUE #3148]Support metadata export
Jul 20, 2021
c9a647d
[ISSUE #3148]Support metadata export
Jul 30, 2021
248f1a0
[ISSUE #3199]two timed task for RequestFutureTable
panzhi33 Aug 5, 2021
a643a50
[ISSUE #3148]Support metadata export
panzhi33 Aug 5, 2021
8d01b47
[ISSUE #3148]Support metadata export
panzhi33 Aug 5, 2021
3800f23
[ISSUE #3148]Support metadata export
panzhi33 Aug 6, 2021
5da18ea
[ISSUE #3148]Support metadata export
panzhi33 Aug 6, 2021
d0111c5
[ISSUE #3148]Support metadata export
panzhi33 Aug 10, 2021
f646fcc
[ISSUE #3148]Support metadata export
panzhi33 Aug 11, 2021
064a3b8
[ISSUE #3148]Support metadata export
panzhi33 Aug 12, 2021
b610091
[ISSUE #3148]Support metadata export
panzhi33 Aug 12, 2021
d47f051
[ISSUE #3148]Support metadata export
panzhi33 Aug 12, 2021
6013bfb
[ISSUE #3148]Support metadata export
panzhi33 Aug 16, 2021
d904d91
[ISSUE #3148]Support metadata export
panzhi33 Aug 30, 2021
6fdc201
[ISSUE #3148]Support metadata export
panzhi33 Aug 31, 2021
303500b
[ISSUE #3148]Support metadata export
panzhi33 Aug 31, 2021
2bff096
[ISSUE #3148]Support metadata export
panzhi33 Aug 31, 2021
107ba89
[ISSUE #3148]Support metadata export
panzhi33 Aug 31, 2021
da4f873
[ISSUE #3148]Support metadata export
panzhi33 Sep 8, 2021
27b0cfe
[ISSUE #3148]Support metadata export
panzhi33 Sep 9, 2021
7b2c1f6
[ISSUE #3148]Support metadata export
panzhi33 Sep 10, 2021
bc35d5c
[ISSUE #3148]Support metadata export
panzhi33 Sep 12, 2021
6de3ce7
[ISSUE #3148]Support metadata export
panzhi33 Sep 15, 2021
55bdbfe
[ISSUE #3148]Support metadata export
panzhi33 Sep 18, 2021
14be0bf
[ISSUE #3148]Support metadata export
panzhi33 Sep 24, 2021
75271df
[ISSUE #3148]Support metadata export
panzhi33 Sep 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions distribution/bin/export.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/bin/bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ -z "$ROCKETMQ_HOME" ]; then
## resolve links - $0 may be a link to maven's home
PRG="$0"

# need this for relative symlinks
while [ -h "$PRG" ]; do
ls=$(ls -ld "$PRG")
link=$(expr "$ls" : '.*-> \(.*\)$')
if expr "$link" : '/.*' >/dev/null; then
PRG="$link"
else
PRG="$(dirname "$PRG")/$link"
fi
done

saveddir=$(pwd)

ROCKETMQ_HOME=$(dirname "$PRG")/..

# make it fully qualified
ROCKETMQ_HOME=$(cd "$ROCKETMQ_HOME" && pwd)

cd "$saveddir"
fi

export ROCKETMQ_HOME

namesrvAddr=
while [ -z "${namesrvAddr}" ]; do
read -p "Enter name server address list:" namesrvAddr
done

clusterName=
while [ -z "${clusterName}" ]; do
read -p "Choose a cluster to export:" clusterName
done

read -p "Enter file path to export [default /tmp/rocketmq/export]:" filePath
if [ -z "${filePath}" ]; then
filePath="/tmp/rocketmq/config"
fi

if [[ -e ${filePath} ]]; then
rm -rf ${filePath}
fi

sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetrics -c ${clusterName} -n ${namesrvAddr} -f ${filePath}
sh ${ROCKETMQ_HOME}/bin/mqadmin exportConfigs -c ${clusterName} -n ${namesrvAddr} -f ${filePath}
sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetadata -c ${clusterName} -n ${namesrvAddr} -f ${filePath}

cd ${filePath} || exit

configs=$(cat ./configs.json)
if [ -z "$configs" ]; then
configs="{}"
fi
metadata=$(cat ./metadata.json)
if [ -z "$metadata" ]; then
metadata="{}"
fi
metrics=$(cat ./metrics.json)
if [ -z "$metrics" ]; then
metrics="{}"
fi

echo "{
\"configs\": ${configs},
\"metadata\": ${metadata},
\"metrics\": ${metrics}
}" >rocketmq-metadata-export.json

echo -e "[INFO] The RocketMQ metadata has been exported to the file:${filePath}/rocketmq-metadata-export.json"
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,24 @@ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
}

@Override
public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
}

@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis);
}

@Override
public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException {
return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
}

/* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private long timeoutMillis = 20000;
private Random random = new Random();

private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();

static {
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
}

public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
this(defaultMQAdminExt, null, timeoutMillis);
}
Expand Down Expand Up @@ -955,12 +973,49 @@ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
}

@Override
public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl()
.getAllSubscriptionGroup(brokerAddr, timeoutMillis);

Iterator<Entry<String, SubscriptionGroupConfig>> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable()
.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) {
iterator.remove();
}
}

return subscriptionGroupWrapper;
}

@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
}

@Override
public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis);
TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr,
timeoutMillis);
Iterator<Entry<String, TopicConfig>> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet()
.iterator();
while (iterator.hasNext()) {
String topic = iterator.next().getKey();
if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) {
iterator.remove();
}
}
return topicConfigSerializeWrapper;
}

@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,18 @@ SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;

TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;

TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;

TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException;

void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,4 @@ public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final Stri
}
throw new Exception(ERROR_MESSAGE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
Expand All @@ -72,6 +74,7 @@
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicListSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
Expand Down Expand Up @@ -213,6 +216,10 @@ public static void initCommand() {
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());

initCommand(new ExportMetadataCommand());
initCommand(new ExportConfigsCommand());
initCommand(new ExportMetricsCommand());
}

private static void initLogback() throws JoranException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.alibaba.fastjson.JSON;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;

public class ExportConfigsCommand implements SubCommand {
@Override
public String commandName() {
return "exportConfigs";
}

@Override
public String commandDesc() {
return "export configs";
}

@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("f", "filePath", true,
"export configs.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
return options;
}

@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

try {
String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define a constant for "/tmp/rocketmq/export"

.trim();

defaultMQAdminExt.start();
Map<String, Object> result = new HashMap<>();
// name servers
List<String> nameServerAddressList = defaultMQAdminExt.getNameServerAddressList();

//broker
int masterBrokerSize = 0;
int slaveBrokerSize = 0;
Map<String, Properties> brokerConfigs = new HashMap<>();
Map<String, List<String>> masterAndSlaveMap
= CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
for (String masterAddr : masterAndSlaveMap.keySet()) {
Properties masterProperties = defaultMQAdminExt.getBrokerConfig(masterAddr);
masterBrokerSize++;
slaveBrokerSize += masterAndSlaveMap.get(masterAddr).size();

brokerConfigs.put(masterProperties.getProperty("brokerName"), needBrokerProprties(masterProperties));
}

Map<String, Integer> clusterScaleMap = new HashMap<>();
clusterScaleMap.put("namesrvSize", nameServerAddressList.size());
clusterScaleMap.put("masterBrokerSize", masterBrokerSize);
clusterScaleMap.put("slaveBrokerSize", slaveBrokerSize);

result.put("brokerConfigs", brokerConfigs);
result.put("clusterScale", clusterScaleMap);

String path = filePath + "/configs.json";
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}

private Properties needBrokerProprties(Properties properties) {
Properties newProperties = new Properties();
newProperties.setProperty("brokerClusterName", properties.getProperty("brokerClusterName"));
newProperties.setProperty("brokerId", properties.getProperty("brokerId"));
newProperties.setProperty("brokerName", properties.getProperty("brokerName"));
newProperties.setProperty("brokerRole", properties.getProperty("brokerRole"));
newProperties.setProperty("fileReservedTime", properties.getProperty("fileReservedTime"));
newProperties.setProperty("filterServerNums", properties.getProperty("filterServerNums"));
newProperties.setProperty("flushDiskType", properties.getProperty("flushDiskType"));
newProperties.setProperty("maxMessageSize", properties.getProperty("maxMessageSize"));
newProperties.setProperty("messageDelayLevel", properties.getProperty("messageDelayLevel"));
newProperties.setProperty("msgTraceTopicName", properties.getProperty("msgTraceTopicName"));
newProperties.setProperty("slaveReadEnable", properties.getProperty("slaveReadEnable"));
newProperties.setProperty("traceOn", properties.getProperty("traceOn"));
newProperties.setProperty("traceTopicEnable", properties.getProperty("traceTopicEnable"));
newProperties.setProperty("useTLS", properties.getProperty("useTLS"));
newProperties.setProperty("autoCreateTopicEnable", properties.getProperty("autoCreateTopicEnable"));
newProperties.setProperty("autoCreateSubscriptionGroup", properties.getProperty("autoCreateSubscriptionGroup"));
return newProperties;
}
}
Loading