4237 字
21 分钟
C++20-P1_标准库多线程设施简介

封面来源:多线程

  • 对多线程编程中的问题与相关C++工具进行解释,基于C++20.

多线程中的问题#

  • 数据竞争:多个线程访问共享内存,且至少有一个线程写入共享内存.(典型问题有撕裂读写,解决方案为设计只读数据结构,无写入时不会有竞争问题)
  • 死锁:两个线程分别持有对方期望拥有的资源,导致共同阻塞.(解决方案:定时检测)
  • 伪共享 line被一个线程锁定,另一线程无法并行(解决方案:内存对齐)

线程基础#

创建线程#

  • 常用std::thread或std::jthread创建线程.
  • 线程结合
    • 除非对std::thread创建的线程使用.join()或者.detach()成员函数,线程默认是可结合的.
    • 如果销毁可结合线程,C++会调用std::terminate()强行终止整个程序.
    • jthread通过在析构函数中调用.join()避免了带崩整个程序的现象,但可能导致意外的阻塞.
#include <thread>
// 1.std::thread
// 允许接受任意数量的参数,第一个参数为待调用的函数,后续参数是向该函数传递的参数
// 使用普通函数创建线程
void counter(int id, int numIter) {
for (auto i{0}; i < numIter; ++i) {
cout << "Counter " << id << "has value " << i << endl;
}
}
thread t1 {counter, 1, 6};
thread t2 {counter, 2, 4};
t1.join();
t2.join();
// 使用functor创建线程
struct Counter {
public:
explicit Counter(int id ,int numIter) : id_(id), max_iter_(numIter) {}
void operator() () const {
for (auto i{0}; i < numIter; ++i)
cout << "Counter " << id << "has value " << i << endl;
}
private:
int id_;
int max_iter_;
}
thread t3 {Counter{1, 20}};
auto c = Counter{2, 12};
thread t4 {c};
// 使用lambda函数创建线程
int id = 1;
int numIter = 5;
auto func = [id, numIter] () {
for (auto i{0}; i < numIter; ++i)
cout << "Counter " << id << "has value " << i << endl;
};
thread t5 {func};
// 使用类成员函数创建线程
class ClassWithMemberCount {
public:
explicit ClassWithMemberCount(int id ,int numIter) : id_(id), max_iter_(numIter) {}
void count(int id ,int numIter) {
for (auto i{0}; i < numIter; ++i)
cout << "Counter " << id << "has value " << i << endl;
}
private:
int id_;
int max_iter_;
};
ClassWithMemberCount cls{1, 2};
thread t6 {&ClassWithMemberCount::count, &cls};
// 2.std::jthread
// 类似于std::thread,支持上述所有操作.
// 但在析构函数中自动调用.join()防止带崩整个程序,且支持协作式取消.
// 协作式取消见"线程基础操作".

线程基础操作#

从线程获取计算结果#

  • 使用**指针或std::ref()**传递引用
  • 使用promise-future组合(见后文)
#include <thread>
// 1. 传递引用
void adder(int &in) {
++in;
}
int main() {
int a{1};
std::thread(adder, a); // 会编译错误
std::thread(adder, std::ref(a)); // 正确
std::cout << a; // 2
}
// 2. promise-future(见后文)

异常处理#

// 1. 传统异常处理
// 适用于包装现有代码.
exception_ptr current_exception() noexcept; // 返回正在处理的异常或者空的exception_ptr对象,可用if判断.
[[noreturn]] void rethrow_exception(exception_ptr p); // 再次抛出正在处理的异常.
exception_ptr make_exception_ptr(E e); // 复制一份e,创建一个指向副本的exception_ptr.
// 工作线程.
void do_things() {
// 实际工作.
throw runtime_error {"Exception thrown!"};
}
// Thread handling exception
void ThreadFunc(exception_ptr& err) {
try {
do_things();
} catch (...) {
err = current_exception(); // 使用引用将异常对象传递到上级.
}
}
// Class Managing Thread.
void ThreadCreator() {
exception_ptr error;
thread t {ThreadFunc, ref(error)};
t.join();
if (error) { // 在主线程抛出异常.
rethrow_exception(error);
} else {
// normal end.
}
}
int main() {
try {
ThreadCreator();
} catch (const exception& e) {
// exception handler.
}
}
// 2.使用promise-future.
// 更为简便,见promise-future一节.

线程本地存储#

  • thread_local int n;
  • 局部thread_local变量
    • 在多次调用该函数时,将继承上一次的值(和static变量的行为一致)
    • 但各个线程之间不共享,作用域仍为局部变量的作用域.

