POSIX Threads (pthread) 编程实战
1. 项目概述
🔗 Github 源码地址 :https://github.com/Hai-Jun-Yan/linux-C-C--Thread/tree/main/Posix-Thread
本项目是一个基于 C++ 和 POSIX Threads (pthread) 库的多线程编程实战练习集。通过七个循序渐进的示例代码,系统展示了在 Linux/Unix 环境下进行多线程开发的核心技术路径。
项目涵盖的核心能力:
基础管理 :线程的创建、退出、回收(Join)与分离(Detach)。
资源同步 :使用互斥锁(Mutex)解决临界区竞态条件,以及死锁的成因与预防。
性能优化 :使用读写锁(Read-Write Lock)优化“读多写少”场景下的并发性能。
经典模型 :基于条件变量 (Condition Variable)和信号量 (Semaphore)实现生产者-消费者模型。
应用场景 :本项目的技术栈广泛应用于 Linux 服务器后端开发、嵌入式系统、高性能计算(HPC)以及任何需要充分利用多核 CPU 性能的 C++ 项目中。
2. 前置知识 在深入代码之前,你需要掌握以下核心概念:
进程 vs 线程 :
进程 是资源分配的最小单位(拥有独立的内存空间)。
线程 是 CPU 调度的最小单位(共享进程的内存空间)。
并发 (Concurrency) vs 并行 (Parallelism) :
并发是逻辑上的同时处理(时间片轮转)。
并行是物理上的同时执行(多核 CPU)。
竞态条件 (Race Condition) :当多个线程同时访问并修改共享数据时,结果依赖于执行顺序的现象。
同步原语 :用于协调线程执行顺序的机制,如锁、信号量、条件变量。
3. API 详解 本项目主要使用了以下 POSIX 线程标准接口(需包含 <pthread.h>):
API 函数
核心作用
关键参数说明
pthread_create
创建新线程
pthread_t *thread: 线程ID指针void *(*start_routine) (void *): 线程入口函数void *arg: 传递给入口函数的参数
pthread_join
等待线程结束
pthread_t thread: 目标线程IDvoid **retval: 接收线程返回值的二级指针
pthread_detach
分离线程
pthread_t thread: 目标线程ID 分离后线程资源由系统自动回收,无需 join
pthread_exit
退出当前线程
void *retval: 线程退出时的返回值
pthread_mutex_lock
互斥锁加锁
pthread_mutex_t *mutex: 互斥锁指针 若已被锁,当前线程阻塞
pthread_mutex_unlock
互斥锁解锁
pthread_mutex_t *mutex: 互斥锁指针
pthread_rwlock_rdlock
读写锁-读加锁
允许多个读线程同时进入,但阻塞写线程
pthread_rwlock_wrlock
读写锁-写加锁
独占锁,阻塞其他所有读和写线程
pthread_cond_wait
条件变量等待
pthread_cond_t *cond: 条件变量pthread_mutex_t *mutex: 关联的互斥锁原子操作 :解锁 -> 等待 -> 被唤醒 -> 加锁
pthread_cond_signal
唤醒一个等待者
pthread_cond_t *cond: 条件变量
sem_wait
信号量 P 操作
sem_t *sem: 信号量 计数器减1,若为0则阻塞
sem_post
信号量 V 操作
sem_t *sem: 信号量 计数器加1,唤醒等待者
4. 核心代码实现 4.1 线程基础:创建与传参 文件 :1.CreateThread.cpp 展示了最基本的线程创建流程,以及如何给线程传递参数(虽然此例传了 NULL)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include <iostream> #include <string.h> #include <pthread.h> #include <unistd.h> using namespace std;void * callback (void * arg) { for (int i = 0 ; i < 5 ; i++) { cout << "子线程: " << i << endl; } cout << "子线程结束-ID:" << pthread_self () << endl; return NULL ; } int main () { pthread_t tid; pthread_create (&tid, NULL , callback, NULL ); pthread_join (tid, NULL ); for (int i = 0 ; i < 5 ; i++) { cout << "主线程: " << i << endl; } cout << "主线程结束-ID:" << pthread_self () << endl; return 0 ; }
4.2 线程交互:退出与获取返回值 文件 :2.ExitAndJoinThread.cpp 演示了子线程如何通过 pthread_exit 返回数据,以及主线程如何通过 pthread_join 获取该数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <iostream> #include <string.h> #include <pthread.h> #include <unistd.h> using namespace std;struct Test { int time; int data; }; struct Test *test = (struct Test*)malloc (sizeof (struct Test));void * callback (void * arg) { for (int i = 0 ; i < 5 ; i++) { cout << "子线程: " << i << endl; } cout << "子线程结束-ID:" << pthread_self () << endl; test->time = 10 ; test->data = 100 ; cout << "子线程: time=" << test->time << ", data=" << test->data << endl; pthread_exit (test); return NULL ; } int main () { pthread_t tid; pthread_create (&tid, NULL , callback, NULL ); pthread_join (tid, (void **)&test); cout << "主线程: time=" << test->time << ", data=" << test->data << endl; for (int i = 0 ; i < 5 ; i++) { cout << "主线程: " << i << endl; } cout << "主线程结束-ID:" << pthread_self () << endl; return 0 ; }
4.3 线程属性:线程分离 文件 :3.DetachThread.cpp 展示了如何让线程“自生自灭”,无需主线程等待回收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 #include <iostream> #include <string.h> #include <pthread.h> #include <unistd.h> using namespace std;void * callback (void * arg) { for (int i = 0 ; i < 5 ; i++) { cout << "子线程: " << i << endl; } cout << "子线程结束-ID:" << pthread_self () << endl; return NULL ; } int main () { pthread_t tid; pthread_create (&tid, NULL , callback, NULL ); pthread_detach (tid); pthread_join (tid, NULL ); for (int i = 0 ; i < 5 ; i++) { cout << "主线程: " << i << endl; } cout << "主线程结束-ID:" << pthread_self () << endl; return 0 ; }
4.4 线程同步:互斥锁与死锁详解 文件 :4.MutexThreadSynchronization.cpp 这是并发编程中最重要的一课。代码展示了如何使用 pthread_mutex_t 保护全局变量 number,并在注释中详细分析了死锁的三种情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 #include <iostream> #include <unistd.h> #include <pthread.h> #include <atomic> using namespace std;#define MAX 50 int number = 0 ;pthread_mutex_t mutex;void * funcA_num (void * arg) { for (int i=0 ; i<MAX; ++i) { pthread_mutex_lock (&mutex); int cur = number; cur++; number = cur; cout << "Thread A, id = " << pthread_self () << ", number = " << number << endl; pthread_mutex_unlock (&mutex); } return NULL ; } void * funcB_num (void * arg) { for (int i=0 ; i<MAX; ++i) { pthread_mutex_lock (&mutex); int cur = number; cur++; number = cur; cout << "Thread B, id = " << pthread_self () << ", number = " << number << endl; pthread_mutex_unlock (&mutex); } return NULL ; } int main (int argc, const char * argv[]) { pthread_mutex_init (&mutex, NULL ); pthread_t p1, p2; pthread_create (&p1, NULL , funcA_num, NULL ); pthread_create (&p2, NULL , funcB_num, NULL ); pthread_join (p1, NULL ); pthread_join (p2, NULL ); pthread_mutex_destroy (&mutex); return 0 ; }
4.5 进阶同步:读写锁 文件 :5.ReadWriteLock.cpp 展示了在“读多写少”场景下,如何使用 pthread_rwlock 提高并发效率。代码中模拟了 3 个写线程和 5 个读线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 #include <iostream> #include <unistd.h> #include <pthread.h> #include <atomic> using namespace std;#define MAX 50 int number = 0 ;pthread_mutex_t mutex;void * write_num (void * arg) { for (int i=0 ; i<MAX; ++i) { pthread_rwlock_wrlock (&mutex); int cur = number; cur++; number = cur; cout << "Write, id = " << pthread_self () << ", number = " << number << endl; pthread_rwlock_unlock (&mutex); } return NULL ; } void * read_num (void * arg) { for (int i=0 ; i<MAX; ++i) { pthread_rwlock_rdlock (&mutex); int cur = number; cout << "Read, id = " << pthread_self () << ", cur = " << number << endl; pthread_rwlock_unlock (&mutex); } return NULL ; } int main (int argc, const char * argv[]) { pthread_rwlock_init (&mutex, NULL ); pthread_t p1[3 ], p2[5 ]; for (int i = 0 ; i < 3 ; ++i) { pthread_create (&p1[i], NULL , write_num, NULL ); } for (int i = 0 ; i < 5 ; ++i) { pthread_create (&p2[i], NULL , read_num, NULL ); } for (int i = 0 ; i < 3 ; ++i) { pthread_join (p1[i], NULL ); } for (int i = 0 ; i < 5 ; ++i) { pthread_join (p2[i], NULL ); } pthread_rwlock_destroy (&mutex); return 0 ; }
4.6 生产者-消费者模型:条件变量版 文件 :6.CondMutexThreadProducerConsumer.cpp 经典模型实现。通过 not_full 和 not_empty 两个条件变量,实现了线程间的精细协作。特别注意 while 循环的使用,是为了防止虚假唤醒 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 #include <iostream> #include <pthread.h> #include <queue> #include <unistd.h> using namespace std;#define QUEUE_SIZE 5 #define PRODUCER_COUNT 2 #define CONSUMER_COUNT 2 #define PRODUCE_EACH 10 queue<int > buffer; pthread_mutex_t mutex; pthread_cond_t not_empty; pthread_cond_t not_full; void * producer (void * arg) { int id = *(int *)arg; for (int i = 0 ; i < PRODUCE_EACH; ++i) { pthread_mutex_lock (&mutex); while (buffer.size () >= QUEUE_SIZE) { pthread_cond_wait (¬_full, &mutex); } int value = id * 100 + i; buffer.push (value); cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size () << endl; pthread_cond_signal (¬_empty); pthread_mutex_unlock (&mutex); usleep (1000 * 10 ); } return NULL ; } void * consumer (void * arg) { int id = *(int *)arg; for (int i = 0 ; i < PRODUCE_EACH; ++i) { pthread_mutex_lock (&mutex); while (buffer.empty ()) { pthread_cond_wait (¬_empty, &mutex); } int value = buffer.front (); buffer.pop (); cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size () << endl; pthread_cond_signal (¬_full); pthread_mutex_unlock (&mutex); usleep (1000 * 15 ); } return NULL ; } int main () { pthread_mutex_init (&mutex, NULL ); pthread_cond_init (¬_empty, NULL ); pthread_cond_init (¬_full, NULL ); pthread_t producers[PRODUCER_COUNT]; pthread_t consumers[CONSUMER_COUNT]; int producer_ids[PRODUCER_COUNT]; int consumer_ids[CONSUMER_COUNT]; for (int i = 0 ; i < PRODUCER_COUNT; ++i) { producer_ids[i] = i + 1 ; pthread_create (&producers[i], NULL , producer, &producer_ids[i]); } for (int i = 0 ; i < CONSUMER_COUNT; ++i) { consumer_ids[i] = i + 1 ; pthread_create (&consumers[i], NULL , consumer, &consumer_ids[i]); } for (int i = 0 ; i < PRODUCER_COUNT; ++i) { pthread_join (producers[i], NULL ); } for (int i = 0 ; i < CONSUMER_COUNT; ++i) { pthread_join (consumers[i], NULL ); } pthread_cond_destroy (¬_empty); pthread_cond_destroy (¬_full); pthread_mutex_destroy (&mutex); cout << "所有生产和消费任务已完成。" << endl; return 0 ; }
4.7 生产者-消费者模型:信号量版 文件 :7.SignalMutexThreadProducerConsumer.cpp 展示了如何用信号量(Semaphore)简化资源计数管理。通过 sem_empty 和 sem_full 直接控制资源的数量,代码逻辑比条件变量版更直观。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 #include <iostream> #include <pthread.h> #include <queue> #include <semaphore.h> #include <unistd.h> using namespace std;#define QUEUE_SIZE 5 #define PRODUCER_COUNT 2 #define CONSUMER_COUNT 2 #define PRODUCE_EACH 10 queue<int > buffer; pthread_mutex_t mutex; sem_t sem_empty; sem_t sem_full; void * producer (void * arg) { int id = *(int *)arg; for (int i = 0 ; i < PRODUCE_EACH; ++i) { sem_wait (&sem_empty); pthread_mutex_lock (&mutex); int value = id * 100 + i; buffer.push (value); cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size () << endl; pthread_mutex_unlock (&mutex); sem_post (&sem_full); usleep (1000 * 10 ); } return NULL ; } void * consumer (void * arg) { int id = *(int *)arg; for (int i = 0 ; i < PRODUCE_EACH; ++i) { sem_wait (&sem_full); pthread_mutex_lock (&mutex); int value = buffer.front (); buffer.pop (); cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size () << endl; pthread_mutex_unlock (&mutex); sem_post (&sem_empty); usleep (1000 * 15 ); } return NULL ; } int main () { pthread_mutex_init (&mutex, NULL ); sem_init (&sem_empty, 0 , QUEUE_SIZE); sem_init (&sem_full, 0 , 0 ); pthread_t producers[PRODUCER_COUNT]; pthread_t consumers[CONSUMER_COUNT]; int producer_ids[PRODUCER_COUNT]; int consumer_ids[CONSUMER_COUNT]; for (int i = 0 ; i < PRODUCER_COUNT; ++i) { producer_ids[i] = i + 1 ; pthread_create (&producers[i], NULL , producer, &producer_ids[i]); } for (int i = 0 ; i < CONSUMER_COUNT; ++i) { consumer_ids[i] = i + 1 ; pthread_create (&consumers[i], NULL , consumer, &consumer_ids[i]); } for (int i = 0 ; i < PRODUCER_COUNT; ++i) pthread_join (producers[i], NULL ); for (int i = 0 ; i < CONSUMER_COUNT; ++i) pthread_join (consumers[i], NULL ); sem_destroy (&sem_empty); sem_destroy (&sem_full); pthread_mutex_destroy (&mutex); cout << "所有生产和消费任务已完成。" << endl; return 0 ; }