0%

基于Redis Cluster的分布式缓存算法篇(四)

说明下这里的算法指的是内存淘汰算法。redis作为应用级缓存使用,在内存超过限制时,按照配置的策略,淘汰掉相应的kv,使得内存可以继续留有足够的空间保存新的数据。

算法介绍

redis 提供 6种数据淘汰策略:

  • noeviction : 默认,不淘汰
  • volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰
  • volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰
  • volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰
  • allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰
  • allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰

以上6中淘汰策略已经能够满足大多数应用场景,如何选择淘汰策略?根据实际业务情况进行选择。

源码分析

其缓存管理功能,由redis.c文件中的freeMemoryIfNeeded函数实现。如果maxmemory被设置,则在每次进行命令执行之前,该函数均被调用,用以判断是否有足够内存可用,释放内存或返回错误。如果没有找到足够多的内存,程序主逻辑将会阻止设置了REDIS_COM_DENYOOM flag的命令执行,对其返回command not allowed when used memory > ‘maxmemory’的错误消息。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
int freeMemoryIfNeeded(void) {
size_t mem_used, mem_tofree, mem_freed;
int slaves = listLength(server.slaves);

/* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */
// 计算占用内存大小时,并不计算slave output buffer和aof buffer,因此maxmemory应该比实际内存小,为这两个buffer留足空间。
mem_used = zmalloc_used_memory();
if (slaves) {
listIter li;
listNode *ln;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
if (obuf_bytes > mem_used)
mem_used = 0;
else
mem_used -= obuf_bytes;
}
}
if (server.appendonly) {
mem_used -= sdslen(server.aofbuf);
mem_used -= sdslen(server.bgrewritebuf);
}

/* Check if we are over the memory limit. */
if (mem_used <= server.maxmemory) return REDIS_OK;

if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
return REDIS_ERR; /* We need to free memory, but policy forbids. */

/* Compute how much memory we need to free. */
mem_tofree = mem_used - server.maxmemory;
mem_freed = 0;
while (mem_freed < mem_tofree) {
int j, k, keys_freed = 0;

for (j = 0; j < server.dbnum; j++) {
long bestval = 0; /* just to prevent warning */
sds bestkey = NULL;
struct dictEntry *de;
redisDb *db = server.db+j;
dict *dict;

if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM)
{
dict = server.db[j].dict;
} else {
dict = server.db[j].expires;
}
if (dictSize(dict) == 0) continue;

/* volatile-random and allkeys-random policy */
if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM)
{
de = dictGetRandomKey(dict);
bestkey = dictGetEntryKey(de);
}//如果是random delete,则从dict中随机选一个key

/* volatile-lru and allkeys-lru policy */
else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
{
for (k = 0; k < server.maxmemory_samples; k++) {
sds thiskey;
long thisval;
robj *o;

de = dictGetRandomKey(dict);
thiskey = dictGetEntryKey(de);
/* When policy is volatile-lru we need an additonal lookup
* to locate the real key, as dict is set to db->expires. */
if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
de = dictFind(db->dict, thiskey); //因为dict->expires维护的数据结构里并没有记录该key的最后访问时间
o = dictGetEntryVal(de);
thisval = estimateObjectIdleTime(o);

/* Higher idle time is better candidate for deletion */
if (bestkey == NULL || thisval > bestval) {
bestkey = thiskey;
bestval = thisval;
}
}//为了减少运算量,redis的lru算法和expire淘汰算法一样,都是非最优解,lru算法是在相应的dict中,选择maxmemory_samples(默认设置是3)份key,挑选其中lru的,进行淘汰
}

/* volatile-ttl */
else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
for (k = 0; k < server.maxmemory_samples; k++) {
sds thiskey;
long thisval;

de = dictGetRandomKey(dict);
thiskey = dictGetEntryKey(de);
thisval = (long) dictGetEntryVal(de);

/* Expire sooner (minor expire unix timestamp) is better
* candidate for deletion */
if (bestkey == NULL || thisval < bestval) {
bestkey = thiskey;
bestval = thisval;
}
}//注意ttl实现和上边一样,都是挑选出maxmemory_samples份进行挑选
}