协作式取消#

  • 顾名思义,需要被取消线程协作的线程取消机制.
  • 可由std::jthread实现.
// thread 1: 需要被协作取消的线程.
void f(std::stop_token stop_token, int value)
{
while (!stop_token.stop_requested())
{
std::cout << value++ << ' ' << std::flush;
std::this_thread::sleep_for(200ms);
}
std::cout << std::endl;
// 析构时自动调用.join().
}
int main()
{
std::jthread thread(f, 5); // 打印 5 6 7 8... 约3s.
std::this_thread::sleep_for(3s);
// 析构时自动调用thread.request_stop().
// request_stop()函数也可以手动调用.
}

互斥类型#

内存序#

  • 有时也被称为内存屏障.

  • 现代CPU支持乱序执行,也即指令执行顺序与代码顺序并不一致.

    • 单线程:不影响计算结果.
    • 多线程:可能导致非预期结果.
  • 因此,C++对很多原子操作支持指定内存序(规定了某个具体线程内的指令该怎样执行),以防止多线程时的非预期结果.

  • 内存序种类作用
    memory_order_relaxed不指定内存序.
    memory_order_consume(即将废弃)用于读取指令.本线程上后续对该变量的读写操作禁止重排到该操作前.
    memory_order_acquire用于读取指令.本线程上后续所有读写操作禁止重排到该操作前.
    memory_order_release用于写入指令.前序所有读写操作禁止重排至该指令后.
    memory_order_acq_relacquire + release
    memory_order_seq_cstacq_rel,所有使用seq_ast的指令有严格全序关系;大部分操作的默认值
  • release-acquire模型

atomic<int> aint{0};
int nint{0}; // normal int.
void process1() {
nint = 1;
aint.store(1, std::memory_order_release);
}
void process2() {
int temp = aint.load(std::memory_order_acquire);
if(temp == 1) assert(nint == 1); // release-acquire所保证的内容.
}
// 注意:
// 无论内存序如何,对atom变量的操作始终具有原子性.
// release-acquire不能保证:
// 1. acquire指令发生在release之后(这是由线程具体运行进度所决定).
// 2. acquire指令先于release运行时,其它变量的有关行为.(如temp == 0时,nint的值存在竞争,可能为0也可能为1.)
// release-acqure只能保证:
// 当acquire指令确实发生在release之后时,release指令前的所有变量读写操作一定已经完成.

原子类型/互斥量#

  • 对C++支持的原子类型/互斥量按开销进行排序如下.
