Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/greetings.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: Little RocketMQ
on: [pull_request_target, issues]
uses: actions/[email protected]
with:
# Token for the repository. Can be passed in using {{ secrets.GITHUB_TOKEN }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
# Comment to post on an individual's first issue
issue-message: 'Make sure your issue is not the existence through the issue search. Follow the issue template, make more details for us. But please be aware that Issue should not be used for FAQs: if you have a question or are simply not sure if it is really an issue or not, please contact [us](https://rocketmq.apache.org/about/contact/) first before you create a new issue.'
# Comment to post on an individual's first pull request
pr-message: 'We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).'
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public RangeRemoteAddressStrategy(String remoteAddr) {
String[] strArray = StringUtils.split(remoteAddr, ".");
if (analysis(strArray, 1) || analysis(strArray, 2) || analysis(strArray, 3)) {
AclUtils.verify(remoteAddr, index - 1);
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (int j = 0; j < index; j++) {
sb.append(strArray[j].trim()).append(".");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,19 @@ public synchronized void seek(MessageQueue messageQueue, long offset) throws MQC
}
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
assignedMessageQueue.setSeekOffset(messageQueue, offset);
clearMessageQueueInCache(messageQueue);

PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue);
if (oldPullTaskImpl != null) {
oldPullTaskImpl.tryInterrupt();
this.taskTable.remove(messageQueue);
}
assignedMessageQueue.setSeekOffset(messageQueue, offset);
if (!this.taskTable.containsKey(messageQueue)) {
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}

Expand Down Expand Up @@ -718,16 +729,29 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
private Thread currentThread;

public PullTaskImpl(final MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}

public void tryInterrupt() {
setCancelled(true);
if (currentThread == null) {
return;
}
if (!currentThread.isInterrupted()) {
currentThread.interrupt();
}
}

@Override
public void run() {

if (!this.isCancelled()) {

this.currentThread = Thread.currentThread();

if (assignedMessageQueue.isPaused(messageQueue)) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
Expand Down Expand Up @@ -803,7 +827,7 @@ public void run() {
} else {
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
}

PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
if (this.isCancelled() || processQueue.isDropped()) {
return;
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/apache/rocketmq/common/UtilAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,11 @@ public static String list2String(List<String> list, String splitor) {
if (list == null || list.size() == 0) {
return null;
}
StringBuffer str = new StringBuffer();
StringBuilder str = new StringBuilder();
for (int i = 0; i < list.size(); i++) {
str.append(list.get(i));
if (i == list.size() - 1) {
continue;
break;
}
str.append(splitor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ public Message(String topic, String tags, String keys, int flag, byte[] body, bo
this.flag = flag;
this.body = body;

if (tags != null && tags.length() > 0)
if (tags != null && tags.length() > 0) {
this.setTags(tags);
}

if (keys != null && keys.length() > 0)
if (keys != null && keys.length() > 0) {
this.setKeys(keys);
}

this.setWaitStoreMsgOK(waitStoreMsgOK);
}
Expand Down Expand Up @@ -127,7 +129,7 @@ public String getKeys() {
}

public void setKeys(Collection<String> keys) {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (String k : keys) {
sb.append(k);
sb.append(MessageConst.KEY_SEPARATOR);
Expand All @@ -151,8 +153,9 @@ public void setDelayTimeLevel(int level) {

public boolean isWaitStoreMsgOK() {
String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
if (null == result)
if (null == result) {
return true;
}

return Boolean.parseBoolean(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static String wrapNamespaceAndRetry(String namespace, String consumerGrou
return null;
}

return new StringBuffer()
return new StringBuilder()
.append(MixAll.RETRY_GROUP_TOPIC_PREFIX)
.append(wrapNamespace(namespace, consumerGroup))
.toString();
Expand Down
13 changes: 13 additions & 0 deletions common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import static org.junit.Assert.assertEquals;

public class UtilAllTest {

Expand Down Expand Up @@ -109,6 +113,15 @@ public void testIPv6Check() throws UnknownHostException {
assertThat(UtilAll.ipToIPv6Str(nonInternal.getAddress()).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
}

@Test
public void testList2String() {
List<String> list = Arrays.asList("groupA=DENY", "groupB=PUB|SUB", "groupC=SUB");
String comma = ",";
assertEquals("groupA=DENY,groupB=PUB|SUB,groupC=SUB", UtilAll.list2String(list, comma));
assertEquals(null, UtilAll.list2String(null, comma));
assertEquals(null, UtilAll.list2String(Collections.emptyList(), comma));
}

static class DemoConfig {
private int demoWidth = 0;
private int demoLength = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testCopy() throws Exception {

@Test
public void testReadLines() throws Exception {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
sb.append("testReadLines").append("\n");
}
Expand All @@ -95,7 +95,7 @@ public void testReadLines() throws Exception {

@Test
public void testToBufferedReader() throws Exception {
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
sb.append("testToBufferedReader").append("\n");
}
Expand Down
6 changes: 4 additions & 2 deletions docs/cn/RocketMQ_Example.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ public class AsyncProducer {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,11 @@

package org.apache.rocketmq.example.benchmark;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
Expand All @@ -42,6 +35,16 @@
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class Consumer {

public static void main(String[] args) throws MQClientException, IOException {
Expand Down Expand Up @@ -71,21 +74,22 @@ public static void main(String[] args) throws MQClientException, IOException {

final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();

final Timer timer = new Timer("BenchmarkTimerThread", true);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());

final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();

timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
}, 1000, 1000);
}, 1000, 1000, TimeUnit.MILLISECONDS);

timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
Expand Down Expand Up @@ -116,7 +120,7 @@ public void run() {
e.printStackTrace();
}
}
}, 10000, 10000);
}, 10000, 10000, TimeUnit.MILLISECONDS);

RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,34 @@
*/
package org.apache.rocketmq.example.benchmark;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class Producer {

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
Expand Down Expand Up @@ -73,7 +75,8 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco

final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();

final Timer timer = new Timer("BenchmarkTimerThread", true);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());

final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();

Expand All @@ -87,17 +90,17 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
}
}

timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
}, 1000, 1000);
}, 1000, 1000, TimeUnit.MILLISECONDS);

timer.scheduleAtFixedRate(new TimerTask() {
executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
doPrintStats(snapshotList, statsBenchmark, false);
Expand All @@ -112,7 +115,7 @@ public void run() {
e.printStackTrace();
}
}
}, 10000, 10000);
}, 10000, 10000, TimeUnit.MILLISECONDS);

RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
Expand Down Expand Up @@ -224,7 +227,12 @@ public void run() {
try {
sendThreadPool.shutdown();
sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
timer.cancel();
executorService.shutdown();
try {
executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}

if (snapshotList.size() > 1) {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
Expand Down
Loading