/* Finally remove the selected key. */
if (bestkey) {
long long delta;

robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj); //将del命令扩散给slaves
/* We compute the amount of memory freed by dbDelete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
dbDelete(db,keyobj);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
server.stat_evictedkeys++;
decrRefCount(keyobj);
keys_freed++;

/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();
}
}//在所有的db中遍历一遍,然后判断删除的key释放的空间是否足够
if (!keys_freed) return REDIS_ERR; /* nothing to free... */
}
return REDIS_OK;
}

算法源码解析

淘汰算法redis.h的相关定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* The actual Redis Object */  
#define REDIS_LRU_BITS 24
#define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */
#define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
typedef struct redisObject {
unsigned type:4; //存放的对象类型
unsigned encoding:4; //内容编码
//与server.lruclock的时间差值
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
int refcount; //引用计数算法使用的引用计数器
void *ptr; //数据指针
} robj;

/* Macro used to obtain the current LRU clock.
* If the current resolution is lower than the frequency we refresh the
* LRU clock (as it should be in production servers) we return the
* precomputed value, otherwise we need to resort to a function call. */
#define LRU_CLOCK() ((1000/server.hz <= REDIS_LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock())
1
2
3
4
5
unsigned int getLRUClock(void) {  
return (mstime()/REDIS_LRU_CLOCK_RESOLUTION) & REDIS_LRU_CLOCK_MAX;
}

unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */

以最简单的all-keys-lru淘汰策略为例,该策略随机选出16个,通过过期时间对pool内存数据进行淘汰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* volatile-lru and allkeys-lru policy */  
else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
{
struct evictionPoolEntry *pool = db->eviction_pool;

while(bestkey == NULL) {
evictionPoolPopulate(dict, db->dict, db->eviction_pool);
/* Go backward from best to worst element to evict. */
for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
de = dictFind(dict,pool[k].key);

/* Remove the entry from the pool. */
sdsfree(pool[k].key);
/* Shift all elements on its right to left. */
memmove(pool+k,pool+k+1,
sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));
/* Clear the element on the right which is empty
* since we shifted one position to the left. */
pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;
pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;

/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... */
continue;
}
}
}
}

通过lru淘汰返回lru时间,最后与当前lru比较

1
2
3
4
5
6
7
8
9
10
11
/* Given an object returns the min number of milliseconds the object was never 
* requested, using an approximated LRU algorithm. */
unsigned long long estimateObjectIdleTime(robj *o) {
unsigned long long lruclock = LRU_CLOCK();
if (lruclock >= o->lru) {
return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
} else {
return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *
REDIS_LRU_CLOCK_RESOLUTION;
}
}

测试验证

要将redis最终应用到生产环境中,稳定性、可靠性测试更为重要,下面将对淘汰算法相关进行测试。对于性能的测试另有文章,请关注之后的性能篇。

说明

基于Redis Cluster的分布式缓存部署篇(三)中设置maxmemory为100M,保证服务不会在高并发情况下宕机。最后设置maxmemory-policy要测试的淘汰策略。

用例代码

java编写的测试程序,没有讲究代码结构,请见谅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package jcache;

