1. 项目概述

🔗 Github 源码地址https://github.com/Hai-Jun-Yan/linux-C-C--Thread/tree/main/C%2B%2B11-Thread

本项目旨在通过一系列核心示例,全面展示 C++11 标准库提供的现代多线程编程能力。项目涵盖了从基础的线程创建与管理,到复杂的线程同步、互斥锁机制,以及经典的生产者-消费者模型实现。

主要功能与目标:

  • 线程基础:掌握 std::thread 的创建、等待(join)与分离(detach)。
  • 结果传递:学习使用 std::promisestd::future 在线程间安全地传递数据。
  • 线程同步:深入理解 std::mutex 互斥锁及其 RAII 包装器(std::lock_guard, std::unique_lock)的使用。
  • 高级模型:手动实现读写锁机制,以及基于条件变量(std::condition_variable)的两种生产者-消费者模型(队列版与信号量版)。

应用场景:

  • 高并发服务器开发
  • 实时数据处理与后台任务调度
  • 利用多核 CPU 提升计算性能

2. 前置知识

在开始本项目之前,建议具备以下基础:

  • C++11 新特性:熟悉 Lambda 表达式、右值引用与移动语义(std::move)、RAII(资源获取即初始化)惯用手法。
  • 操作系统基础
    • 进程与线程:理解线程是操作系统调度的最小单位,共享进程的内存空间。
    • 并发与并行:区分多任务交替执行(并发)与多核同时执行(并行)。
    • 竞态条件 (Race Condition):理解多线程同时访问共享资源可能导致的数据不一致问题。
    • 死锁 (Deadlock):了解死锁产生的四个必要条件及预防方法。

3. API 详解

本项目主要涉及 <thread>, <mutex>, <future>, <condition_variable> 头文件中的核心组件。

组件 / API 核心作用 关键参数与说明
std::thread 线程对象 thread(Fn&& fn, Args&&... args): 构造时立即启动线程执行 fn
join(): 阻塞当前线程直到该线程结束。
detach(): 分离线程,使其在后台独立运行。
std::this_thread 当前线程工具 sleep_for(duration): 使当前线程休眠指定时长。
get_id(): 获取当前线程的唯一标识符。
std::mutex 互斥锁 lock(): 阻塞直到获得锁。
unlock(): 释放锁。
通常配合 lock_guard 使用以防忘记解锁。
std::lock_guard 锁的 RAII 包装 构造时加锁,析构时自动解锁。不可手动解锁,不可拷贝。
std::unique_lock 灵活的锁包装 lock_guard 更灵活,支持手动 lock/unlock,配合条件变量使用。
std::condition_variable 条件变量 wait(lock): 释放锁并挂起线程,直到被唤醒。
notify_one(): 唤醒一个等待线程。
notify_all(): 唤醒所有等待线程。
std::promise 承诺(发送端) 用于在线程的一端设置值 (set_value),该值可在另一端通过 future 获取。
std::future 未来(接收端) get(): 阻塞等待 promise 设置的值并获取它。

4. 核心代码实现

以下展示项目的七个核心模块,代码与源码完全一致。

4.1 线程创建与回收 (1.CreateThread.cpp)

演示最基础的线程创建,以及使用 join() 等待子线程结束,防止主线程提前退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <thread>
#include <chrono>

using namespace std;

/**
* C++11 线程示例 - 创建与回收
*
* 重点:
* 1. std::thread 代表一个线程对象。
* 2. join() 会阻塞主线程,直到子线程执行结束。
* 3. std::this_thread::get_id() 获取当前线程 ID。
*/

// 子线程的执行函数
void callback()
{
for (int i = 0; i < 5; i++)
{
cout << "子线程: " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}
cout << "子线程结束-ID: " << this_thread::get_id() << endl;
}

