Skip to content

门泊吴船亦已谋

C++ 标准库 多线程编程不完全指南

某门课在讲管程,给的伪码十分丑陋看的我头痛,于是自己用 C++写了一遍

本来是 boost 库的东西,然后在 C++11 被转入标准库了,从此 C++标准库终于有多线程了

本文涉及线程、互斥锁、条件变量、返回值、线程池

thread

线程
被创建之后就能直接自己独立运行了

但需要注意的是线程的销毁
调用线程的 detach 方法使得子线程独立运行,子线程结束时他能自己销毁
调用 join 方法,主线程会被阻塞,直到子线程结束才继续执行,然后将子线程销毁
当然,不是父子线程也可以 detach 或者 join(孤儿?)

此外,当进程退出时所有的线程都会被强制销毁。

#include <chrono>
#include <cstdio>
#include <thread>
using namespace std;

void ProcessA() {
    for (int i = 0; i < 20; i++)
        this_thread::sleep_for(chrono::milliseconds(50)), puts("A");
    puts("A 线程结束");
}
void ProcessB() {
    for (int i = 0; i < 10; i++)
        this_thread::sleep_for(chrono::milliseconds(50)), puts("B");
    puts("B 线程结束");
}
void ProcessPrt() {
    thread a(ProcessA);
    a.detach();
    thread b(ProcessB);
    b.join();
    puts("父线程结束");
}

int main() {
    thread prt(ProcessPrt);
    prt.detach();
    getchar();
    return 0;
}

解释:
main 中创建了 prt 线程,prt 线程创建了 A、B 两个子线程,然后分离 A 线程,并等待 B 线程结束
运行后可以看到 B 线程结束后 prt 线程才结束,但线程 A 还在跑

mutex

互斥锁
多个线程访问同一个资源时可以使用互斥锁,阻塞其他线程以保证一个一个访问,防止同时访问发生问题(当然也可以玩一些骚操作)

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

mutex mtx;

void ProcessA() {
    for (int i = 0; i < 10; i++) {
        mtx.lock();
        cout << "A, time = " << clock() << endl;
        mtx.unlock();
    }
}
void ProcessB() {
    for (int i = 0; i < 10; i++) {
        mtx.lock();
        cout << "B, " << "time = " << clock() << endl;
        mtx.unlock();
    }
}

int main() {
    thread a(ProcessA);
    thread b(ProcessB);
    a.join();
    b.join();
    return 0;
}

解释:
如果把 mutex 相关的东西注释掉,会发现输出奇奇怪怪,因为标准输出被两个线程来回抢。
使用 mutex 加锁后,得到正常的输出

lock_guard

它在构造函数中对 mutex 进行 lock,析构的时候 unlock,很方便
因此可以把上面 mutex 的示例代码写成这样的形式

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

mutex mtx;

void ProcessA() {
    for (int i = 0; i < 10; i++) {
        lock_guard<mutex> lock(mtx);
        cout << "A, time = " << clock() << endl;
    }
}
void ProcessB() {
    for (int i = 0; i < 10; i++) {
        lock_guard<mutex> lock(mtx);
        cout << "B, " << "time = " << clock() << endl;
    }
}

int main() {
    thread a(ProcessA);
    thread b(ProcessB);
    a.join();
    b.join();
    return 0;
}

unique_lock

它具有 lock_guard 的功能,但更灵活,当然开销也更大

unique_lock 可以在析构之前 unlock,构造的时候可以不 lock,也可以 try_lock
具体操作可以查文档(看语法提示不就行了吗),比较蛇皮废话太多懒得写了

condition_variable

条件变量
配合 unique_lock 使用,用于线程之间交互,比如通知共享变量被修改等

一个线程在 wait 的时候会被阻塞,如果之后其他线程进行了 notify 操作,这个阻塞的线程会被唤醒

下面示例代码控制两个线程交替输出

#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

mutex mtx;
condition_variable cv;
bool flag;

void ProcessA() {
    for (int i = 0; i < 10; i++) {
        unique_lock<mutex> lck(mtx);
        while (!flag) cv.wait(lck);
        flag = false;
        printf("A\n");
        cv.notify_one();
    }
}
void ProcessB() {
    for (int i = 0; i < 10; i++) {
        unique_lock<mutex> lck(mtx);
        while (flag) cv.wait(lck);
        flag = true;
        printf("B\n");
        cv.notify_one();
    }
}

int main() {
    thread a(ProcessA);
    thread b(ProcessB);
    a.join();
    b.join();
    return 0;
}

解释:
两个线程输出后更新标记为自己,当标记表明上一次输出还是自己时则调用 wait 阻塞自己,等另一个线程输出结束 notify 的时候才被唤醒

然后这个交替输出也可以写成管程的形式

#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;

struct Moniter {
    mutex mtx;
    condition_variable cv;
    bool flag;
    void A() {
        unique_lock<mutex> lck(mtx);
        while (!flag) cv.wait(lck);
        flag = false;
        printf("A\n");
        cv.notify_one();
    }
    void B() {
        unique_lock<mutex> lck(mtx);
        while (flag) cv.wait(lck);
        flag = true;
        printf("B\n");
        cv.notify_one();
    }
} m;

