diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index b5c749d3bb8..85c96056209 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -204,6 +204,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManagerActivity, this.updateOffsetExecutor); remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, consumerManagerActivity, this.updateOffsetExecutor); remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor); + remotingServer.registerProcessor(RequestCode.GET_CONSUMER_CONNECTION_LIST, consumerManagerActivity, this.updateOffsetExecutor); remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index 1c1993ff0a6..e9d42afc2c9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -17,17 +17,25 @@ package org.apache.rocketmq.proxy.remoting.activity; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.time.Duration; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader; @@ -62,6 +70,9 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom case RequestCode.GET_EARLIEST_MSG_STORETIME: { return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); } + case RequestCode.GET_CONSUMER_CONNECTION_LIST: { + return getConsumerConnectionList(ctx, request, context); + } default: break; } @@ -81,6 +92,43 @@ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, Remo return response; } + protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class); + GetConsumerConnectionListRequestHeader header = (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup()); + if (consumerGroupInfo != null) { + ConsumerConnection bodydata = new ConsumerConnection(); + bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere()); + bodydata.setConsumeType(consumerGroupInfo.getConsumeType()); + bodydata.setMessageModel(consumerGroupInfo.getMessageModel()); + bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable()); + + Iterator> it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator(); + while (it.hasNext()) { + ClientChannelInfo info = it.next().getValue(); + Connection connection = new Connection(); + connection.setClientId(info.getClientId()); + connection.setLanguage(info.getLanguage()); + connection.setVersion(info.getVersion()); + connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel())); + + bodydata.getConnectionSet().add(connection); + } + + byte[] body = bodydata.encode(); + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } + + response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); + response.setRemark("the consumer group[" + header.getConsumerGroup() + "] not online"); + return response; + } + protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(null);