开销排名互斥类型说明
1(最小)std::atomic_flag最轻量的无锁原子标志位.
2std::atomic<T>轻量原子操作类型
3自旋锁标准库未提供,需自行实现.
4std::shared_mutex适用于多读者,读多写少的情况.
5std::mutex标准阻塞式互斥锁.
6std::recursive_mutex递归锁.
7std::timed_mutex支持超时的标准锁.
8(最大)std::recursive_timed_mutex支持超时的递归锁.
#include <atomic>
// 1.std::atomic_flag
// 1.1.初始化
std::atomic_flag flag = ATOMIC_FLAG_INIT; // 初始化为clear(false)状态.
std::atomic_flag flag2; // 不定状态(C++20后为clear状态).
// 1.2.读写操作
flag.clear(mem_order); // 置false.
flag.test(mem_order); // 返回现有值.
flag.test_and_set(mem_order); // 返回现有值,并置true.
// 1.3.同步操作
flag.wait(status, mem_order); // 阻塞到flag的值不等于status,且线程被唤醒(见"条件变量"一节).
flag.notify_one(); // 唤醒单个线程(见"条件变量"一节).
flag.notify_all(); // 唤醒所有线程(见"条件变量"一节).
// 2.std::atomic<T>
// T必须不含cv限定,可以平凡拷贝,且具备四个移动/复制函数.
// 2.1.初始化操作
std::atomic<int> aint{0};
std::atomic<int> aint1 = 0;
// 2.2.读写操作
aint = 1; // 等价于aint.store(1);
aint.store(1, mem_order); // 写入值.
aint.load(mem_order); // 读取值.
int b = aint + 1; // 隐式转换到T.
// 2.3.计算操作
// 对整型变量,实现了fetch_add, _sub, _and位与, _or位或, _xor位异或, ++, --, +=, -=, &=, ^=, |=
// 对浮点变量,实现了fetch_add, _sub
int fetched{value.fetch_add(4)}; // fetched == 10, value == 14.
// 2.4.同步操作
aint.wait(0, mem_order); // 类似于atomic_flag.
aint.notify_one();
aint.notify_all();
// 2.5.其他操作
aint.is_lock_free(); // 检查内部实现是否无锁.
aint.exchange(1, mem_order); // 将1赋给aint,返回aint现有值.全过程为原子操作.
aint.compare_exchange_weak(T&expected, T desired, mem_order); // 类似于_strong函数,性能更高,但有时aint == expected时,仍会返回false+更新expected.
aint.compare_exchange_strong(T&expected, T desired, mem_order); // 原子CAS(compare and swap).
// aint.compare_exchange_strong与如下原子逻辑等效:
if (*this == expected) {
*this = desired;
return true;
} else {
expected = *this;
return false;
}
// 2.6.别名
std::atomic_int // atomic_*对所有默认scalar量都成立.
// 2.7.原子引用atomic_ref
// 将非原子变量临时包装为原子变量,性能较差.
// 期间如果直接操作原变量,则原子性无法保证.
int inc{0};
atomic_ref<int> atomicCounter {counter};
// 3.自旋锁
// C++不提供标准实现,需自己写.
// 忙碌等待,只适合很短时间.
atomic_flag spinlock = ATOMIC_FLAG_INIT;
void lock() {
while(spinlock.test_and_set(std::memory_order_acquire));
}
void unlock() {
spinlock.clear(std::memory_order_release);
}
// 4.mutex/shared_mutex/timed_mutex
// 4.1.初始化
std::mutex mut;
// 4.2.加解锁
mut.lock(); // 阻塞加锁.
mut.try_lock(); // 非阻塞加锁,返回是否成功获得锁.
mut.unlock(); // 释放锁.
// 4.3.shared_mutex
// 又被称为readerwriter锁,适合读多写少的情况.
std::shared_mutex smut;
// 写锁相关:独占,只有没有任何线程持有读锁/写锁时才能获得.
smut.lock();
smut.try_lock();
smut.unlock();
// 读锁相关:非独占,多个线程可同时持有读锁.
smut.lock_shared();
smut.try_lock_shared();
smut.unlock_shared();
// 4.4.recursive_mutex
// 持有锁的线程可以多次加锁.
// 注意:unlock()次数必须与lock()一致才能解开.
std::recursive_mutex rmut;
rmut.lock();
rmut.try_lock();
rmut.unlock();
// 4.5.timed_mutex
std::timed_mutex tmut;
std::recursive_timed_mutex rtmut;
std::shared_timed_mutex stmut;
tmut.try_lock_for(rel_time); // 在相对时间内尝试获得锁,返回是否成功的bool.
tmut.try_lock_until(abs_time); // 在绝对时间点前尝试获得锁.

互斥量智能管理#

  • 使用标准库提供的互斥量管理工具以避免忘记手动加解锁.
// 1.std::lock()与std::try_lock()
// 适用于需要同时获得多个锁的情况,避免死锁.
std::mutex mut1, mut2;
std::lock(mut1, mut2, ...); // 不按指定顺序锁定互斥体对象;如果其中一个抛出异常,释放所有已获得的锁.
std::try_lock(mut1, mut2, ...); // 全成功:返回-1; 失败:释放所有已获得的锁,返回未成功获得锁的序号(0开始).
// 2.std::scoped_lock
std::scoped_lock slock(mut1, mut2, ...); // 使用std::lock()获取多个锁,在其生命周期结束自动释放.
// 3.lockguard
std::mutex mut, mut2;
std::lock_guard lock(mut); // 阻塞获得锁,在作用域生命周期结束时释放锁.
mut2.lock();
std::lock_guard lock2(mut2, std::adopt_lock); // 将已经获得的mut2交给lock2管理.
// 4.unique_lock
std::timed_mutex tmut;
std::unique_lock ulock(mut); // 初始化并阻塞获得锁,在作用域结束释放锁.
std::unique_lock ulock(mut, std::defer_lock); // 初始化但不取得锁.
std::unique_lock ulock(mut, std::try_to_lock); // 初始化并获得锁(非阻塞),可能不会成功获得锁.
std::unique_lock ulock(mut, std::adopt_lock); // 将已经获得的mut交由ulock管理.
std::unique_lock ulock(tmut, rel_time); // 必须使用支持超时的锁对象.
std::unique_lock ulock(tmut, abs_time);
bool is_obtained = ulock; // 判断锁是否成功获得.
ulock.release(); // 释放对内部锁的所有权(不对其进行解锁).
// 支持.lock(), .try_lock(), .unlock()等方法.
// 在作用域结束会自动解锁.
// 5.shared_lock
// 类似于unique_lock,除了获取的是shared_mutex的读锁.

