1. 项目概述

🔗 Github 源码地址https://github.com/Hai-Jun-Yan/linux-C-C--Thread/tree/main/Posix-Thread

本项目是一个基于 C++POSIX Threads (pthread) 库的多线程编程实战练习集。通过七个循序渐进的示例代码,系统展示了在 Linux/Unix 环境下进行多线程开发的核心技术路径。

项目涵盖的核心能力:

  • 基础管理:线程的创建、退出、回收(Join)与分离(Detach)。
  • 资源同步:使用互斥锁(Mutex)解决临界区竞态条件,以及死锁的成因与预防。
  • 性能优化:使用读写锁(Read-Write Lock)优化“读多写少”场景下的并发性能。
  • 经典模型:基于条件变量(Condition Variable)和信号量(Semaphore)实现生产者-消费者模型。

应用场景:本项目的技术栈广泛应用于 Linux 服务器后端开发、嵌入式系统、高性能计算(HPC)以及任何需要充分利用多核 CPU 性能的 C++ 项目中。

2. 前置知识

在深入代码之前,你需要掌握以下核心概念:

  • 进程 vs 线程
    • 进程是资源分配的最小单位(拥有独立的内存空间)。
    • 线程是 CPU 调度的最小单位(共享进程的内存空间)。
  • 并发 (Concurrency) vs 并行 (Parallelism)
    • 并发是逻辑上的同时处理(时间片轮转)。
    • 并行是物理上的同时执行(多核 CPU)。
  • 竞态条件 (Race Condition):当多个线程同时访问并修改共享数据时,结果依赖于执行顺序的现象。
  • 同步原语:用于协调线程执行顺序的机制,如锁、信号量、条件变量。

3. API 详解

本项目主要使用了以下 POSIX 线程标准接口(需包含 <pthread.h>):

API 函数 核心作用 关键参数说明
pthread_create 创建新线程 pthread_t *thread: 线程ID指针
void *(*start_routine) (void *): 线程入口函数
void *arg: 传递给入口函数的参数
pthread_join 等待线程结束 pthread_t thread: 目标线程ID
void **retval: 接收线程返回值的二级指针
pthread_detach 分离线程 pthread_t thread: 目标线程ID
分离后线程资源由系统自动回收,无需 join
pthread_exit 退出当前线程 void *retval: 线程退出时的返回值
pthread_mutex_lock 互斥锁加锁 pthread_mutex_t *mutex: 互斥锁指针
若已被锁,当前线程阻塞
pthread_mutex_unlock 互斥锁解锁 pthread_mutex_t *mutex: 互斥锁指针
pthread_rwlock_rdlock 读写锁-读加锁 允许多个读线程同时进入,但阻塞写线程
pthread_rwlock_wrlock 读写锁-写加锁 独占锁,阻塞其他所有读和写线程
pthread_cond_wait 条件变量等待 pthread_cond_t *cond: 条件变量
pthread_mutex_t *mutex: 关联的互斥锁
原子操作:解锁 -> 等待 -> 被唤醒 -> 加锁
pthread_cond_signal 唤醒一个等待者 pthread_cond_t *cond: 条件变量
sem_wait 信号量 P 操作 sem_t *sem: 信号量
计数器减1,若为0则阻塞
sem_post 信号量 V 操作 sem_t *sem: 信号量
计数器加1,唤醒等待者

4. 核心代码实现

4.1 线程基础:创建与传参

文件1.CreateThread.cpp
展示了最基本的线程创建流程,以及如何给线程传递参数(虽然此例传了 NULL)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <string.h>
#include <pthread.h> // POSIX 线程库头文件,多线程编程必须包含
#include <unistd.h> // 提供 sleep 等系统调用函数

using namespace std;

/**
* 子线程的回调函数
* 1. 返回值必须是 void*
* 2. 参数必须是 void*
*/
void* callback(void* arg)
{
for (int i = 0; i < 5; i++)
{
cout << "子线程: "<< i << endl;
}
// pthread_self() 获取当前线程的 ID
cout << "子线程结束-ID:" << pthread_self() << endl;
return NULL; // 线程执行完毕,返回空指针
}

