业务模型 读、写、删的比例大致是7:3:1,至少要支持500w条缓存,平均每条缓存6k,要求设计一套性能比较好的缓存算法。 不考虑MemCached,Velocity等现成的key-value缓存方案,也不考虑脱离.net gc自己管理内存,不考虑随机读取数据及顺序读取数据的场景,目前想到的有如下几种LRU方案
目前几种方案在多线程下应该都需要加锁,不太好设计无锁的方案,下面这个链接是一个支持多线程的方案,但原理至今没搞特明白 用普通链表简单实现LRU缓存 Get方法的时候直接从字典里获取到要移动的节点,然后把这个节点的上一个节点的Next指针指向给下一个节点,下一个节点的Previous指针指向上一个节点,这样就把移动节点的操作简化成O(1)了,提高了缓存读取的效率。 public class LRUCacheHelper<K, V> {
readonly Dictionary<K, V> _dict; readonly LinkedList<K> _queue = new LinkedList<K>(); readonly object _syncRoot = new object(); private readonly int _max; public LRUCacheHelper(int capacity, int max) { _dict = new Dictionary<K, V>(capacity); _max = max; } public void Add(K key, V value) { lock (_syncRoot) { checkAndTruncate(); _queue.AddFirst(key); //O(1) _dict[key] = value; //O(1) } } private void checkAndTruncate() { lock (_syncRoot) { int count = _dict.Count; //O(1) if (count >= _max) { int needRemoveCount = count / 10; for (int i = 0; i < needRemoveCount; i++) { _dict.Remove(_queue.Last.Value); //O(1) _queue.RemoveLast(); //O(1) } } } } public void Delete(K key) { lock (_syncRoot) { _dict.Remove(key); //(1) _queue.Remove(key); // O(n) } } public V Get(K key) { lock (_syncRoot) { V ret; _dict.TryGetValue(key, out ret); //O(1) _queue.Remove(key); //O(n) _queue.AddFirst(key); //(1) return ret; } } }
_dict.TryGetValue(key, out ret); //O(1) 我改进后的链表就差不多满足需求了,
其中截断的时候可以指定当缓存满的时候截断百分之多少的最少使用的缓存项。 其它的就是多线程的时候锁再看看怎么优化,字典有线程安全的版本,就把.net 3.0的读写锁扣出来再把普通的泛型字典保证成ThreadSafelyDictionay就行了,性能应该挺好的。 链表的话不太好用读写锁来做线程同步,大不了用互斥锁,但得考虑下锁的粒度,Move,AddFirst,RemoveLast的时候只操作一两个节点,是不是想办法只lock这几个节点就行了,Truncate的时候因为要批量操作很多节点,所以要上个大多链表锁,但这时候怎么让其它操作停止得考虑考虑,类似数据库的表锁和行锁。 80多w,每秒钟写操作越20多w。 实现代码
public class DoubleLinkedListNode<T> {
public T Value { get; set; } public DoubleLinkedListNode<T> Next { get; set; } public DoubleLinkedListNode<T> Prior { get; set; } public DoubleLinkedListNode(T t) { Value = t; } public DoubleLinkedListNode() { } public void RemoveSelf() { Prior.Next = Next; Next.Prior = Prior; } } public class DoubleLinkedList<T> { protected DoubleLinkedListNode<T> m_Head; private DoubleLinkedListNode<T> m_Tail; public DoubleLinkedList() { m_Head = new DoubleLinkedListNode<T>(); m_Tail = m_Head; } public DoubleLinkedList(T t) : this() { m_Head.Next = new DoubleLinkedListNode<T>(t); m_Tail = m_Head.Next; m_Tail.Prior = m_Head; } public DoubleLinkedListNode<T> Tail { get { return m_Tail; } } public DoubleLinkedListNode<T> AddHead(T t) { DoubleLinkedListNode<T> insertNode = new DoubleLinkedListNode<T>(t); DoubleLinkedListNode<T> currentNode = m_Head; insertNode.Prior = null; insertNode.Next = currentNode; currentNode.Prior = insertNode; m_Head = insertNode; return insertNode; } public void RemoveTail() { m_Tail = m_Tail.Prior; m_Tail.Next = null; return; } } public class LRUCacheHelper<K, V> { class DictItem { public DoubleLinkedListNode<K> Node { get; set; } public V Value { get; set; } } readonly Dictionary<K, DictItem> _dict; readonly DoubleLinkedList<K> _queue = new DoubleLinkedList<K>(); readonly object _syncRoot = new object(); private readonly int _max; public LRUCacheHelper(int capacity, int max) { _dict = new Dictionary<K, DictItem>(capacity); _max = max; } public void Add(K key, V value) { lock (this) { checkAndTruncate(); DoubleLinkedListNode<K> v = _queue.AddHead(key); //O(1) _dict[key] = new DictItem() { Node = v, Value = value }; //O(1) } } private void checkAndTruncate() { int count = _dict.Count; //O(1) if (count >= _max) { int needRemoveCount = count / 10; for (int i = 0; i < needRemoveCount; i++) { _dict.Remove(_queue.Tail.Value); //O(1) _queue.RemoveTail(); //O(1) } } } public void Delete(K key) { lock (this) { _dict[key].Node.RemoveSelf(); _dict.Remove(key); //(1) } } public V Get(K key) { lock (this) { DictItem ret; if (_dict.TryGetValue(key, out ret)) { ret.Node.RemoveSelf(); _queue.AddHead(key); return ret.Value; } return default(V); } } }
程序初始化200w条缓存,然后不断的加,每加到500w,截断掉10分之一,然后继续加。 (责任编辑:admin) |