0%

基于Redis Cluster的分布式缓存性能篇(五)

redis良好的性能是做缓存的最佳选择之一,其自带的benchmarks性能测试工具的数据也说明。但Cluster集群,不同业务场景下的性能有什么不同吗?耳听为虚,眼见为实,让我们来测试下吧。

集群节点

请参考基于Redis Cluster的分布式缓存部署篇(三)

主机名主机地址端口备注
nodea-700010.128.31.1047000主机A7000端口节点
nodea-700110.128.31.1047001主机A7001端口节点
nodeb-700010.128.31.1087000主机B7000端口节点
nodeb-700110.128.31.1087001主机B7001端口节点
nodec-700010.128.31.1097000主机C7000端口节点
nodec-700110.128.31.1097001主机C7001端口节点

测试脚本

java编写的测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package jcache;

import jcache.clients.jcachecluster.base.JCacheClient;
import jcache.clients.jcachecluster.common.PropertiesConst;
import jcache.clients.jcachecluster.factory.CacheFactorySingle;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class JcacheClusterTPS {
private static String HostAndPort =
"10.128.31.104:7000;" +
"10.128.31.104:7001;" +
"10.128.31.105:7000;" +
"10.128.31.105:7001;" +
"10.128.31.109:7000;" +
"10.128.31.109:7001";

public static void main(String[] args) {
final JCacheClient cluster = getCluster();
final int nThreads = args.length >= 1 ? Integer.parseInt(args[0]) : 8;
final int sendNumOnceTime = args.length >= 2 ? Integer.parseInt(args[1]) : 100;
final int keySize = args.length >= 3 ? Integer.parseInt(args[2]) : 10;
final int messageSize = args.length >= 4 ? Integer.parseInt(args[3]) : 10;
final int times = args.length >= 5 ? Integer.parseInt(args[4]) : 1;
final int setOrGet = args.length >= 6 ? Integer.parseInt(args[5]) : 0;
final String keyParam = args.length >= 7 ? args[6] : "0";
final AtomicLong atomicSuccessNums = new AtomicLong(0);
final List<Long> tpsList = new ArrayList<Long>();
final String msg = buildMessage(messageSize);
final List<String> keys = getKeys(keySize, nThreads * sendNumOnceTime, times);
doRun(cluster, tpsList, atomicSuccessNums, nThreads, times, sendNumOnceTime, keys, msg, keySize, messageSize, setOrGet, keyParam);
}

private static JCacheClient getCluster() {
Properties properties = new Properties();
properties.put(PropertiesConst.Keys.HOST_AND_PORT, HostAndPort);
properties.put(PropertiesConst.Keys.AUTH_KEY, "13F455A8E9DC2BBEBE1BD906C82B3C0A1");
properties.put(PropertiesConst.Keys.NAMESPACE, "weidian-1");
return CacheFactorySingle.createJCacheClient(properties);
}

private static void doRun(final JCacheClient cluster,
final List<Long> tpsList, final AtomicLong atomicSuccessNums,
final int nThreads, final int times, final int sendNumOnceTime,
final List<String> keys, final String msg, final int keySize, final int messageSize,
final int setOrGet, final String keyParam) {
final AtomicLong atomicFailNum = new AtomicLong(0);
for (int time = 0; time < times; time++) {
final Object object = new Object();
synchronized (object) {
final int t = time + 1;
final ExecutorService exec = Executors.newCachedThreadPool();
final long startCurrentTimeMillis = System.currentTimeMillis();
final CyclicBarrier barrier = new CyclicBarrier(nThreads, new Runnable() { // 设置几个线程为一组,当这一组的几个线程都执行完成后,然后执行住线程的
public void run() {
synchronized (object) {
long endCurrentTimeMillis = System.currentTimeMillis();
long sendNums = nThreads * sendNumOnceTime;
long escapedTimeMillis = endCurrentTimeMillis - startCurrentTimeMillis;
long tps = sendNums * 1000 / escapedTimeMillis;
String type = "set";
if (setOrGet != 0) {
type = "get";
}
tpsList.add(tps);
System.out.printf("第 %d 次, 发送完成, 用时 : %d ms, " + "线程大小 : %d , " + "key大小 : %d , " + "msg大小 : %d , " + "发送数量 : %d , " + "成功数量 : %d , " + "失败数量 : %d , " + "统计方式 : %s , " + "TPS : %d !!!",
t, escapedTimeMillis, nThreads, keySize, messageSize, sendNums, atomicSuccessNums.intValue(), atomicFailNum.intValue(), type, tps);
exec.shutdown();
object.notify();
System.out.println();
}
}
});
for (int i = 0; i < nThreads; i++) {

final String finalI = "i" + i;
exec.execute(new Runnable() {
public void run() {
try {
for (int j = 0; j < sendNumOnceTime; j++) {
if (setOrGet == 0) {
String key = "ke" + keyParam + finalI + "j" + j;
try {
String resp = cluster.set(key, msg);
if (!"OK".equals(resp)) {
atomicFailNum.incrementAndGet();
} else {
atomicSuccessNums.incrementAndGet();
}
} catch (Exception e) {
atomicFailNum.incrementAndGet();
}
} else {
cluster.get(keys.get(atomicSuccessNums.intValue()));
}
}
barrier.await();
} catch (Exception e) {
try {
barrier.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (BrokenBarrierException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
}
});
}
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
long sum = 0;
for (Long tps : tpsList) {
sum += tps;
}
System.out.printf("全部发送完成, 平均TPS : %d !!!", sum / tpsList.size());
}

private static String buildMessage(final int messageSize) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < messageSize; i += 8) {
sb.append("hello baby");
}
return sb.toString();
}

private static List<String> getKeys(int keySize, int keys, int times) {
System.out.println(">>>>>>>>>>>>>>正在生成Key>>>>>>>>>>>>>>");
List<String> keysList = new ArrayList<String>();
// for (int i = 0; i < keys * times; i++) {
// keysList.add(getUId(keySize, i));
// }
System.out.println(">>>>>>>>>>>>>>生成成功Key>>>>>>>>>>>>>>");
return keysList;
}

private static String getUId(int keySize, int i) {
return String.format("%0" + keySize + "d", i);
}

private static String getUUId() {
return UUID.randomUUID().toString();
}

}

用例执行命令

1
java -cp jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterDataCorrect 100 10000 10 10 1 0

用例参数说明

  • 第一个参数是线程数量
  • 第二个参数是每个线程执行操作次数
  • 第三个参数是key的大小(字节)
  • 第四个参数是val的大小(字节)
  • 第五个参数是运行次数
  • 第六个参数是操作类型,0标示set,1标示get

场景测试

客户端线程数量

分别设置Key和Value大小固定为10个字节,写操作1000000次,线程数逐步增加,统计每次发送数据的TPS,如下:

序号Key(字节)Value(字节)执行次数客户端线程数集群TPS
11010100000059704
2101010000001019534
3101010000002036406
4101010000005042634
51010100000010043050
61010100000020044086
71010100000050042902
810101000000100043046
910101000000200042542
1010101000000500042858

Alt text

缓存Value长度

分别设置Key固定10个字节,客户端线程数固定100,进行写操作1000000次,缓存的value长度逐步增加,统计每次发送数据的TPS,如下:

序号Key(字节)Value(字节)执行次数客户端线程数集群TPS
11010100000010047777
21020100000010046213
31050100000010045209
410100100000010043539
510200100000010040692
610500100000010032023
7101000100000010030835
8102000100000010015550
9105000100000010013956
10101000010000001009608

Alt text

读取数据性能

客户端读取缓存数据,影响TPS的因素主要就是key的长度。因此只需要保持缓存value固定10个字节,设置客户端线程数为100,在逐步增加缓存Key大小的同时,客户端多线程读取当前测试用例的相同key,如下:

序号Key(字节)Value(字节)执行次数客户端线程数集群TPS
11010100000010014468
22010100000010014753
33010100000010015874
44010100000010014588
55010100000010016880
66010100000010016283
710010100000010014543
820010100000010013992
950010100000010013283
10100010100000010012062

Alt text

Swap性能开启

设置并发300线程,每个线程执行20972次,每个key设置10字节,每个value设置512字节,如下:

Alt text

Swap性能关闭

设置并发300线程,每个线程执行20972次,每个key设置10字节,每个value设置512字节,如下:

Alt text

持久化策略性能

everysec策略

Alt text
平均的TPS是22855。

always策略

Alt text
平均的TPS是11604

no策略

Alt text
平均的TPS是27383。注意:通常情况都不会配置“no”持久化策略。

总结

读写性能

  1. 在一定范围内,提高客户端线程数可以提高集群写数据的性能,超过范围再提供线程数则会导致集群写数据性能急速下降。
  2. 缓存Value长度对redis写数据性能影响很大,它们呈现相反的变化趋势;增加Value长度则集群写数据的TPS下降,因此redis合适缓存小数据。

Swap性能

  1. 关闭swap内存,集群整体性能有所提高;在高并发环境下,关闭swap性能会提高,也是以处理能力下降为代价。
  2. 开启swap内存,集群的整体内存使用量会增加,当服务器有足够的内存,建议关闭swap以提高集群的效率。

持久化策略

  1. always策略每次持久化,性能最低。建议数据安全性要求很到的业务场景使用,作为缓存建议不使用。
  2. everysec策略每秒持久化,测试数据看是always性能的两倍以上。建议采用everysec持久化策略,即保证了数据稳定性,又兼顾缓存的效率。
  3. no策略由操作系统置换页时持久化,性能最高,例如linux是30秒一次。no策略数据安全性较差,请在合适的业务场景下选择。