int main()
{
// 1. 定义一个线程 ID 变量
pthread_t tid;

/**
* 2. 创建子线程
* 参数1: &tid -> 存储创建成功的线程 ID
* 参数2: NULL -> 线程属性,通常传 NULL 表示使用默认属性
* 参数3: callback -> 线程运行的入口函数(回调函数)
* 参数4: NULL -> 传递给回调函数的参数
*/
pthread_create(&tid, NULL, callback, NULL);

/**
* 3. 线程等待(回收)
* 作用:主线程会阻塞在这里,直到 tid 指定的子线程执行结束
* 目的:1. 保证子线程先执行完;2. 回收子线程资源,防止内存泄漏
*/
pthread_join(tid, NULL);

// 4. 主线程继续执行自己的逻辑
for (int i = 0; i < 5; i++)
{
cout << "主线程: "<< i << endl;
}

cout << "主线程结束-ID:" << pthread_self() << endl;

return 0;
}

4.2 线程交互:退出与获取返回值

文件2.ExitAndJoinThread.cpp
演示了子线程如何通过 pthread_exit 返回数据,以及主线程如何通过 pthread_join 获取该数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <iostream>
#include <string.h>
#include <pthread.h> // POSIX 线程库头文件,多线程编程必须包含
#include <unistd.h> // 提供 sleep 等系统调用函数

using namespace std;
struct Test
{
int time;
int data;
/* data */
};


struct Test *test = (struct Test*)malloc(sizeof(struct Test));

/**
* 子线程的回调函数
* 1. 返回值必须是 void*
* 2. 参数必须是 void*
*/
void* callback(void* arg)
{
for (int i = 0; i < 5; i++)
{
cout << "子线程: "<< i << endl;
}
// pthread_self() 获取当前线程的 ID
cout << "子线程结束-ID:" << pthread_self() << endl;
test->time = 10;
test->data = 100;
cout << "子线程: time=" << test->time << ", data=" << test->data << endl;
// 子线程退出
pthread_exit(test);

return NULL; // 线程执行完毕,返回空指针
}

int main()
{
// 1. 定义一个线程 ID 变量
pthread_t tid;



/**
* 2. 创建子线程
* 参数1: &tid -> 存储创建成功的线程 ID
* 参数2: NULL -> 线程属性,通常传 NULL 表示使用默认属性
* 参数3: callback -> 线程运行的入口函数(回调函数)
* 参数4: NULL -> 传递给回调函数的参数
*/
pthread_create(&tid, NULL, callback, NULL);

/**
* 3. 线程等待(回收)
* 作用:主线程会阻塞在这里,直到 tid 指定的子线程执行结束
* 目的:1. 保证子线程先执行完;2. 回收子线程资源,防止内存泄漏
*/
pthread_join(tid, (void**)&test);
cout << "主线程: time=" << test->time << ", data=" << test->data << endl;

// 4. 主线程继续执行自己的逻辑
for (int i = 0; i < 5; i++)
{
cout << "主线程: "<< i << endl;
}

cout << "主线程结束-ID:" << pthread_self() << endl;

return 0;
}

4.3 线程属性:线程分离

文件3.DetachThread.cpp
展示了如何让线程“自生自灭”,无需主线程等待回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <iostream>
#include <string.h>
#include <pthread.h> // POSIX 线程库头文件,多线程编程必须包含
#include <unistd.h> // 提供 sleep 等系统调用函数

using namespace std;

/**
* 子线程的回调函数
* 1. 返回值必须是 void*
* 2. 参数必须是 void*
*/
void* callback(void* arg)
{
for (int i = 0; i < 5; i++)
{
cout << "子线程: "<< i << endl;
}
// pthread_self() 获取当前线程的 ID
cout << "子线程结束-ID:" << pthread_self() << endl;

return NULL; // 线程执行完毕,返回空指针
}