线程同步#

  • 除了互斥类型,C++还支持了多种线程同步操作.

单次运行 call_once#

  • 保证特定函数在多线程中只运行一次.
  • 适用于懒初始化相关工作.
std::once_flag flag;
void process() {
std::call_once(flag, InitFunction); // 在多个线程同时运行process,只会运行一次InitFunction.
}

同步流#

#include <syncstream>
// 同步流中的内容在刷新缓冲区/析构时统一输出.
// 每个线程有独立缓冲区,不会出现覆盖现象.
std::osyncstream syncedCout {std::cout}; // 用于char
std::wosyncstream wsyncedCout; // 用于wchar

条件变量#

  • 非忙碌等待.
  • 运行到给定指令时,释放锁并挂起;被唤醒时尝试获得锁并继续执行.
  • 存在虚假唤醒问题,故一般会额外提供一个谓词函数/flag变量,只有唤醒+对应变量满足一定条件才会结束等待.
// 1.用于atomic变量的条件变量 (C++20)
// x.wait(oldValue)
// 在X等于oldValue时,将持续阻塞;
// 其它线程使用notify_one/notify_all/发生唤醒时,将检测X是否等于oldValue;
// 若等于则继续阻塞,否则结束阻塞,线程继续执行.
int main() {
std::atomic_int aint{0};
std::thread job { [&aint] () {
std::cout << "Job thread waiting." << std::endl;
aint.wait(0);
std::cout << aint << std::endl;
}};
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Notify1" << std::endl;
aint.notify_one(); // 子线程被唤醒,但aint仍为0,因此重新挂起,继续等待.
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // statement 1
aint.store(1);
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // statement 2
aint.notify_all(); // 子线程被唤醒,但aint仍为0,因此重新挂起,继续等待.
std::cout << "Notify2" << std::endl;
job.detach();
}
// 运行结果
// Job thread waiting.
// Notify1
// Notify2 (与下一行顺序不定)
// 1
// 2.用于mutex的环境变量.
std::condition_variable cv; // 只能等待std::mutex的条件变量.
std::condition_variable_any cva; // 可以等待多种锁类型(如shared_lock),但效率较低.
cv.wait(mut); // 注意:调用前,该线程应该已经持有mut;将释放mut并在原地阻塞.
cv.wait_until(mut, abs_time);
cv.wait_for(mut, rel_time);
cv.wait(mut, pred); // 带一个谓词的版本,当唤醒时,先获得锁,再检查谓词;如果为假,释放锁并再次进入等待.
cv.notify_one();
cv.notify_all();
// 对于以下函数,lck应该已经获得.
// 线程退出时将自动调用lck.unlock(); cond.notify_all();
// 其它线程上的cond在等待的互斥量必须与lck.mutex()一致,否则UB.
std::notify_all_at_thread_exit(cond, lck);
void producer() {
// 准备数据...
{
std::unique_lock<std::mutex> lock{mut};
queue.push_back(new_data);
}
cout << "data ready.";
cv.notify_one();
}
void consumer() {
std::unique_lock<std::mutex> lock{mut};
for(;;) {
cv.wait(lock, [this]{return !queue.empty();});
// 处理数据...
}
}

latch#

  • 一次性使用的线程协调点.
  • 用正整数初始化,当其计数器清零时,释放所有阻塞在当前latch的线程.
  • 适用于初始化场景.
#include <latch>
std::latch latch{n};
latch.arrive_and_wait(); // 递减计数器,阻塞当前线程.
latch.wait(); // 不递减计数器,阻塞线程.
latch.try_wait(); // 检测计数器是否已经为0.
latch.countdown(); // 减少计数器(不阻塞).
// 示例程序
std::latch latch{1};
vector<jthread> threads;
for(auto i = 0; i < 10; ++i) {
threads.push_back(jthread{[&latch](){
// 运行slave线程初始化操作...
latch.wait(); // 统一阻塞在latch处.
// 正式的处理过程...
}});
}
// master线程加载数据...
latch.countdown(); // 递减latch计数器,开始运行slave线程.

barrier#

  • 可重复使用的线程协调点.
  • 给定数量的线程达到指定协调点时,执行结束回调.
  • 结束回调执行完成后,结束所有进程的阻塞.
#include <barrier>
void completionFunction() noexcept {...}
int main() {
const size_t kNThreads = 4u;
barrier barrierPoint{kThreads, conpletionFunction};
vector<jthread> threads;
for(int i{0}; i < kNThreads; ++i) {
threads.push_back(jthread{
[&barrierPoint](std::stop_token token){
while(!token.stop_requested()) {
// 进行计算...
barrierPoint.arrive_and_wait();
}
}
});
}
}