int main()
{
// 1. 创建线程对象,绑定到回调函数
thread worker(callback);

/**
* 2. 线程等待(回收)
* 作用:主线程阻塞,直到子线程执行结束
* 目的:保证子线程先结束,避免主线程提前退出导致子线程异常中断
*/
worker.join();

// 3. 主线程继续执行自己的逻辑
for (int i = 0; i < 5; i++)
{
cout << "主线程: " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}

cout << "主线程结束-ID: " << this_thread::get_id() << endl;

return 0;
}

4.2 线程返回值传递 (2.ExitAndJoinThread.cpp)

C++11 中不直接通过 join 获取返回值,而是推荐使用 promisefuture 机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

using namespace std;

struct Test
{
int time;
int data;
};

/**
* C++11 线程示例 - 线程返回值与 join
*
* POSIX 里可以 pthread_exit 返回指针。
* C++11 里常见做法是使用 promise/future 传递结果。
*/

void callback(promise<Test> resultPromise)
{
for (int i = 0; i < 5; i++)
{
cout << "子线程: " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}

cout << "子线程结束-ID: " << this_thread::get_id() << endl;

Test result;
result.time = 10;
result.data = 100;
cout << "子线程: time=" << result.time << ", data=" << result.data << endl;

// 将结果发送给主线程
resultPromise.set_value(result);
}

int main()
{
// 1. 创建 promise/future,用于在线程间传递结果
promise<Test> resultPromise;
future<Test> resultFuture = resultPromise.get_future();

// 2. 创建子线程,并把 promise 以移动方式传入
thread worker(callback, move(resultPromise));

/**
* 3. 等待子线程结束
* join() 保证子线程执行完成,避免主线程提前退出。
*/
worker.join();

// 4. 读取线程执行结果
Test result = resultFuture.get();
cout << "主线程: time=" << result.time << ", data=" << result.data << endl;

// 5. 主线程继续执行自己的逻辑
for (int i = 0; i < 5; i++)
{
cout << "主线程: " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}

cout << "主线程结束-ID: " << this_thread::get_id() << endl;

return 0;
}

4.3 线程分离 (3.DetachThread.cpp)

展示如何使用 detach() 让线程在后台独立运行,系统会自动回收资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <thread>
#include <chrono>

using namespace std;

/**
* C++11 线程示例 - detach
*
* detach() 会让线程进入“分离状态”,线程结束后系统自动回收资源。
* 分离后不能再 join,否则会抛出 std::system_error。
*/

void callback()
{
for (int i = 0; i < 5; i++)
{
cout << "子线程: " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}
cout << "子线程结束-ID: " << this_thread::get_id() << endl;
}

int main()
{
// 1. 创建子线程
thread worker(callback);

/**
* 2. 线程分离
* 作用:让子线程独立运行,结束后自动回收资源
* 注意:一旦 detach,就不能再 join
*/
worker.detach();

// 3. 主线程继续执行自己的逻辑
for (int i = 0; i < 5; i++)
{
cout << "主线程: " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}

cout << "主线程结束-ID: " << this_thread::get_id() << endl;

// 稍微等待,避免主线程过早退出导致子线程来不及输出
this_thread::sleep_for(chrono::milliseconds(100));

return 0;
}

4.4 互斥锁同步 (4.MutexThreadSynchronization.cpp)

使用 std::mutexstd::lock_guard 保护共享资源,防止数据竞争,并总结了死锁的常见场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

using namespace std;

#define MAX 50

// 全局共享变量
int number = 0;
mutex mtx;

/**
* C++11 线程示例 - 互斥锁同步
*
* 核心思路:
* 1. 使用 std::mutex 保护共享变量 number。
* 2. 使用 std::lock_guard 进行 RAII 加锁,确保异常或提前返回也能自动解锁。
*/

void funcA_num()
{
for (int i = 0; i < MAX; ++i)
{
lock_guard<mutex> lock(mtx);
int cur = number;
cur++;
number = cur;
cout << "Thread A, id = " << this_thread::get_id() << ", number = " << number << endl;
}
}

void funcB_num()
{
for (int i = 0; i < MAX; ++i)
{
lock_guard<mutex> lock(mtx);
int cur = number;
cur++;
number = cur;
cout << "Thread B, id = " << this_thread::get_id() << ", number = " << number << endl;
}
}