import jcache.clients.jcachecluster.base.JCacheClient;
import jcache.clients.jcachecluster.common.PropertiesConst;
import jcache.clients.jcachecluster.factory.CacheFactorySingle;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class JcacheClusterTPS {
private static String HostAndPort =
"10.128.31.104:7000;" +
"10.128.31.104:7001;" +
"10.128.31.105:7000;" +
"10.128.31.105:7001;" +
"10.128.31.109:7000;" +
"10.128.31.109:7001";

public static void main(String[] args) {
final JCacheClient cluster = getCluster();
final int nThreads = args.length >= 1 ? Integer.parseInt(args[0]) : 8;
final int sendNumOnceTime = args.length >= 2 ? Integer.parseInt(args[1]) : 100;
final int keySize = args.length >= 3 ? Integer.parseInt(args[2]) : 10;
final int messageSize = args.length >= 4 ? Integer.parseInt(args[3]) : 10;
final int times = args.length >= 5 ? Integer.parseInt(args[4]) : 1;
final int setOrGet = args.length >= 6 ? Integer.parseInt(args[5]) : 0;
final String keyParam = args.length >= 7 ? args[6] : "0";
final AtomicLong atomicSuccessNums = new AtomicLong(0);
final List<Long> tpsList = new ArrayList<Long>();
final String msg = buildMessage(messageSize);
final List<String> keys = getKeys(keySize, nThreads * sendNumOnceTime, times);
doRun(cluster, tpsList, atomicSuccessNums, nThreads, times, sendNumOnceTime, keys, msg, keySize, messageSize, setOrGet, keyParam);
}

private static JCacheClient getCluster() {
Properties properties = new Properties();
properties.put(PropertiesConst.Keys.HOST_AND_PORT, HostAndPort);
properties.put(PropertiesConst.Keys.AUTH_KEY, "13F455A8E9DC2BBEBE1BD906C82B3C0A1");
properties.put(PropertiesConst.Keys.NAMESPACE, "weidian-1");
return CacheFactorySingle.createJCacheClient(properties);
}

private static void doRun(final JCacheClient cluster,
final List<Long> tpsList, final AtomicLong atomicSuccessNums,
final int nThreads, final int times, final int sendNumOnceTime,
final List<String> keys, final String msg, final int keySize, final int messageSize,
final int setOrGet, final String keyParam) {
final AtomicLong atomicFailNum = new AtomicLong(0);
for (int time = 0; time < times; time++) {
final Object object = new Object();
synchronized (object) {
final int t = time + 1;
final ExecutorService exec = Executors.newCachedThreadPool();
final long startCurrentTimeMillis = System.currentTimeMillis();
final CyclicBarrier barrier = new CyclicBarrier(nThreads, new Runnable() { // 设置几个线程为一组,当这一组的几个线程都执行完成后,然后执行住线程的
public void run() {
synchronized (object) {
long endCurrentTimeMillis = System.currentTimeMillis();
long sendNums = nThreads * sendNumOnceTime;
long escapedTimeMillis = endCurrentTimeMillis - startCurrentTimeMillis;
long tps = sendNums * 1000 / escapedTimeMillis;
String type = "set";
if (setOrGet != 0) {
type = "get";
}
tpsList.add(tps);
System.out.printf("第 %d 次, 发送完成, 用时 : %d ms, " + "线程大小 : %d , " + "key大小 : %d , " + "msg大小 : %d , " + "发送数量 : %d , " + "成功数量 : %d , " + "失败数量 : %d , " + "统计方式 : %s , " + "TPS : %d !!!",
t, escapedTimeMillis, nThreads, keySize, messageSize, sendNums, atomicSuccessNums.intValue(), atomicFailNum.intValue(), type, tps);
exec.shutdown();
object.notify();
System.out.println();
}
}
});
for (int i = 0; i < nThreads; i++) {

final String finalI = "i" + i;
exec.execute(new Runnable() {
public void run() {
try {
for (int j = 0; j < sendNumOnceTime; j++) {
if (setOrGet == 0) {
String key = "ke" + keyParam + finalI + "j" + j;
try {
String resp = cluster.set(key, msg);
if (!"OK".equals(resp)) {
atomicFailNum.incrementAndGet();
} else {
atomicSuccessNums.incrementAndGet();
}
} catch (Exception e) {
atomicFailNum.incrementAndGet();
}
} else {
cluster.get(keys.get(atomicSuccessNums.intValue()));
}
}
barrier.await();
} catch (Exception e) {
try {
barrier.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (BrokenBarrierException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
}
});
}
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
long sum = 0;
for (Long tps : tpsList) {
sum += tps;
}
System.out.printf("全部发送完成, 平均TPS : %d !!!", sum / tpsList.size());
}

private static String buildMessage(final int messageSize) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < messageSize; i += 8) {
sb.append("hello baby");
}
return sb.toString();
}

private static List<String> getKeys(int keySize, int keys, int times) {
System.out.println(">>>>>>>>>>>>>>正在生成Key>>>>>>>>>>>>>>");
List<String> keysList = new ArrayList<String>();
// for (int i = 0; i < keys * times; i++) {
// keysList.add(getUId(keySize, i));
// }
System.out.println(">>>>>>>>>>>>>>生成成功Key>>>>>>>>>>>>>>");
return keysList;
}

private static String getUId(int keySize, int i) {
return String.format("%0" + keySize + "d", i);
}

private static String getUUId() {
return UUID.randomUUID().toString();
}

}

用例执行命令

1
java -cp jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterDataCorrect 100 10000 10 10 1 0

用例参数说明

  • 第一个参数是线程数量
  • 第二个参数是每个线程执行操作次数
  • 第三个参数是key的大小(字节)
  • 第四个参数是val的大小(字节)
  • 第五个参数是运行次数
  • 第六个参数是操作类型,0标示set,1标示get

noeviction策略

测试用例

每条数据1K,运行200M的数据,也就是测试204800条数据的读取和写入。

1
java  -cp  jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterStrategy  1024  204800
测试结果

当集群的运行内存超过最大使用内存后,测试程序抛异常。noeviction策略不淘汰数据。
Alt text

查看redis的key数量
Alt text

allkeys-lru策略

测试用例

每条数据1K,运行200M的数据,也就是测试204800条数据的读取和写入。

1
java  -cp  jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterStrategy  1024  204800
测试结果

内存占用到达最大值后会置换之前的数据,所有测试数据能全部跑完

查看redis的key数量
Alt text

volatile-lru策略

测试用例

每条数据1K,运行200M的数据,也就是测试204800条数据的读取和写入。

1
java  -cp  jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterStrategy  1024  204800
测试结果
  • 内存占用到达最大值后,如果没有设置过期的数据仍在添加,程序会抛出异常。
    Alt text
  • 调整测试用例,前100条不设置过期时间,后面全部设置过期时间,程序能正常跑完。

查看redis的key数量
Alt text

allkeys-random策略

测试用例

每条数据1K,运行200M的数据,也就是测试204800条数据的读取和写入。

1
java  -cp  jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterStrategy  1024  204800
测试结果

内存占用到达最大值后会置换之前的数据,所有测试数据能全部跑完

查看redis的key数量
Alt text

volatile-random策略

测试用例

每条数据1K,运行200M的数据,也就是测试204800条数据的读取和写入。

1
java  -cp  jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterStrategy  1024  204800
测试结果
  • 内存占用到达最大值后,如果没有设置过期的数据仍在添加,程序会抛出异常。
    Alt text
  • 调整测试用例,前100条不设置过期时间,后面全部设置过期时间,程序能正常跑完。

查看redis的key数量
Alt text

volatile-ttl策略

测试用例

每条数据1K,运行200M的数据,也就是测试204800条数据的读取和写入。

1
java  -cp  jedis-2.9.0.jar:commons-pool2-2.3.jar: RedisClusterStrategy  1024  204800
测试结果

调整测试用例过期时间设置从3600秒开始逐渐增加。内存占用到达最大值后会先移除最近过期的数据,所以程序能全部跑完。

查看redis的key数量
Alt text

总结

  • 启用内存策略之前,需要设置maxmeory配置项,即节点的最大内存使用值。
  • 考虑到持久化数据的缓存场景,选择自己的淘汰策略,一般情况建议设置为noeviction策略。
  • 根据使用场景,适当调低rewrite-percentage百分比,降低内存此时的最小内存值,以防止在进行rewrite时,使用大量内存导致进程挂死问题。