Skip to content

门泊吴船亦已谋

Lock-free 数据结构设计

根据名字我们知道它的内部没有使用锁,可以通过原语和一些蛇皮操作来实现

无锁与无等待

需要注意的一个问题是无锁并不一定意味着访问一个资源的时候不需要等待
大多数情况下,无锁数据结构看起来像使用了一个自旋锁,但可以避免产生死锁

不需要等待的数据结构称为无等待数据结构(废话)
无等待数据结构首先是一个无锁数据结构,然后就是访问资源时候不会有冲突,也不需要等待,通常需要很精巧的设计,比较难以实现

CAS 原语

Compare and swap,比较并交换的原语
描述起来比较绕,我们直接看看怎么用

如果希望修改某个变量,我们首先对它进行备份,然后这个备份上进行修改
最后把原变量用备份的指针替换掉

在最后一步的时候如果检测到这个变量已经被其他线程修改了,那就要重新对新的变量执行上述的修改,然后再检测修改尝试替换,不断重复

这个检测并替换的步骤就是用 CAS 原语来实现的

另外,如果多个线程同时访问同一个资源,就可能因为资源被修改而陷入重新修改资源再 CAS 的循环。这种情况被称为活锁,也就是上文中所说的「等待」
只要线程赶在其他线程修改资源之前,完成了修改,并且成功 CAS,他就解开了活锁
活锁的解开是随机的,看起来一个线程永远陷入活锁是可能的,但是一般来说线程之间竞争不会这么激烈(如果会的话那可以考虑重写了)

无锁的有序数组

有序数组在查找的时候使用二分,复杂度 O(log(n));在修改的时候需要移动数组中元素的位置,复杂度 O(n)
其实不是那么常用,在高频访问,低频修改的情况下会比较好

不过它实现无锁非常简单,因此作为例子

读的时候随便读,写的时候先把原数组拷贝一份,然后修改那份拷贝,最后用原语把那份拷贝数组的指针存回去就完事了
需要注意的是数组内存的销毁问题,使用智能指针就能立刻解决这个问题

然后贴一下代码

template <typename K, typename V>
class LockFreeOrderedArray {
   private:
    shared_ptr<vector<pair<K, shared_ptr<V>>>> array;

   public:
    LockFreeOrderedArray()
        : array(make_shared<vector<pair<K, shared_ptr<V>>>>()) {}
    void Put(K key, shared_ptr<V> value) {
        auto oriArray = atomic_load(&array);
        auto newArray =
            make_shared<vector<pair<K, shared_ptr<V>>>>(oriArray->size() + 1);
        do {
            // 不允许键值重复
            if (Get(key)) return;
            // 构建新的数组
            newArray->clear();
            auto it = oriArray->begin();
            for (; it != oriArray->end() && it->first < key; it++)
                newArray->push_back(*it);
            newArray->push_back(make_pair(key, value));
            for (; it != oriArray->end(); it++) newArray->push_back(*it);
        } while (!atomic_compare_exchange_weak(&array, &oriArray, newArray));
    }
    shared_ptr<V> Get(K key) {
        auto oriArray = atomic_load(&array);
        auto res = lower_bound(
            oriArray->begin(), oriArray->end(), make_pair(key, shared_ptr<V>()),
            [](auto a, auto b) { return a.first < b.first; });
        if (res == oriArray->end()  res->first != key)
            return shared_ptr<V>();
        return res->second;
    }
};

无锁的栈

我们可以用邻接链表来实现

读的时候 xjb 读
写的时候生成一个新的 head,然后用 CAS 不停的尝试把当前的 head 替换掉,同时注意保证新的 head 指向当前的 head

下面是代码

template <typename T>
class LockFreeStack {
    struct Node {
        shared_ptr<T> content;
        shared_ptr<Node> next;
        Node(shared_ptr<T> content, shared_ptr<Node> next)
            : content(content), next(next) {}
    };

   private:
    shared_ptr<Node> head;

   public:
    void Push(shared_ptr<T> value) {
        auto newHead = make_shared<Node>(value, atomic_load(&head));
        while (!atomic_compare_exchange_weak(&head, &newHead->next, newHead))
            ;
    }
    shared_ptr<T> Pop() {
        auto oriHead = atomic_load(&head);
        while (oriHead &&
               !atomic_compare_exchange_weak(&head, &oriHead, oriHead->next))
            ;
        return oriHead ? oriHead->content : shared_ptr<T>();
    }
    shared_ptr<T> Top() {
        auto top = atomic_load(&head);
        return top ? top->content : shared_ptr<T>();
    }
};

一些需要注意的地方

上面代码中操作 shared_ptr 的原语内部可能不是无锁的
因为 shared_ptr 的内部存了两个指针,而目前在硬件层面上对两个指针进行 CAS 操作还不太能实现(据说 TSX 指令集支持但是效率不高)

另外操作 shared_ptr 的 atomic 模版方法在 C++20 中已经弃用,以后应当用 atomic 模板类(然而国内普及 C++20 可能要等到 2200 年)