Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/pulsar-design.docx
Binary file not shown.
Binary file added docs/~$lsar-design.docx
Binary file not shown.
70 changes: 70 additions & 0 deletions tools/MQTest/MQTestResult.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
## Test Environment

Five machines with ubuntu 18.04

- CPU: Intel Xeon Gold 6152(22 cores, 44 threads)

- Memory:32G * 4

## Test Result

### RocketMQ

#### 1.one broker

| topicNumber | CPU (%) | TPS Produce | TPS Consume | disk util(%) | delay(ms) |
| ----------- | ------- | ----------- | ----------- | ------------ | --------- |
| 0 | 2.11 | 0 | 0 | 0 | 0 |
| 100 | 625 | 76,245.38 | 76,233.48 | 82.84 | 10 |
| 1000 | 780 | 73,644.00 | 72,582.00 | 91.75 | 30 |
| 3000 | 780 | 74,737.85 | 61,315.64 | 93.01 | 101 |
| 5000 | 801 | 73,919.80 | 52,581.76 | 96.89 | 167 |
| 6000 | 789 | 71,059.79 | 44,962.30 | 92.72 | 197 |
| 7000 | 823 | 72,114.44 | 43,316.24 | 93.33 | 221 |

- **Exception occurs when topic number becomes 8000**
- **The Number of threads used to produce and consume message is 100 and the payload is 1Kb**
- **Average data over a period of one minute**

#### 2.three brokers

| topicNumber | TPS Producer | TPS Consumer | disk util(%) | delay(ms) |
| ----------- | :----------- | ------------ | ------------ | ------------- |
| 100 | 30,985.00 | 30,845.00 | 97.78 | 29 |
| 1000 | 30,529.19 | 29,950.50 | 99.56 | 93 |
| 3000 | 30,477.55 | 27,195.28 | 99.92 | 230 |
| 5000 | 30,477.55 | 27,195.28 | 100 | 339 |
| 7000 | 30,902.93 | 11,646.71 | 100 | up to 170000 |
| 8000 | 31,491.40 | 9,426.19 | 99.99 | up to 140000 |
| 9000 | 31,994.80 | 7,031.48 | 100 | up to 350000 |

- **When the topic number comes to 10000, there is an exception**
- **The TPS is the average data of three brokers**



### Pulsar

#### 1. one broker

| topicNumber | TPS Produce | TPS Consume | disk util(%) |
| ----------- | ----------- | ----------- | ------------ |
| 4000 | 31977 | 31691 | >=97 |
| 6000 | 29,553.00 | 28,473.00 | >=97 |
| 8000 | 29680.9 | 29931.55 | >=97 |
| 10000 | 0 | 0 | 0 |

#### 2. three brokers

| topicNumber | CPU (%) | TPS Produce(average) | TPS Consume(average) | disk util(%) | Mem(G) |
| -------------- | ------- | -------------------- | -------------------- | -------------- | ------ |
| 1000(10*100) | 780 | 64,603.00 | 50,615.29 | >=97 | 22 |
| 5000(10*500) | 801 | 36134.42 | 32257.58 | >=97 | 38 |
| 10000(10*1000) | | 31378.69 | 32,078 | >=97 | 46.7 |
| 10000(20*500) | | 59832.1 | 59,773 | >=97 | 47.1 |
| 30000(100*300) | | 45,669.31 | 46,779.04 | >=97 | 59.2 |

- **Disks generally maintain a write rate of 89% and a read rate of 9%. Writing occupies most of the IO, which may cause timeout.**
- **Tps is 0 when topics are created in the early stage, and the time is about 6-8 minutes**

- **Delays are maintained at 100ms-300ms**
26 changes: 26 additions & 0 deletions tools/MQTest/RocketMQTest/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# Store files for rmq
docker-rocketmq/rmq/rmq/store/*
66 changes: 66 additions & 0 deletions tools/MQTest/RocketMQTest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# RocketMQ Test

## 1. Preparation

Before starting, the following information should be prepared:

- Java 1.8 or above
- docker-compose 1.24.0 or above

Move to this directory and give scripts execution permission
```SHELL
cd ~/RocketMQTest
chmod +x ./scripts/*.sh
```

## 2. Dockers for RocketMQ
Docker files are from:

[github.com/foxiswho/docker-rocketmq](github.com/foxiswho/docker-rocketmq)

- Start one nameserver, one broker and one console with:

```shell
./scripts/start.sh
```

- If only need to setup one broker on the server,use:
```SHELL
./scripts/start_broker.sh
```

- Move to the corresponding directory and use the following command to end dockers:

```
docker-compose down
```

- Open dashboard in browser:

```
localhost:8180
```



## 3. Producer and Consumer Client

- Firstly, compile and package the target files:

```shell
./scripts/build.sh
```

- Run producer client with parameters:

```sh
./scripts/run_producer.sh $0 $1
```

- Run consumer client with parameters:

```shell
./scripts/run_consumer.sh $0 $1
```

where $0 stands for the thread numbers and $1 stands for the topic numbers.
46 changes: 46 additions & 0 deletions tools/MQTest/RocketMQTest/client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.rmq</groupId>
<artifactId>test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>jar-with-dependencies</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.demo;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener. ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Arrays;



public class ConsumerRunnable implements Runnable{

private DefaultMQPushConsumer consumer = null;

private String url = null;
private String topicName = null;
private String tagName = null;
private int topicNumber = 0;

public ConsumerRunnable(String url, String topicName, String tagName, int topicNumber) {
this.url = url;
this.topicName = topicName;
this.tagName = tagName;
this.topicNumber = topicNumber;
this.consumer = new DefaultMQPushConsumer("consumerGroup-"+topicName);
this.consumer.setNamesrvAddr(url);

}


@Override
public void run(){
try {
for(int i = 0; i < topicNumber; i++){
consumer.subscribe(topicName+"-"+Integer.toString(i), tagName);
}

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.printf("%s Consumer Started.%n", Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.demo;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.cli.*;


public class ConsumerThreadPool {
private Config config;

public ConsumerThreadPool(String[] args){
this.config = new Config();
this.getOptions(args);
}

private void getOptions(String[] args){
Options options = new Options();
options.addOption("t",true,"topic number");
options.addOption("n",true,"thread number");
options.addOption("u",true,"url of namesrv");


CommandLineParser parser = new DefaultParser();
try{
CommandLine cmd = parser.parse(options,args);
config.topicNumber = Integer.parseInt(cmd.getOptionValue("t","100"));
config.threadNumber = Integer.parseInt(cmd.getOptionValue("n","100"));
config.url = cmd.getOptionValue("u","localhost:9876");
}catch (ParseException e){
e.printStackTrace();
System.err.println("-- fail to get options --");
HelpFormatter hf = new HelpFormatter();
hf.printHelp("Consumer",options,true);
}

}
public static void main(String[] args) {
ConsumerThreadPool ctp = new ConsumerThreadPool(args);
int threadNumber = ctp.config.threadNumber;
int topicNumberPerThread = ctp.config.topicNumber/ctp.config.threadNumber;
String url = ctp.config.url;

String topicName = "topic";
String tagName = "*";

ExecutorService pool = Executors.newCachedThreadPool();
for (int threadIndex = 0; threadIndex < threadNumber; threadIndex++) {
pool.submit(new ConsumerRunnable(url, topicName + Integer.toString(threadIndex), tagName, topicNumberPerThread));
}
pool.shutdown();
}

private class Config{
int topicNumber;
int threadNumber;
String url;
}
}

Loading