int main()
{
// 1. 定义一个线程 ID 变量
pthread_t tid;



/**
* 2. 创建子线程
* 参数1: &tid -> 存储创建成功的线程 ID
* 参数2: NULL -> 线程属性,通常传 NULL 表示使用默认属性
* 参数3: callback -> 线程运行的入口函数(回调函数)
* 参数4: NULL -> 传递给回调函数的参数
*/
pthread_create(&tid, NULL, callback, NULL);

/**
* 线程分离
* 作用:将 tid 指定的子线程设置为分离状态,当子线程执行结束后,系统会自动回收其资源
* 注意:1. 只能对未被等待的子线程进行分离操作;2. 一旦分离,就不能再使用 pthread_join 等待子线程结束, 也就是说pthread_join失效了
*/
pthread_detach(tid);
/**
* 3. 线程等待(回收)
* 作用:主线程会阻塞在这里,直到 tid 指定的子线程执行结束
* 目的:1. 保证子线程先执行完;2. 回收子线程资源,防止内存泄漏
*/
pthread_join(tid, NULL);

// 4. 主线程继续执行自己的逻辑
for (int i = 0; i < 5; i++)
{
cout << "主线程: "<< i << endl;
}

cout << "主线程结束-ID:" << pthread_self() << endl;

return 0;
}

4.4 线程同步:互斥锁与死锁详解

文件4.MutexThreadSynchronization.cpp
这是并发编程中最重要的一课。代码展示了如何使用 pthread_mutex_t 保护全局变量 number,并在注释中详细分析了死锁的三种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <atomic>

using namespace std;

#define MAX 50
// 全局变量
int number = 0;

// pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex;

// 线程处理函数
void* funcA_num(void* arg)
{
for(int i=0; i<MAX; ++i)
{
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
number = cur;
cout << "Thread A, id = " << pthread_self() << ", number = " << number << endl;
pthread_mutex_unlock(&mutex);
}

return NULL;
}

void* funcB_num(void* arg)
{
for(int i=0; i<MAX; ++i)
{
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
number = cur;
cout << "Thread B, id = " << pthread_self() << ", number = " << number << endl;
pthread_mutex_unlock(&mutex);
}

return NULL;
}

int main(int argc, const char* argv[])
{
pthread_mutex_init(&mutex, NULL);

pthread_t p1, p2;

// 创建两个子线程
pthread_create(&p1, NULL, funcA_num, NULL);
pthread_create(&p2, NULL, funcB_num, NULL);

// 阻塞,资源回收
pthread_join(p1, NULL);
pthread_join(p2, NULL);

// 销毁互斥锁
pthread_mutex_destroy(&mutex);

return 0;
}

/**
* 【死锁情况总结】
*
* 情况 1:忘记释放锁 (Forgetting to unlock)
* --------------------------------------
* 代码逻辑中途 return、break 或发生异常,导致 pthread_mutex_unlock 未被执行。
* 结果:锁永远被占用,其他等待该锁的线程将永久阻塞。
* 解决:确保所有退出路径都释放锁,或使用 C++ RAII 机制(如 std::lock_guard)。
*
* 情况 2:重复加锁 (Self-Deadlock)
* --------------------------------------
* 同一个线程在没释放锁的情况下,再次尝试对同一个互斥锁加锁。
* void func() {
* pthread_mutex_lock(&mutex);
* // ... 执行逻辑 ...
* pthread_mutex_lock(&mutex); // 这里会永久阻塞,因为锁被自己占着,自己又在等自己释放
* pthread_mutex_unlock(&mutex);
* }
* 解决:检查逻辑避免重入,或使用递归锁 (PTHREAD_MUTEX_RECURSIVE)。
*
* 情况 3:多线程多锁资源竞争 (Circular Wait)
* --------------------------------------
* 线程 A 拿着锁 1 等锁 2,线程 B 拿着锁 2 等锁 1。
* 线程 A: lock(mutex1) -> lock(mutex2)
* 线程 B: lock(mutex2) -> lock(mutex1)
* 结果:两个线程互相等待对方释放资源,形成环路死锁。
* 解决:
* 1. 强制所有线程按相同的顺序加锁(例如:永远先锁 1 再锁 2)。
* 2. 使用 pthread_mutex_trylock 尝试加锁,失败则释放已占有的锁并重试。
*/

/**
* 【如何确定需要几把锁】
* 1. 一般来说,有几个资源就需要几把锁。
* 例如:有 2 个资源,就需要 2 把锁。
*
* 2. 如果说如果多个变量必须 同时改变 才算有效,那么它们必须共用 一把锁。
* 例如:有 2 个变量 a, b,必须同时改变 a, b 才算有效,那么它们必须共用一把锁。
*
* 3. 如果说如果多个线程,有多个资源
* 例如:有 2 个变量 a, b,必须同时改变 a, b 才算有效,那么它们必须共用一把锁。
*
*/