void ProcessA() {
    for (int i = 0; i < 10; i++) m.A();
}
void ProcessB() {
    for (int i = 0; i < 10; i++) m.B();
}

int main() {
    thread a(ProcessA);
    thread b(ProcessB);
    a.join();
    b.join();
    return 0;
}

但是需要注意一个严重的问题,等待条件变量的线程可能会因为某些原因极小概率地被唤醒,而条件并没有得到满足,这就是传说中的虚假唤醒。据说可能来源于操作系统的玄学操作,也可能是多核处理器环境下 notify_one 的时候低概率导致多个 wait 被依次唤醒,因此在上面的例子中在 wait 的时候需要使用 while 反复检查条件是否满足。不过最常见的情况应该是 notify_all 的时候会导致所有的 wait 都被依次唤醒,每个 wait 在被唤醒后并不知道条件是否仍然满足,所以还需要再次检查条件。当然我们的例子中处于 wait 状态的线程每次只会有一个,因此其实不需要担心,但是考虑到前面提到的操作系统的玄学操作以及避免潜在的误用情况,还是使用了 while 来反复检查条件

future

可以理解为存放数据的容器,它的数据在未来某一时间被写入。没有数据的时候如果调用 get 方法试图获取数据,会被阻塞,直到数据被写入。

因此可以用于线程之间传递数据,也可以简单的当作线程的返回值

要配合 promise 使用,下面展示使用 future 实现线程返回值

#include <chrono>
#include <cstdio>
#include <future>
#include <thread>

using namespace std;

void ProcessA(promise<int>* p) {
    for (int i = 0; i < 10; i++) {
        this_thread::sleep_for(chrono::milliseconds(100));
        printf("A\n");
    }
    p->set_value(clock());
}

int main() {
    promise<int> p;
    future<int> f = p.get_future();
    thread a(ProcessA, &p);
    a.detach();
    printf("Thread A return, time = %d\n", f.get());
    getchar();
    return 0;
}

解释:
线程 A 不断输出,结束时返回当前的时间。
而主线程因为要获取 f 的值,一直被阻塞直到线程 A 写入数据

线程池

线程不断开辟销毁会降低效率,可以预先开辟一定量的线程,然后将任务分配给它们执行,实现线程的循环使用

下面简单实现一下
维护一个 Task 队列,由 Worker 线程负责执行其中的 Task

#include <chrono>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
using namespace std;

class ThreadPool {
   private:
    typedef function<void()> Task;
    int threadCnt;
    bool isRunning;
    vector<thread> workers;
    queue<Task> tasks;
    mutex mtx;
    condition_variable cv;
    void ProcessWorker() {
        while (isRunning) {
            Task task;
            {
                unique_lock<mutex> lock(mtx);
                if (tasks.empty()) cv.wait(lock);
                if (!tasks.empty()) {
                    task = tasks.front();
                    tasks.pop();
                }
            }
            if (task) task();
        }
    }
   public:
    explicit ThreadPool(int cnt) : threadCnt(cnt), isRunning(0) {}
    ~ThreadPool() {
        if (isRunning) Stop();
    }
    void Start() {
        isRunning = 1;
        for (int i = 0; i < threadCnt; i++)
            workers.emplace_back(thread(&ThreadPool::ProcessWorker, this));
    }
    void Push(const Task &task) {
        if (isRunning) {
            lock_guard<mutex> lock(mtx);
            tasks.push(task);
            cv.notify_one();
        }
    }
    void Stop() {
        {
            lock_guard<mutex> lock(mtx);
            isRunning = false;
            cv.notify_all();
        }
        for (auto &thd : workers) thd.join();
    }
};

void ProcessA() {
    puts("A start");
    for (int i = 0; i < 5; i++)
        this_thread::sleep_for(chrono::milliseconds(300)), puts("A");
    puts("A end");
}
void ProcessB() {
    puts("B start");
    for (int i = 0; i < 5; i++)
        this_thread::sleep_for(chrono::milliseconds(100)), puts("B");
    puts("B end");
}
void ProcessC() {
    puts("C start");
    for (int i = 0; i < 5; i++)
        this_thread::sleep_for(chrono::milliseconds(150)), puts("C");
    puts("C end");
}
void ProcessD() {
    puts("D start");
    for (int i = 0; i < 5; i++)
        this_thread::sleep_for(chrono::milliseconds(250)), puts("D");
    puts("D end");
}
void ProcessE() {
    puts("E start");
    for (int i = 0; i < 5; i++)
        this_thread::sleep_for(chrono::milliseconds(200)), puts("E");
    puts("E end");
}

int main() {
    ThreadPool tp(2);
    tp.Start();
    tp.Push(ProcessA);
    tp.Push(ProcessB);
    tp.Push(ProcessC);
    tp.Push(ProcessD);
    tp.Push(ProcessE);

    getchar();
    tp.Stop();
    return 0;
}

解释:
Push 方法可向队列中添加 Task
每个 Worker 线程从队列中获取 Task 并执行,结束后再次获取 Task 执行并如此不断循环

注意
队列要互斥访问,从队列中取 Task 使用了条件变量
然后线程池析构或停止时要 notify_all 使 Worker 线程从 wait 中唤醒并正常退出,还有 join