Skip to content

Commit bb9d6e6

Browse files
authored
[ISSUE #3798] Support container in DLedger. (#4100)
* support container in DLedger Model * use BrokerIdentity to replace BrokerConfig * add logic to support remove dLedger Broker * add unit test for add&remove dLedger broker * modify mqadmin to adapt to dLedger
1 parent 52482d4 commit bb9d6e6

File tree

6 files changed

+127
-44
lines changed

6 files changed

+127
-44
lines changed

container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class BrokerContainer implements IBrokerContainer {
6666

6767
private final ConcurrentMap<BrokerIdentity, InnerSalveBrokerController> slaveBrokerControllers = new ConcurrentHashMap<>();
6868
private final ConcurrentMap<BrokerIdentity, InnerBrokerController> masterBrokerControllers = new ConcurrentHashMap<>();
69+
private final ConcurrentMap<BrokerIdentity, InnerBrokerController> dLedgerBrokerControllers = new ConcurrentHashMap<>();
6970
private final List<BrokerBootHook> brokerBootHookList = new ArrayList<>();
7071
private final BrokerContainerProcessor brokerContainerProcessor;
7172
private final Configuration configuration;
@@ -221,6 +222,10 @@ public void shutdown() {
221222

222223
masterBrokerControllers.clear();
223224

225+
// Shutdown dLedger brokers
226+
dLedgerBrokerControllers.values().forEach(InnerBrokerController::shutdown);
227+
dLedgerBrokerControllers.clear();
228+
224229
// Shutdown the remoting server with a high priority to avoid further traffic
225230
if (this.remotingServer != null) {
226231
this.remotingServer.shutdown();
@@ -258,16 +263,51 @@ public void registerBrokerBootHook(BrokerBootHook brokerBootHook) {
258263
@Override
259264
public InnerBrokerController addBroker(final BrokerConfig brokerConfig,
260265
final MessageStoreConfig storeConfig) throws Exception {
261-
if (brokerConfig.getBrokerId() == MixAll.MASTER_ID && storeConfig.getBrokerRole() != BrokerRole.SLAVE) {
262-
return this.addMasterBroker(brokerConfig, storeConfig);
263-
}
264-
if (brokerConfig.getBrokerId() != MixAll.MASTER_ID && storeConfig.getBrokerRole() == BrokerRole.SLAVE) {
265-
return this.addSlaveBroker(brokerConfig, storeConfig);
266+
if (storeConfig.isEnableDLegerCommitLog()) {
267+
return this.addDLedgerBroker(brokerConfig, storeConfig);
268+
} else {
269+
if (brokerConfig.getBrokerId() == MixAll.MASTER_ID && storeConfig.getBrokerRole() != BrokerRole.SLAVE) {
270+
return this.addMasterBroker(brokerConfig, storeConfig);
271+
}
272+
if (brokerConfig.getBrokerId() != MixAll.MASTER_ID && storeConfig.getBrokerRole() == BrokerRole.SLAVE) {
273+
return this.addSlaveBroker(brokerConfig, storeConfig);
274+
}
266275
}
267276

268277
return null;
269278
}
270279

280+
public InnerBrokerController addDLedgerBroker(final BrokerConfig brokerConfig, final MessageStoreConfig storeConfig) throws Exception {
281+
brokerConfig.setInBrokerContainer(true);
282+
if (storeConfig.isDuplicationEnable()) {
283+
LOG.error("Can not add broker to container when duplicationEnable is true currently");
284+
throw new Exception("Can not add broker to container when duplicationEnable is true currently");
285+
}
286+
InnerBrokerController brokerController = new InnerBrokerController(this, brokerConfig, storeConfig);
287+
BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
288+
brokerConfig.getBrokerName(), Integer.parseInt(storeConfig.getdLegerSelfId().substring(1)));
289+
final BrokerController previousBroker = dLedgerBrokerControllers.putIfAbsent(brokerIdentity, brokerController);
290+
if (previousBroker == null) {
291+
// New dLedger broker added, start it
292+
try {
293+
BrokerLogbackConfigurator.doConfigure(brokerIdentity);
294+
final boolean initResult = brokerController.initialize();
295+
if (!initResult) {
296+
brokerController.shutdown();
297+
dLedgerBrokerControllers.remove(brokerIdentity);
298+
throw new Exception("Failed to init dLedger broker " + brokerIdentity.getCanonicalName());
299+
}
300+
} catch (Exception e) {
301+
// Remove the failed dLedger broker and throw the exception
302+
brokerController.shutdown();
303+
dLedgerBrokerControllers.remove(brokerIdentity);
304+
throw new Exception("Failed to initialize dLedger broker " + brokerIdentity.getCanonicalName(), e);
305+
}
306+
return brokerController;
307+
}
308+
throw new Exception(brokerIdentity.getCanonicalName() + " has already been added to current broker");
309+
}
310+
271311
public InnerBrokerController addMasterBroker(final BrokerConfig masterBrokerConfig,
272312
final MessageStoreConfig storeConfig) throws Exception {
273313

@@ -356,6 +396,12 @@ public InnerSalveBrokerController addSlaveBroker(final BrokerConfig slaveBrokerC
356396
@Override
357397
public BrokerController removeBroker(final BrokerIdentity brokerIdentity) throws Exception {
358398

399+
InnerBrokerController dLedgerController = dLedgerBrokerControllers.remove(brokerIdentity);
400+
if (dLedgerController != null) {
401+
dLedgerController.shutdown();
402+
return dLedgerController;
403+
}
404+
359405
InnerSalveBrokerController slaveBroker = slaveBrokerControllers.remove(brokerIdentity);
360406
if (slaveBroker != null) {
361407
slaveBroker.shutdown();

container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -116,29 +116,31 @@ private synchronized RemotingCommand addBroker(ChannelHandlerContext ctx,
116116
brokerConfig.setBrokerConfigPath(configPath);
117117
}
118118

119-
switch (messageStoreConfig.getBrokerRole()) {
120-
case ASYNC_MASTER:
121-
case SYNC_MASTER:
122-
brokerConfig.setBrokerId(MixAll.MASTER_ID);
123-
break;
124-
case SLAVE:
125-
if (brokerConfig.getBrokerId() <= 0) {
126-
response.setCode(ResponseCode.SYSTEM_ERROR);
127-
response.setRemark("slave broker id must be > 0");
128-
return response;
129-
}
130-
break;
131-
default:
132-
break;
119+
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
120+
switch (messageStoreConfig.getBrokerRole()) {
121+
case ASYNC_MASTER:
122+
case SYNC_MASTER:
123+
brokerConfig.setBrokerId(MixAll.MASTER_ID);
124+
break;
125+
case SLAVE:
126+
if (brokerConfig.getBrokerId() <= 0) {
127+
response.setCode(ResponseCode.SYSTEM_ERROR);
128+
response.setRemark("slave broker id must be > 0");
129+
return response;
130+
}
131+
break;
132+
default:
133+
break;
133134

134-
}
135+
}
135136

136-
if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
137-
|| messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
138-
|| messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
139-
response.setCode(ResponseCode.SYSTEM_ERROR);
140-
response.setRemark("invalid replicas number");
141-
return response;
137+
if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
138+
|| messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
139+
|| messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
140+
response.setCode(ResponseCode.SYSTEM_ERROR);
141+
response.setRemark("invalid replicas number");
142+
return response;
143+
}
142144
}
143145

144146
BrokerController brokerController;
@@ -163,9 +165,14 @@ private synchronized RemotingCommand addBroker(ChannelHandlerContext ctx,
163165
}
164166
} catch (Exception e) {
165167
LOGGER.error("start broker exception {}", e);
166-
BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
167-
brokerConfig.getBrokerName(),
168-
brokerConfig.getBrokerId());
168+
BrokerIdentity brokerIdentity;
169+
if (messageStoreConfig.isEnableDLegerCommitLog()) {
170+
brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
171+
brokerConfig.getBrokerName(), Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1)));
172+
} else {
173+
brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
174+
brokerConfig.getBrokerName(), brokerConfig.getBrokerId());
175+
}
169176
this.brokerContainer.removeBroker(brokerIdentity);
170177
brokerController.shutdown();
171178
response.setCode(ResponseCode.SYSTEM_ERROR);

container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.util.Set;
3737

3838
import ch.qos.logback.core.util.FileSize;
39-
import org.apache.rocketmq.common.BrokerConfig;
39+
import org.apache.rocketmq.common.BrokerIdentity;
4040
import org.apache.rocketmq.common.constant.LoggerName;
4141
import org.apache.rocketmq.logging.InternalLogger;
4242
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -53,8 +53,8 @@ public class BrokerLogbackConfigurator {
5353
public static final String SUFFIX_APPENDER = "Appender";
5454
public static final String SUFFIX_INNER_APPENDER = "_inner";
5555

56-
public static void doConfigure(BrokerConfig brokerConfig) {
57-
if (!CONFIGURED_BROKER_LIST.contains(brokerConfig.getCanonicalName())) {
56+
public static void doConfigure(BrokerIdentity brokerIdentity) {
57+
if (!CONFIGURED_BROKER_LIST.contains(brokerIdentity.getCanonicalName())) {
5858
try {
5959
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
6060
for (ch.qos.logback.classic.Logger tempLogger : lc.getLoggerList()) {
@@ -64,15 +64,15 @@ public static void doConfigure(BrokerConfig brokerConfig) {
6464
&& !loggerName.equals(LoggerName.ACCOUNT_LOGGER_NAME)
6565
&& !loggerName.equals(LoggerName.COMMERCIAL_LOGGER_NAME)
6666
&& !loggerName.equals(LoggerName.CONSUMER_STATS_LOGGER_NAME)) {
67-
ch.qos.logback.classic.Logger logger = lc.getLogger(brokerConfig.getLoggerIdentifier() + loggerName);
67+
ch.qos.logback.classic.Logger logger = lc.getLogger(brokerIdentity.getLoggerIdentifier() + loggerName);
6868
logger.setAdditive(tempLogger.isAdditive());
6969
logger.setLevel(tempLogger.getLevel());
7070
String appenderName = loggerName + SUFFIX_APPENDER;
7171
Appender<ILoggingEvent> tempAppender = tempLogger.getAppender(appenderName);
7272
if (tempAppender instanceof AsyncAppender) {
7373
AsyncAppender tempAsyncAppender = (AsyncAppender) tempAppender;
7474
AsyncAppender asyncAppender = new AsyncAppender();
75-
asyncAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
75+
asyncAppender.setName(brokerIdentity.getLoggerIdentifier() + appenderName);
7676
asyncAppender.setContext(tempAsyncAppender.getContext());
7777

7878
String innerAppenderName = appenderName + SUFFIX_INNER_APPENDER;
@@ -81,34 +81,34 @@ public static void doConfigure(BrokerConfig brokerConfig) {
8181
continue;
8282
}
8383
asyncAppender.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempInnerAppender,
84-
brokerConfig, innerAppenderName));
84+
brokerIdentity, innerAppenderName));
8585
asyncAppender.start();
8686
logger.addAppender(asyncAppender);
8787
} else if (tempAppender instanceof RollingFileAppender) {
8888
logger.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempAppender,
89-
brokerConfig, appenderName));
89+
brokerIdentity, appenderName));
9090
}
9191
}
9292
}
9393
} catch (Exception e) {
94-
LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerConfig.getCanonicalName(), e);
94+
LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerIdentity.getCanonicalName(), e);
9595
return;
9696
}
9797