4.5 进阶同步:读写锁

文件5.ReadWriteLock.cpp
展示了在“读多写少”场景下,如何使用 pthread_rwlock 提高并发效率。代码中模拟了 3 个写线程和 5 个读线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <atomic>

using namespace std;

#define MAX 50
// 全局变量
int number = 0;

// pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex;

// 线程处理函数
void* write_num(void* arg)
{
for(int i=0; i<MAX; ++i)
{
pthread_rwlock_wrlock(&mutex);
int cur = number;
cur++;
number = cur;
cout << "Write, id = " << pthread_self() << ", number = " << number << endl;
pthread_rwlock_unlock(&mutex);
}

return NULL;
}

void* read_num(void* arg)
{
for(int i=0; i<MAX; ++i)
{
pthread_rwlock_rdlock(&mutex);
int cur = number;
cout << "Read, id = " << pthread_self() << ", cur = " << number << endl;
pthread_rwlock_unlock(&mutex);
}

return NULL;
}

int main(int argc, const char* argv[])
{
pthread_rwlock_init(&mutex, NULL);

// 3个写线程,5个读线程
pthread_t p1[3], p2[5];

for (int i = 0; i < 3; ++i)
{
pthread_create(&p1[i], NULL, write_num, NULL);
}

for (int i = 0; i < 5; ++i)
{
pthread_create(&p2[i], NULL, read_num, NULL);
}

// 等待所有线程执行完毕
for (int i = 0; i < 3; ++i)
{
pthread_join(p1[i], NULL);
}

for (int i = 0; i < 5; ++i)
{
pthread_join(p2[i], NULL);
}


// 销毁读写锁
pthread_rwlock_destroy(&mutex);

return 0;
}

/**
* 【读写锁 (Read-Write Lock) 深度解析】
*
* 1. 为什么需要读写锁?
* --------------------------------------
* 互斥锁 (Mutex) 的问题:无论读写,一律串行。即使两个线程只是想读取数据,也必须排队,这在“读多写少”的场景下效率极低。
* 读写锁的优势:允许“读-读”并行。这点是最重要的,因为很多场景中读操作比写操作多得多。
*
* 2. 读写锁的八字口诀:
* --------------------------------------
* “读读并行,读写互斥,写写互斥”
* - 只要没有线程在“写”,任意数量的线程都可以同时“读”。
* - 只要有人在“写”,其他人(无论是读还是写)都必须等待。
*
* 3. 读写锁的独特意义(与互斥锁的区别):
* --------------------------------------
* | 锁类型 | 读-读 | 读-写 | 写-写 | 适用场景 |
* | :------- | :--- | :--- | :--- | :---------------- |
* | 互斥锁 | 串行 | 串行 | 串行 | 读写频率差不多的场景 |
* | 读写锁 | 并行 | 串行 | 串行 | 读多写少的场景 |
*
* 4. 读写锁的优先级(写优先):
* --------------------------------------
* 当读写锁处于“读模式”锁定状态时,如果有线程尝试以“写模式”加锁,
* 读写锁通常会阻塞后续所有的“读模式”请求,直到写线程完成。
* 这样做的目的是防止“写饥饿”(即大量的读操作导致写操作永远抢不到锁)。
*
* 5. 适用场景举例:
* --------------------------------------
* - 数据库缓存:数千人查数据,偶尔有人改数据。
* - 配置信息:程序运行期间一直被读取,只有极少数情况下会热更新配置。
*/

4.6 生产者-消费者模型:条件变量版

文件6.CondMutexThreadProducerConsumer.cpp
经典模型实现。通过 not_fullnot_empty 两个条件变量,实现了线程间的精细协作。特别注意 while 循环的使用,是为了防止虚假唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

using namespace std;