信号量 semaphore#

  • 轻量级同步源.
#include <semaphore>
std::counting_semaphore cs{4}; // 多槽信号量
std::binary_semaphore bs;
cs.acquire(); // 递减计数器,若计数器为0时阻塞,直到有其他线程释放.
cs.try_acquire(); // 非阻塞获取信号量,未获得时返回false.
cs.try_acquire_for(); // 在一定时间内尝试获取.
cs.try_acquire_until();
cs.release(); // 释放信号量.

promise-future#

  • 方便地进行线程返回值获取与异常转移.
  • 子线程将结果放入promise,父线程通过future获取返回结果.

// 1. promise-future组合
// promise不可复制,但可移动.
promise<T> p;
p.get_future(); // 获取对应的future,只允许调用一次.
p.set_value(); // 设定返回值类型,只允许调用一次.
p.set_value_at_thread_exit();
p.set_exception();
p.set_exception_at_thread_exit();
auto f{p.get_future()}; // f类型为future<T>
f.get(); // 从future获取返回值,只能调用一次,否则抛异常.如果结果不可用会阻塞.
f.valid(); // 检查future是否为共享态,只有共享态才能调用get();调用get()后会退出共享态.
f.wait(); // 阻塞到结果可用.
f.wait_for(); // 阻塞到结果可用/达到给定时间.
f.wait_until();
void WorkFunction(promise<int> p) {
// 实际计算...
p.set_value(42);
}
int main() {
promise<int> p;
auto f {p.get_future()};
jthread t{workFunction, move(p)};
while(f.wait_for(0)) { // 检查结果是否计算完成.
auto res {f.get()};
cout << res;
} else {
// 执行结果未完成时的内容...
}
}
// 2. packaged_task
// 进一步封装,无需手动传递promise.
// 工作函数结束时,自动调用promise.set_value(return_of_work_function).
// 同样不可复制,只能移动.
int main() {
pacakaged_task<int(void)> pt {[](){return 42;}};
auto f {pt.get_future()};
jthread t{move(pt)};
while(f.wait_for(0)) { // 检查结果是否计算完成.
auto res {f.get()};
cout << res;
} else {
// 执行结果未完成时的内容...
}
}
// 3. async
// 进一步封装,无需手动创建进程.
// 注意:未捕获async返回值时,其返回的future临时对象将立即析构,
// 因此会使父线程阻塞直到子线程完成.
int workFunction() {return 42};
int main() {
// launch::async:强制创建新线程运行
// launch::deferred:强制在get()时使用当前进程运行
// launch::async | launch::deferred:自动选择(默认行为)
auto f {async(launch::async, workFunction)};
while(f.wait_for(0)) { // 检查结果是否计算完成.
auto res {f.get()};
cout << res;
} else {
// 执行结果未完成时的内容...
}
}
// 4. shared_future
// 允许多个进程同时阻塞在get()函数.
// 返回类型T必须可以复制.
promise<T> p;
shared_future<T> sf{p.get_future()}; // 该初始化只接受右值对象.
sf.valid(); // 在get()后不会进入非共享态,因此结果可以由多个线程取得.
// 可起到类似于条件变量的作用.
int main()
{
std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
std::shared_future<void> ready_future(ready_promise.get_future());
std::chrono::time_point<std::chrono::high_resolution_clock> start;
auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t1_ready_promise.set_value();
ready_future.wait(); // waits for the signal from main()
return std::chrono::high_resolution_clock::now() - start;
};
auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t2_ready_promise.set_value();
ready_future.wait(); // waits for the signal from main()
return std::chrono::high_resolution_clock::now() - start;
};
auto fut1 = t1_ready_promise.get_future();
auto fut2 = t2_ready_promise.get_future();
auto result1 = std::async(std::launch::async, fun1);
auto result2 = std::async(std::launch::async, fun2);
// wait for the threads to become ready
fut1.wait();
fut2.wait();
// the threads are ready, start the clock
start = std::chrono::high_resolution_clock::now();
// signal the threads to go
ready_promise.set_value();
std::cout << "Thread 1 received the signal "
<< result1.get().count() << " ms after start\n"
<< "Thread 2 received the signal "
<< result2.get().count() << " ms after start\n";
}

小结#

线程池在下一期单独介绍.

C++20-P1_标准库多线程设施简介
https://www.lithium-hydroxide.space/posts/250808_cpp_thread/
作者
LiH
发布于
2025-08-08
许可协议
CC BY-NC-SA 4.0