int main()
{
thread t1(funcA_num);
thread t2(funcB_num);

// 等待线程结束
t1.join();
t2.join();

return 0;
}

/**
* 【死锁情况总结】
*
* 情况 1:忘记释放锁 (Forgetting to unlock)
* --------------------------------------
* 手动 lock 后在中途 return、break 或发生异常,导致 unlock 未被执行。
* 结果:锁永远被占用,其他线程永久阻塞。
* 解决:使用 std::lock_guard / std::unique_lock 进行自动释放。
*
* 情况 2:重复加锁 (Self-Deadlock)
* --------------------------------------
* 同一个线程在没释放锁的情况下再次锁同一把互斥锁。
* void func() {
* mtx.lock();
* // ... 执行逻辑 ...
* mtx.lock(); // 这里会永久阻塞,因为锁已被自己持有
* mtx.unlock();
* }
* 解决:避免重入,或使用 std::recursive_mutex。
*
* 情况 3:多线程多锁资源竞争 (Circular Wait)
* --------------------------------------
* 线程 A 拿着锁 1 等锁 2,线程 B 拿着锁 2 等锁 1。
* 线程 A: lock(mutex1) -> lock(mutex2)
* 线程 B: lock(mutex2) -> lock(mutex1)
* 结果:两个线程互相等待,形成环路死锁。
* 解决:
* 1. 强制所有线程按相同顺序加锁(例如:先锁 1 再锁 2)。
* 2. 使用 std::try_lock 尝试加锁,失败则释放已占有的锁并重试。
*/

/**
* 【如何确定需要几把锁】
* 1. 一般来说,有几个资源就需要几把锁。
* 例如:有 2 个资源,就需要 2 把锁。
*
* 2. 如果多个变量必须同时改变才算有效,则必须共用同一把锁。
* 例如:变量 a、b 必须同时更新才能保持一致性,那么它们需要同一把锁。
*
* 3. 多线程访问多个资源时,优先保证资源边界清晰,
* 如果资源有强关联,合并为一把锁更安全但并发度更低。
*/

4.5 读写锁实现 (5.ReadWriteLock.cpp)

C++11 标准库(直到 C++17 才有 shared_mutex)没有原生读写锁。这里展示了如何用 mutex + condition_variable 手动实现一个“写优先”的读写锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <chrono>

using namespace std;

#define MAX 50

// 全局共享变量
int number = 0;

/**
* C++11 线程示例 - 读写锁(用互斥锁 + 条件变量模拟)
*
* 说明:
* C++11 标准库没有原生读写锁,这里用 mutex + condition_variable 实现一个简化版本。
* 读写锁规则:
* 1. 读读并行
* 2. 读写互斥
* 3. 写写互斥
* 4. 写优先,避免写线程长期饥饿
*/

class ReadWriteLock
{
public:
void lock_read()
{
unique_lock<mutex> lock(mtx);
// 写优先:如果有写线程等待,则新来的读线程也要等待
cv.wait(lock, [this]() { return !writer_active && waiting_writers == 0; });
++reader_count;
}

void unlock_read()
{
unique_lock<mutex> lock(mtx);
--reader_count;
if (reader_count == 0)
{
cv.notify_all();
}
}

void lock_write()
{
unique_lock<mutex> lock(mtx);
++waiting_writers;
cv.wait(lock, [this]() { return !writer_active && reader_count == 0; });
--waiting_writers;
writer_active = true;
}

void unlock_write()
{
unique_lock<mutex> lock(mtx);
writer_active = false;
cv.notify_all();
}

private:
mutex mtx;
condition_variable cv;
int reader_count = 0;
int waiting_writers = 0;
bool writer_active = false;
};

ReadWriteLock rwlock;

void write_num()
{
for (int i = 0; i < MAX; ++i)
{
rwlock.lock_write();
int cur = number;
cur++;
number = cur;
cout << "Write, id = " << this_thread::get_id() << ", number = " << number << endl;
rwlock.unlock_write();
this_thread::sleep_for(chrono::milliseconds(5));
}
}