/**
* 生产者消费者模型 - 条件变量版
*
* 核心组件:
* 1. 互斥锁 (Mutex):保护共享资源(队列),确保同一时间只有一个线程操作队列。
* 2. 条件变量 (Condition Variable):用于线程间的同步。
* - not_full:当队列满时,生产者在此等待;当消费者取走数据时,通知此变量。
* - not_empty:当队列空时,消费者在此等待;当生产者放入数据时,通知此变量。
*/

#define QUEUE_SIZE 5 // 队列最大容量
#define PRODUCER_COUNT 2 // 生产者数量
#define CONSUMER_COUNT 2 // 消费者数量
#define PRODUCE_EACH 10 // 每个线程生产/消费的次数

queue<int> buffer; // 共享资源:数据队列
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t not_empty; // 队列不为空的条件
pthread_cond_t not_full; // 队列不为满的条件

// 生产者线程函数
void* producer(void* arg)
{
int id = *(int*)arg;
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 操作共享资源前必须先加锁
pthread_mutex_lock(&mutex);

// 2. 检查队列是否已满
// 注意:必须使用 while 而不是 if,为了防止“虚假唤醒” (Spurious Wakeup)
// 虚假唤醒指线程在没有收到信号的情况下被唤醒,或者唤醒后条件仍不满足
while (buffer.size() >= QUEUE_SIZE)
{
// 队列满,生产者阻塞等待。
// pthread_cond_wait 会自动做三件事:
// a) 解锁 mutex;b) 线程挂起等待;c) 被唤醒后自动重新加锁 mutex
pthread_cond_wait(&not_full, &mutex);
}

// 3. 生产数据
int value = id * 100 + i;
buffer.push(value);
cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size() << endl;

// 4. 通知消费者:现在队列里有东西了,可以来消费了
pthread_cond_signal(&not_empty);

// 5. 解锁
pthread_mutex_unlock(&mutex);

// 稍微休眠,模拟生产耗时,也给其他线程竞争的机会
usleep(1000 * 10);
}
return NULL;
}

// 消费者线程函数
void* consumer(void* arg)
{
int id = *(int*)arg;
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 加锁
pthread_mutex_lock(&mutex);

// 2. 检查队列是否为空
// 同样使用 while 防止虚假唤醒
while (buffer.empty())
{
// 队列空,消费者阻塞等待生产者通知
pthread_cond_wait(&not_empty, &mutex);
}

// 3. 消费数据
int value = buffer.front();
buffer.pop();
cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size() << endl;

// 4. 通知生产者:现在队列有空位了,可以继续生产了
pthread_cond_signal(&not_full);

// 5. 解锁
pthread_mutex_unlock(&mutex);

// 稍微休眠,模拟消费耗时
usleep(1000 * 15);
}
return NULL;
}

int main()
{
// 初始化互斥锁和条件变量
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&not_empty, NULL);
pthread_cond_init(&not_full, NULL);

pthread_t producers[PRODUCER_COUNT];
pthread_t consumers[CONSUMER_COUNT];
int producer_ids[PRODUCER_COUNT];
int consumer_ids[CONSUMER_COUNT];

// 创建生产者线程
for (int i = 0; i < PRODUCER_COUNT; ++i)
{
producer_ids[i] = i + 1;
pthread_create(&producers[i], NULL, producer, &producer_ids[i]);
}

// 创建消费者线程
for (int i = 0; i < CONSUMER_COUNT; ++i)
{
consumer_ids[i] = i + 1;
pthread_create(&consumers[i], NULL, consumer, &consumer_ids[i]);
}

// 等待所有线程结束
for (int i = 0; i < PRODUCER_COUNT; ++i)
{
pthread_join(producers[i], NULL);
}
for (int i = 0; i < CONSUMER_COUNT; ++i)
{
pthread_join(consumers[i], NULL);
}

// 销毁资源
pthread_cond_destroy(&not_empty);
pthread_cond_destroy(&not_full);
pthread_mutex_destroy(&mutex);

cout << "所有生产和消费任务已完成。" << endl;

return 0;
}

4.7 生产者-消费者模型:信号量版

文件7.SignalMutexThreadProducerConsumer.cpp
展示了如何用信号量(Semaphore)简化资源计数管理。通过 sem_emptysem_full 直接控制资源的数量,代码逻辑比条件变量版更直观。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <iostream>
#include <pthread.h>
#include <queue>
#include <semaphore.h> // 信号量头文件
#include <unistd.h>

