目录

LRU 原理和 Redis 实现

当需要从缓存中淘汰数据时,我们希望能淘汰那些将来不可能再被使用的数据,保留那些将来还会频繁访问的数据,但最大的问题是缓存并不能预言未来。一个解决方法就是通过 LRU 进行预测:最近被频繁访问的数据将来被访问的可能性也越大。缓存中的数据一般会有这样的访问分布:一部分数据拥有绝大部分的访问量。当访问模式很少改变时,可以记录每个数据的最后一次访问时间,拥有最少空闲时间的数据可以被认为将来最有可能被访问到。

基于 HashMap 和 双向链表实现 LRU 的

/img/lru.png

head 代表双向链表的表头,tail 代表尾部。首先预先设置 LRU 的容量,如果存储满了,可以通过 O(1) 的时间淘汰掉双向链表的尾部,每次新增和访问数据,都可以通过 O(1)的效率把新的节点增加到对头,或者把已经存在的节点移动到队头。

LRU Go 代码实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main

import "fmt"

type DLinkedNode struct {
key string
value int
pre *DLinkedNode
post *DLinkedNode
}

type LRUCache struct {
cache map[string]*DLinkedNode
count int
capacity int
head, tail *DLinkedNode
}

func NewLRUCache(capacity int) \*LRUCache {
head := &DLinkedNode{}
tail := &DLinkedNode{}

    head.post = tail
    tail.pre = head

    return &LRUCache{
    	cache:    map[string]*DLinkedNode{},
    	count:    0,
    	capacity: capacity,
    	head:     head,
    	tail:     tail,
    }

}

func (c \*LRUCache) Get(key string) int {
node, ok := c.cache[key]
if !ok {
return -1
}

    c.moveHead(node)

    return node.value

}

func (c \*LRUCache) Set(key string, value int) {
node, ok := c.cache[key]
if !ok {
newNode := &DLinkedNode{
key: key,
value: value,
}
c.cache[key] = newNode

    	c.count++

    	if c.count > c.capacity {
    		tail := c.popTail()
    		delete(c.cache, tail.key)
    		c.count--
    	}

    	c.addNode(newNode)
    } else {
    	node.value = value
    	c.moveHead(node)
    }

}

func (c *LRUCache) moveHead(node *DLinkedNode) {
c.removeNode(node)
c.addNode(node)
}

func (c *LRUCache) popTail() *DLinkedNode {
node := c.tail.pre
c.removeNode(node)
return node
}

func (c *LRUCache) removeNode(node *DLinkedNode) {
pre := node.pre
post := node.post

    pre.post = post
    post.pre = pre

}

func (c *LRUCache) addNode(node *DLinkedNode) {
node.pre = c.head
node.post = c.head.post

    c.head.post.pre = node
    c.head.post = node

}

func (c \*LRUCache) print() {
head := c.head

    for head.post != nil {
    	fmt.Printf("%s ", head.post.key)
    	head = head.post
    }
    fmt.Println()

}

func main() {
l := NewLRUCache(3)

    l.Set("1", 1)
    l.Set("2", 2)
    l.Set("3", 3)
    l.print() // 3 2 1

    l.Set("4", 4)
    l.print() // 4 3 2

    l.Get("2")
    l.print() // 2 4 3

}

Redis 近似 LRU 实现

Redis 作为缓存使用时,一些场景下要考虑内存的空间消耗问题。Redis 会删除过期键以释放空间,过期键的删除策略有两种:

  • 惰性删除:每次从键空间中获取键时,都检查取得的键是否过期,如果过期的话,就删除该键;如果没有过期,就返回该键。
  • 定期删除:每隔一段时间,程序就对数据库进行一次检查,删除里面的过期键。

另外,Redis 也可以开启 LRU 功能来自动淘汰一些键值对。

Redis 配置中和 LRU 有关的有三个:

  • maxmemory: 配置 Redis 存储数据时指定限制的内存大小,比如 100m。当缓存消耗的内存超过这个数值时, 将触发数据淘汰。该数据配置为 0 时,表示缓存的数据量没有限制, 即 LRU 功能不生效。64 位的系统默认值为 0,32 位的系统默认内存限制为 3GB
  • maxmemory_policy: 触发数据淘汰后的淘汰策略
  • maxmemory_samples: 随机采样的精度,也就是随即取出 key 的数目。该数值配置越大, 越接近于真实的 LRU 算法,但是数值越大,相应消耗也变高,对性能有一定影响,样本值默认为 5。

出于节省内存的考虑,Redis 的 LRU 算法并非完整的实现。Redis 并不会选择最久未被访问的键进行回收,相反它会尝试运行一个近似 LRU 的算法,通过对少量键进行取样,然后回收其中的最久未被访问的键。通过调整每次回收时的采样数量 maxmemory-samples,可以实现调整算法的精度。每个 Redis Object 可以挤出 24 bits 的空间,但 24 bits 是不够存储两个指针的,而存储一个低位时间戳是足够的,Redis Object 以秒为单位存储了对象新建或者更新时的 unix time,也就是 LRU clock,24 bits 数据要溢出的话需要 194 天,而缓存的数据更新非常频繁,已经足够了。

Redis 的键空间是放在一个哈希表中的,要从所有的键中选出一个最久未被访问的键,需要另外一个数据结构存储这些源信息,这显然不划算。最初,Redis 只是随机的选 3 个 key,然后从中淘汰,后来算法改进到了 N 个 key 的策略,默认是 5 个。

Redis3.0 之后又改善了算法的性能,会提供一个待淘汰候选 key 的 pool,里面默认有 16 个 key,按照空闲时间排好序。更新时从 Redis 键空间随机选择 N 个 key,分别计算它们的空闲时间 idle,key 只会在 pool 不满或者空闲时间大于 pool 里最小的时,才会进入 pool,然后从 pool 中选择空闲时间最大的 key 淘汰掉。

数据访问模式非常接近幂次分布时,也就是大部分的访问集中于部分键时,LRU 近似算法会处理得很好。