Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;

public class EndTransactionContext {
private String producerGroup;
private Message message;
private String brokerAddr;
private String msgId;
private String transactionId;
private LocalTransactionState transactionState;
private boolean fromTransactionCheck;

public String getProducerGroup() {
return producerGroup;
}

public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}

public Message getMessage() {
return message;
}

public void setMessage(Message message) {
this.message = message;
}

public String getBrokerAddr() {
return brokerAddr;
}

public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}

public String getMsgId() {
return msgId;
}

public void setMsgId(String msgId) {
this.msgId = msgId;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

public LocalTransactionState getTransactionState() {
return transactionState;
}

public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}

public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}

public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;

public interface EndTransactionHook {
String hookName();

void endTransaction(final EndTransactionContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
Expand Down Expand Up @@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
Expand Down Expand Up @@ -171,6 +174,11 @@ public void registerSendMessageHook(final SendMessageHook hook) {
log.info("register sendMessage Hook, {}", hook.hookName());
}

public void registerEndTransactionHook(final EndTransactionHook hook) {
this.endTransactionHookList.add(hook);
log.info("register endTransaction Hook, {}", hook.hookName());
}

public void start() throws MQClientException {
this.start(true);
}
Expand Down Expand Up @@ -386,6 +394,7 @@ private void processTransactionState(
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);

try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
Expand Down Expand Up @@ -967,6 +976,36 @@ public void executeSendMessageHookAfter(final SendMessageContext context) {
}
}

public boolean hasEndTransactionHook() {
return !this.endTransactionHookList.isEmpty();
}

public void executeEndTransactionHook(final EndTransactionContext context) {
if (!this.endTransactionHookList.isEmpty()) {
for (EndTransactionHook hook : this.endTransactionHookList) {
try {
hook.endTransaction(context);
} catch (Throwable e) {
log.warn("failed to executeEndTransactionHook", e);
}
}
}
}

public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state,
boolean fromTransactionCheck) {
if (hasEndTransactionHook()) {
EndTransactionContext context = new EndTransactionContext();
context.setProducerGroup(defaultMQProducer.getProducerGroup());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMsgId(msgId);
context.setTransactionId(msg.getTransactionId());
context.setTransactionState(state);
context.setFromTransactionCheck(fromTransactionCheck);
executeEndTransactionHook(context);
}
}
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
Expand Down Expand Up @@ -1266,7 +1305,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
}

try {
this.endTransaction(sendResult, localTransactionState, localException);
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
Expand All @@ -1290,6 +1329,7 @@ public SendResult send(
}

public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
Expand Down Expand Up @@ -1318,6 +1358,7 @@ public void endTransaction(
break;
}

doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -167,6 +168,8 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean en
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
Expand Down Expand Up @@ -252,6 +255,8 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
Expand Down Expand Up @@ -916,24 +921,24 @@ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}

@Override
public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}

@Override
public void send(Collection<Message> msgs, SendCallback sendCallback,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
}

@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
}

@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public TransactionMQProducer(final String namespace, final String producerGroup,
super(namespace, producerGroup, rpcHook);
}

public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
}

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;

Expand All @@ -32,7 +33,9 @@ public class TraceBean {
private int retryTimes;
private int bodyLength;
private MessageType msgType;

private LocalTransactionState transactionState;
private String transactionId;
private boolean fromTransactionCheck;

public MessageType getMsgType() {
return msgType;
Expand Down Expand Up @@ -141,4 +144,28 @@ public int getBodyLength() {
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}

public LocalTransactionState getTransactionState() {
return transactionState;
}

public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}

public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageType;

import java.util.ArrayList;
Expand Down Expand Up @@ -109,6 +110,27 @@ public static List<TraceContext> decoderFromTraceDataString(String traceData) {
subAfterContext.setGroupName(line[8]);
}
resList.add(subAfterContext);
} else if (line[0].equals(TraceType.EndTransaction.name())) {
TraceContext endTransactionContext = new TraceContext();
endTransactionContext.setTraceType(TraceType.EndTransaction);
endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
endTransactionContext.setRegionId(line[2]);
endTransactionContext.setGroupName(line[3]);
TraceBean bean = new TraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setClientHost(line[10]);
bean.setTransactionId(line[11]);
bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));

endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean);
resList.add(endTransactionContext);
}
}
return resList;
Expand Down Expand Up @@ -173,9 +195,26 @@ public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);

}
}
case EndTransaction: {
TraceBean bean = ctx.getTraceBeans().get(0);
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
}
break;
default:
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ public enum TraceType {
Pub,
SubBefore,
SubAfter,
EndTransaction,
}
Loading