98-
CONFIGURED_BROKER_LIST.add(brokerConfig.getCanonicalName());
98+
CONFIGURED_BROKER_LIST.add(brokerIdentity.getCanonicalName());
9999
}
100100
}
101101

102102
private static RollingFileAppender<ILoggingEvent> configureRollingFileAppender(
103-
RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerConfig brokerConfig, String appenderName)
103+
RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerIdentity brokerIdentity, String appenderName)
104104
throws NoSuchFieldException, IllegalAccessException {
105105
RollingFileAppender<ILoggingEvent> rollingFileAppender = new RollingFileAppender<>();
106106

107107
// configure appender name
108-
rollingFileAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
108+
rollingFileAppender.setName(brokerIdentity.getLoggerIdentifier() + appenderName);
109109

110110
// configure file name
111-
rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
111+
rollingFileAppender.setFile(tempRollingFileAppender.getFile().replaceAll(ROCKETMQ_LOGS, brokerIdentity.getCanonicalName() + "_" + ROCKETMQ_LOGS));
112112

113113
// configure append
114114
rollingFileAppender.setAppend(true);
@@ -144,7 +144,7 @@ private static RollingFileAppender<ILoggingEvent> configureRollingFileAppender(
144144
FixedWindowRollingPolicy tempRollingPolicy = (FixedWindowRollingPolicy) originalRollingPolicy;
145145
FixedWindowRollingPolicy rollingPolicy = new FixedWindowRollingPolicy();
146146
rollingPolicy.setContext(tempRollingPolicy.getContext());
147-
rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS));
147+
rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replaceAll(ROCKETMQ_LOGS, brokerIdentity.getCanonicalName() + "_" + ROCKETMQ_LOGS));
148148
rollingPolicy.setMaxIndex(tempRollingPolicy.getMaxIndex());
149149
rollingPolicy.setMinIndex(tempRollingPolicy.getMinIndex());
150150
rollingPolicy.setParent(rollingFileAppender);

