Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 91 additions & 13 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
*/
package org.apache.rocketmq.broker;

import com.google.common.collect.Maps;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not import a new package.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -33,6 +38,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
Expand All @@ -42,6 +49,7 @@
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
Expand Down Expand Up @@ -79,23 +87,30 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
Expand Down Expand Up @@ -159,6 +174,7 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ConcurrentMap<String, String> brokerName2AddrMap = Maps.newConcurrentMap();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
Expand Down Expand Up @@ -277,9 +293,9 @@ public boolean initialize() throws CloneNotSupportedException {

if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
DefaultMessageStore messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
messageStore.registerCleanFileHook(topicConfigManager.getLogicalQueueCleanHook());
this.messageStore = messageStore;
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
Expand Down Expand Up @@ -467,6 +483,14 @@ public void run() {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.refreshBrokerNameMapping();
} catch (Exception e) {
log.error("ScheduledTask examineBrokerClusterInfo exception", e);
}
}, 10, 10, TimeUnit.SECONDS);

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
Expand Down Expand Up @@ -593,6 +617,18 @@ private void initialRpcHooks() {
}
}

private void refreshBrokerNameMapping() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo brokerClusterInfo = this.brokerOuterAPI.getBrokerClusterInfo();
brokerClusterInfo.getBrokerAddrTable().forEach((brokerName, data) -> {
String masterBrokerAddr = data.getBrokerAddrs().get(MixAll.MASTER_ID);
this.brokerName2AddrMap.put(brokerName, masterBrokerAddr);
});
}

public String getBrokerAddrByName(String brokerName) {
return this.brokerName2AddrMap.get(brokerName);
}

public void registerProcessor() {
/**
* SendMessageProcessor
Expand Down Expand Up @@ -1009,20 +1045,54 @@ public void run() {
}

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
}

public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConfigList, DataVersion dataVersion) {
if (topicConfigList == null || topicConfigList.isEmpty()) {
return;
}

ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);

ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
.map(topicConfig -> {
TopicConfig registerTopicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(),
topicConfig.getReadQueueNums(),
topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
} else {
registerTopicConfig = new TopicConfig(topicConfig);
}
return registerTopicConfig;
})
.collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);

String brokerName = this.brokerConfig.getBrokerName();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicConfigManager.selectLogicalQueuesInfo(topicName))
.map(info -> {
info.readLock().lock();
try {
return new AbstractMap.SimpleImmutableEntry<>(topicName, new LogicalQueuesInfoInBroker(info, data -> Objects.equals(data.getBrokerName(), brokerName)));
} finally {
info.readLock().unlock();
}
})
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!logicalQueuesInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}

doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}

Expand All @@ -1032,13 +1102,21 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = Maps.newHashMapWithExpectedSize(topicConfigWrapper.getTopicConfigTable().size());
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
String topicName = topicConfig.getTopicName();
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
new TopicConfig(topicName, topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
topicConfigTable.put(topicName, tmp);
LogicalQueuesInfoInBroker logicalQueuesInfo = this.topicConfigManager.selectLogicalQueuesInfo(topicName);
if (logicalQueuesInfo != null) {
String brokerName = this.brokerConfig.getBrokerName();
logicalQueuesInfoMap.put(topicName, new LogicalQueuesInfoInBroker(logicalQueuesInfo, data -> Objects.equals(data.getBrokerName(), brokerName)));
}
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.domain;

import com.alibaba.fastjson.parser.ParserConfig;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.common.fastjson.GenericMapSuperclassDeserializer;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;

import static java.util.Optional.ofNullable;

public class LogicalQueuesInfoInBroker extends LogicalQueuesInfo {
private final ConcurrentMap<Integer, ConcurrentNavigableMap<Long, LogicalQueueRouteData>> queueId2LogicalQueueMap = Maps.newConcurrentMap();

public LogicalQueuesInfoInBroker() {
}

public LogicalQueuesInfoInBroker(LogicalQueuesInfoInBroker other) {
this(other, null);
}

// deep copy
public LogicalQueuesInfoInBroker(LogicalQueuesInfoInBroker other, Predicate<LogicalQueueRouteData> predicate) {
other.readLock().lock();
try {
for (Entry<Integer, List<LogicalQueueRouteData>> entry : other.entrySet()) {
Stream<LogicalQueueRouteData> stream = entry.getValue().stream();
if (predicate != null) {
stream = stream.filter(predicate);
}
this.put(entry.getKey(), stream.map(LogicalQueueRouteData::new).collect(Collectors.toList()));
}
} finally {
other.readLock().unlock();
}
}

public void updateQueueRouteDataByQueueId(int queueId, LogicalQueueRouteData queueRouteData) {
if (queueRouteData == null) {
return;
}
ConcurrentHashMapUtil.computeIfAbsent(queueId2LogicalQueueMap, queueId, k -> new ConcurrentSkipListMap<>()).put(queueRouteData.getOffsetDelta(), queueRouteData);
}

/**
* find logical queue route data for message queues owned by this broker
*/
public LogicalQueueRouteData queryQueueRouteDataByQueueId(int queueId, long offset) {
ConcurrentNavigableMap<Long, LogicalQueueRouteData> m = this.queueId2LogicalQueueMap.get(queueId);
if (m == null || m.isEmpty()) {
return null;
}
Entry<Long, LogicalQueueRouteData> entry = m.floorEntry(offset);
if (entry == null) {
return null;
}
return entry.getValue();
}

public void deleteQueueRouteData(LogicalQueueRouteData logicalQueueRouteData) {
ConcurrentNavigableMap<Long, LogicalQueueRouteData> m = this.queueId2LogicalQueueMap.get(logicalQueueRouteData.getQueueId());
if (m != null) {
m.remove(logicalQueueRouteData.getOffsetDelta(), logicalQueueRouteData);
}
}

public LogicalQueueRouteData nextAvailableLogicalRouteData(LogicalQueueRouteData queueRouteData,
Predicate<LogicalQueueRouteData> predicate) {
this.readLock().lock();
try {
List<LogicalQueueRouteData> queueRouteDataList = ofNullable(this.get(queueRouteData.getLogicalQueueIndex())).orElse(Collections.emptyList());
int idx = Collections.binarySearch(queueRouteDataList, queueRouteData);
if (idx >= 0) {
for (int i = idx + 1, size = queueRouteDataList.size(); i < size; i++) {
LogicalQueueRouteData tmp = queueRouteDataList.get(i);
if (predicate.test(tmp)) {
return tmp;
}
}
}
} finally {
this.readLock().unlock();
}
return null;
}

static {
// workaround https://github.com/alibaba/fastjson/issues/3730
ParserConfig.getGlobalInstance().putDeserializer(LogicalQueuesInfoInBroker.class, GenericMapSuperclassDeserializer.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
Expand All @@ -48,6 +49,7 @@
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
Expand Down Expand Up @@ -432,4 +434,23 @@ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final

throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ClusterInfo getBrokerClusterInfo() throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, 3_000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
}
default:
break;
}

throw new MQBrokerException(response.getCode(), response.getRemark());
}

public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long getMaxOffsetInQueue(String topic, int queueId) {
return next.getMaxOffsetInQueue(topic, queueId);
}

@Override
public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
return next.getMaxOffsetInQueue(topic, queueId, committed);
}

@Override
public long getMinOffsetInQueue(String topic, int queueId) {
return next.getMinOffsetInQueue(topic, queueId);
Expand Down
Loading