void read_num()
{
for (int i = 0; i < MAX; ++i)
{
rwlock.lock_read();
cout << "Read, id = " << this_thread::get_id() << ", cur = " << number << endl;
rwlock.unlock_read();
this_thread::sleep_for(chrono::milliseconds(5));
}
}

int main()
{
// 3 个写线程,5 个读线程
vector<thread> writers;
vector<thread> readers;

for (int i = 0; i < 3; ++i)
{
writers.emplace_back(write_num);
}
for (int i = 0; i < 5; ++i)
{
readers.emplace_back(read_num);
}

for (auto& t : writers) t.join();
for (auto& t : readers) t.join();

return 0;
}

/**
* 【读写锁 (Read-Write Lock) 深度解析】
*
* 1. 为什么需要读写锁?
* --------------------------------------
* 互斥锁 (Mutex) 的问题:无论读写,一律串行。即使两个线程只是想读取数据,也必须排队,这在“读多写少”的场景下效率极低。
* 读写锁的优势:允许“读-读”并行。这点是最重要的,因为很多场景中读操作比写操作多得多。
*
* 2. 读写锁的八字口诀:
* --------------------------------------
* “读读并行,读写互斥,写写互斥”
* - 只要没有线程在“写”,任意数量的线程都可以同时“读”。
* - 只要有人在“写”,其他人(无论是读还是写)都必须等待。
*
* 3. 读写锁的独特意义(与互斥锁的区别):
* --------------------------------------
* | 锁类型 | 读-读 | 读-写 | 写-写 | 适用场景 |
* | :------- | :--- | :--- | :--- | :---------------- |
* | 互斥锁 | 串行 | 串行 | 串行 | 读写频率差不多的场景 |
* | 读写锁 | 并行 | 串行 | 串行 | 读多写少的场景 |
*
* 4. 写优先策略:
* --------------------------------------
* 如果持续不断地有读线程进入,写线程可能会长时间无法获取写锁。
* 写优先的实现方式:当有写线程等待时,新的读线程先阻塞,保证写线程尽快执行。
*/

4.6 条件变量生产者-消费者 (6.CondMutexThreadProducerConsumer.cpp)

最经典的生产者-消费者模型实现。使用 condition_variablewaitnotify 机制来协调线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

using namespace std;

/**
* 生产者消费者模型 - C++11 条件变量版
*
* 核心组件:
* 1. std::mutex:保护共享队列,确保同一时刻只有一个线程修改队列。
* 2. std::condition_variable:线程同步工具,负责“等待”和“唤醒”。
* - not_full:队列满时生产者等待
* - not_empty:队列空时消费者等待
*/

#define QUEUE_SIZE 5
#define PRODUCER_COUNT 2
#define CONSUMER_COUNT 2
#define PRODUCE_EACH 10

queue<int> buffer;
mutex mtx;
condition_variable not_empty;
condition_variable not_full;

void producer(int id)
{
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 进入临界区前先加锁
unique_lock<mutex> lock(mtx);

// 2. 如果队列已满,则等待消费者取走数据
// 必须用 while 而不是 if,避免“虚假唤醒”
while (buffer.size() >= QUEUE_SIZE)
{
// wait() 会释放锁并让线程休眠,被唤醒后再自动重新加锁
not_full.wait(lock);
}

// 3. 生产数据并放入队列
int value = id * 100 + i;
buffer.push(value);
cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size() << endl;

// 4. 通知消费者:队列不再为空
not_empty.notify_one();

// 5. 解锁,给其他线程机会
lock.unlock();

// 6. 模拟生产耗时
this_thread::sleep_for(chrono::milliseconds(10));
}
}