container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,36 @@ public void testAddAndRemoveMaster() throws Exception {
241241
master.getMessageStore().destroy();
242242
}
243243

244+
@Test
245+
public void testAddAndRemoveDLedgerBroker() throws Exception {
246+
BrokerContainer brokerContainer = new BrokerContainer(
247+
new BrokerContainerConfig(),
248+
new NettyServerConfig(),
249+
new NettyClientConfig());
250+
assertThat(brokerContainer.initialize()).isTrue();
251+
brokerContainer.start();
252+
253+
BrokerConfig dLedgerBrokerConfig = new BrokerConfig();
254+
String baseDir = createBaseDir("unnittest-dLedger").getAbsolutePath();
255+
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
256+
messageStoreConfig.setStorePathRootDir(baseDir);
257+
messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
258+
messageStoreConfig.setEnableDLegerCommitLog(true);
259+
messageStoreConfig.setdLegerSelfId("n0");
260+
messageStoreConfig.setdLegerGroup("group");
261+
messageStoreConfig.setdLegerPeers(String.format("n0-localhost:%d", generatePort(30900, 10000)));
262+
InnerBrokerController dLedger = brokerContainer.addBroker(dLedgerBrokerConfig, messageStoreConfig);
263+
assertThat(dLedger).isNotNull();
264+
dLedger.start();
265+
assertThat(dLedger.isIsolated()).isFalse();
266+
267+
brokerContainer.removeBroker(new BrokerIdentity(dLedgerBrokerConfig.getBrokerClusterName(), dLedgerBrokerConfig.getBrokerName(), Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1))));
268+
assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
269+
270+
brokerContainer.shutdown();
271+
dLedger.getMessageStore().destroy();
272+
}
273+
244274
@Test
245275
public void testAddAndRemoveSlaveSuccess() throws Exception {
246276
BrokerContainer brokerContainer = new BrokerContainer(

tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, RPCHook rpcHoo
206206

207207
@Override public void addBrokerToContainer(String brokerContainerAddr,
208208
String brokerConfig) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
209-
this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, timeoutMillis);
209+
this.mqClientInstance.getMQClientAPIImpl().addBroker(brokerContainerAddr, brokerConfig, 20000);
210210
}
211211

212212
@Override public void removeBrokerFromContainer(String brokerContainerAddr, String clusterName, String brokerName,

tools/src/main/java/org/apache/rocketmq/tools/command/container/RemoveBrokerSubCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class RemoveBrokerSubCommand implements SubCommand {
3939
opt.setRequired(true);
4040
options.addOption(opt);
4141

42-
opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId");
42+
opt = new Option("b", "brokerIdentity", true, "Information to identify a broker: clusterName:brokerName:brokerId(dLedgerId for dLedger)");
4343
opt.setRequired(true);
4444
options.addOption(opt);
4545

0 commit comments

Comments
 (0)