using namespace std;

/**
* 生产者消费者模型 - 信号量版
*
* 核心组件:
* 1. 互斥锁 (Mutex):用于保护队列的原子操作(push/pop),防止多个线程同时操作队列导致崩溃。
* 2. 信号量 (Semaphore):用于管理资源的“数量”。
* - sem_empty:代表“空位”的数量。初始化为队列容量。
* - sem_full:代表“产品”的数量。初始化为 0。
*
* 信号量与条件变量的区别:
* - 信号量是有状态的计数器。即使先发布(post)后等待(wait),信号也不会丢失。
* - 信号量非常适合管理“固定数量的资源”。
*/

#define QUEUE_SIZE 5 // 队列最大容量
#define PRODUCER_COUNT 2 // 生产者数量
#define CONSUMER_COUNT 2 // 消费者数量
#define PRODUCE_EACH 10 // 每个线程生产/消费的次数

queue<int> buffer; // 共享资源:数据队列
pthread_mutex_t mutex; // 互斥锁
sem_t sem_empty; // 空位信号量
sem_t sem_full; // 产品信号量

// 生产者线程函数
void* producer(void* arg)
{
int id = *(int*)arg;
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 申请一个“空位” (P操作)
// 如果 sem_empty > 0,则减1并继续;如果为 0,则阻塞等待。
sem_wait(&sem_empty);

// 2. 加锁保护队列操作
pthread_mutex_lock(&mutex);

// 3. 生产数据并放入队列
int value = id * 100 + i;
buffer.push(value);
cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size() << endl;

// 4. 解锁
pthread_mutex_unlock(&mutex);

// 5. 发布一个“产品”信号 (V操作)
// sem_full 计数加1,唤醒正在等待产品的消费者。
sem_post(&sem_full);

// 稍微休眠,模拟生产耗时
usleep(1000 * 10);
}
return NULL;
}

// 消费者线程函数
void* consumer(void* arg)
{
int id = *(int*)arg;
for (int i = 0; i < PRODUCE_EACH; ++i)
{
// 1. 申请一个“产品” (P操作)
// 如果 sem_full > 0,则减1并继续;如果为 0,则阻塞等待。
sem_wait(&sem_full);

// 2. 加锁保护队列操作
pthread_mutex_lock(&mutex);

// 3. 从队列取走数据
int value = buffer.front();
buffer.pop();
cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size() << endl;

// 4. 解锁
pthread_mutex_unlock(&mutex);

// 5. 发布一个“空位”信号 (V操作)
// sem_empty 计数加1,唤醒正在等待空位的生产者。
sem_post(&sem_empty);

// 稍微休眠,模拟消费耗时
usleep(1000 * 15);
}
return NULL;
}

int main()
{
// 初始化互斥锁
pthread_mutex_init(&mutex, NULL);

// 初始化信号量
// 参数2:0 表示线程间共享,非0表示进程间共享
// 参数3:初始值。empty 初始化为队列大小,full 初始化为 0
sem_init(&sem_empty, 0, QUEUE_SIZE);
sem_init(&sem_full, 0, 0);

pthread_t producers[PRODUCER_COUNT];
pthread_t consumers[CONSUMER_COUNT];
int producer_ids[PRODUCER_COUNT];
int consumer_ids[CONSUMER_COUNT];

// 创建生产者和消费者线程
for (int i = 0; i < PRODUCER_COUNT; ++i)
{
producer_ids[i] = i + 1;
pthread_create(&producers[i], NULL, producer, &producer_ids[i]);
}
for (int i = 0; i < CONSUMER_COUNT; ++i)
{
consumer_ids[i] = i + 1;
pthread_create(&consumers[i], NULL, consumer, &consumer_ids[i]);
}

// 回收线程资源
for (int i = 0; i < PRODUCER_COUNT; ++i) pthread_join(producers[i], NULL);
for (int i = 0; i < CONSUMER_COUNT; ++i) pthread_join(consumers[i], NULL);

// 销毁资源
sem_destroy(&sem_empty);
sem_destroy(&sem_full);
pthread_mutex_destroy(&mutex);

cout << "所有生产和消费任务已完成。" << endl;

return 0;
}