void consumer(int id)
{
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 进入临界区前先加锁
unique_lock<mutex> lock(mtx);

// 2. 如果队列为空,则等待生产者放入数据
// 同样使用 while 防止虚假唤醒
while (buffer.empty())
{
not_empty.wait(lock);
}

// 3. 取走数据
int value = buffer.front();
buffer.pop();
cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size() << endl;

// 4. 通知生产者:队列有空位了
not_full.notify_one();

// 5. 解锁
lock.unlock();

// 6. 模拟消费耗时
this_thread::sleep_for(chrono::milliseconds(15));
}
}

int main()
{
thread producers[PRODUCER_COUNT];
thread consumers[CONSUMER_COUNT];

for (int i = 0; i < PRODUCER_COUNT; ++i)
{
producers[i] = thread(producer, i + 1);
}
for (int i = 0; i < CONSUMER_COUNT; ++i)
{
consumers[i] = thread(consumer, i + 1);
}

for (int i = 0; i < PRODUCER_COUNT; ++i) producers[i].join();
for (int i = 0; i < CONSUMER_COUNT; ++i) consumers[i].join();

cout << "所有生产和消费任务已完成。" << endl;

return 0;
}

4.7 信号量生产者-消费者 (7.SignalMutexThreadProducerConsumer.cpp)

C++11 同样没有原生信号量。这里展示了如何封装一个 Semaphore 类,并用它来简化生产者-消费者的同步逻辑(无需显式管理锁和条件变量的 notify)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

using namespace std;

/**
* 生产者消费者模型 - C++11 信号量版
*
* C++11 没有原生信号量,这里用 mutex + condition_variable 模拟一个计数信号量。
* 信号量是“有状态”的计数器:
* - post() 让计数 +1
* - wait() 让计数 -1,如果计数为 0 则阻塞
*/

class Semaphore
{
public:
explicit Semaphore(int count) : count_(count) {}

void wait()
{
unique_lock<mutex> lock(mtx_);
// 计数为 0 表示没有可用资源,必须等待
// 使用 while 防止虚假唤醒
while (count_ == 0)
{
cv_.wait(lock);
}
// 成功拿到一个资源,计数减 1
--count_;
}

void post()
{
unique_lock<mutex> lock(mtx_);
// 释放一个资源,计数加 1
++count_;
// 唤醒一个等待者
cv_.notify_one();
}

private:
mutex mtx_;
condition_variable cv_;
int count_;
};

#define QUEUE_SIZE 5
#define PRODUCER_COUNT 2
#define CONSUMER_COUNT 2
#define PRODUCE_EACH 10

queue<int> buffer;
mutex mtx;
Semaphore sem_empty(QUEUE_SIZE);
Semaphore sem_full(0);

void producer(int id)
{
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 申请空位,空位不足则阻塞等待
sem_empty.wait();

// 2. 互斥访问队列,确保 push 操作安全
{
lock_guard<mutex> lock(mtx);
int value = id * 100 + i;
buffer.push(value);
cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size() << endl;
}

// 3. 增加产品数量,唤醒消费者
sem_full.post();

// 4. 模拟生产耗时
this_thread::sleep_for(chrono::milliseconds(10));
}
}

void consumer(int id)
{
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 申请产品,产品不足则阻塞等待
sem_full.wait();

// 2. 互斥访问队列,确保 pop 操作安全
{
lock_guard<mutex> lock(mtx);
int value = buffer.front();
buffer.pop();
cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size() << endl;
}

// 3. 增加空位数量,唤醒生产者
sem_empty.post();

// 4. 模拟消费耗时
this_thread::sleep_for(chrono::milliseconds(15));
}
}

int main()
{
thread producers[PRODUCER_COUNT];
thread consumers[CONSUMER_COUNT];

for (int i = 0; i < PRODUCER_COUNT; ++i)
{
producers[i] = thread(producer, i + 1);
}
for (int i = 0; i < CONSUMER_COUNT; ++i)
{
consumers[i] = thread(consumer, i + 1);
}

for (int i = 0; i < PRODUCER_COUNT; ++i) producers[i].join();
for (int i = 0; i < CONSUMER_COUNT; ++i) consumers[i].join();

cout << "所有生产和消费任务已完成。" << endl;

return 0;
}