1. 项目概述
🔗 Github 源码地址 :https://github.com/Hai-Jun-Yan/linux-C-C--Thread/tree/main/C%2B%2B11-Thread
本项目旨在通过一系列核心示例,全面展示 C++11 标准库提供的现代多线程编程能力。项目涵盖了从基础的线程创建与管理,到复杂的线程同步、互斥锁机制,以及经典的生产者-消费者模型实现。
主要功能与目标:
线程基础 :掌握 std::thread 的创建、等待(join)与分离(detach)。
结果传递 :学习使用 std::promise 和 std::future 在线程间安全地传递数据。
线程同步 :深入理解 std::mutex 互斥锁及其 RAII 包装器(std::lock_guard, std::unique_lock)的使用。
高级模型 :手动实现读写锁机制,以及基于条件变量(std::condition_variable)的两种生产者-消费者模型(队列版与信号量版)。
应用场景:
高并发服务器开发
实时数据处理与后台任务调度
利用多核 CPU 提升计算性能
2. 前置知识 在开始本项目之前,建议具备以下基础:
C++11 新特性 :熟悉 Lambda 表达式、右值引用与移动语义(std::move)、RAII(资源获取即初始化)惯用手法。
操作系统基础 :
进程与线程 :理解线程是操作系统调度的最小单位,共享进程的内存空间。
并发与并行 :区分多任务交替执行(并发)与多核同时执行(并行)。
竞态条件 (Race Condition) :理解多线程同时访问共享资源可能导致的数据不一致问题。
死锁 (Deadlock) :了解死锁产生的四个必要条件及预防方法。
3. API 详解 本项目主要涉及 <thread>, <mutex>, <future>, <condition_variable> 头文件中的核心组件。
组件 / API
核心作用
关键参数与说明
std::thread
线程对象
thread(Fn&& fn, Args&&... args): 构造时立即启动线程执行 fn。join(): 阻塞当前线程直到该线程结束。detach(): 分离线程,使其在后台独立运行。
std::this_thread
当前线程工具
sleep_for(duration): 使当前线程休眠指定时长。get_id(): 获取当前线程的唯一标识符。
std::mutex
互斥锁
lock(): 阻塞直到获得锁。unlock(): 释放锁。 通常配合 lock_guard 使用以防忘记解锁。
std::lock_guard
锁的 RAII 包装
构造时加锁,析构时自动解锁。不可手动解锁,不可拷贝。
std::unique_lock
灵活的锁包装
比 lock_guard 更灵活,支持手动 lock/unlock,配合条件变量使用。
std::condition_variable
条件变量
wait(lock): 释放锁并挂起线程,直到被唤醒。notify_one(): 唤醒一个等待线程。notify_all(): 唤醒所有等待线程。
std::promise
承诺(发送端)
用于在线程的一端设置值 (set_value),该值可在另一端通过 future 获取。
std::future
未来(接收端)
get(): 阻塞等待 promise 设置的值并获取它。
4. 核心代码实现 以下展示项目的七个核心模块,代码与源码完全一致。
4.1 线程创建与回收 (1.CreateThread.cpp) 演示最基础的线程创建,以及使用 join() 等待子线程结束,防止主线程提前退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <iostream> #include <thread> #include <chrono> using namespace std;void callback () { for (int i = 0 ; i < 5 ; i++) { cout << "子线程: " << i << endl; this_thread::sleep_for (chrono::milliseconds (10 )); } cout << "子线程结束-ID: " << this_thread::get_id () << endl; } int main () { thread worker (callback) ; worker.join (); for (int i = 0 ; i < 5 ; i++) { cout << "主线程: " << i << endl; this_thread::sleep_for (chrono::milliseconds (10 )); } cout << "主线程结束-ID: " << this_thread::get_id () << endl; return 0 ; }
4.2 线程返回值传递 (2.ExitAndJoinThread.cpp) C++11 中不直接通过 join 获取返回值,而是推荐使用 promise 和 future 机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 #include <iostream> #include <thread> #include <future> #include <chrono> using namespace std;struct Test { int time; int data; }; void callback (promise<Test> resultPromise) { for (int i = 0 ; i < 5 ; i++) { cout << "子线程: " << i << endl; this_thread::sleep_for (chrono::milliseconds (10 )); } cout << "子线程结束-ID: " << this_thread::get_id () << endl; Test result; result.time = 10 ; result.data = 100 ; cout << "子线程: time=" << result.time << ", data=" << result.data << endl; resultPromise.set_value (result); } int main () { promise<Test> resultPromise; future<Test> resultFuture = resultPromise.get_future (); thread worker (callback, move(resultPromise)) ; worker.join (); Test result = resultFuture.get (); cout << "主线程: time=" << result.time << ", data=" << result.data << endl; for (int i = 0 ; i < 5 ; i++) { cout << "主线程: " << i << endl; this_thread::sleep_for (chrono::milliseconds (10 )); } cout << "主线程结束-ID: " << this_thread::get_id () << endl; return 0 ; }
4.3 线程分离 (3.DetachThread.cpp) 展示如何使用 detach() 让线程在后台独立运行,系统会自动回收资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <iostream> #include <thread> #include <chrono> using namespace std;void callback () { for (int i = 0 ; i < 5 ; i++) { cout << "子线程: " << i << endl; this_thread::sleep_for (chrono::milliseconds (10 )); } cout << "子线程结束-ID: " << this_thread::get_id () << endl; } int main () { thread worker (callback) ; worker.detach (); for (int i = 0 ; i < 5 ; i++) { cout << "主线程: " << i << endl; this_thread::sleep_for (chrono::milliseconds (10 )); } cout << "主线程结束-ID: " << this_thread::get_id () << endl; this_thread::sleep_for (chrono::milliseconds (100 )); return 0 ; }
4.4 互斥锁同步 (4.MutexThreadSynchronization.cpp) 使用 std::mutex 和 std::lock_guard 保护共享资源,防止数据竞争,并总结了死锁的常见场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 #include <iostream> #include <thread> #include <mutex> #include <chrono> using namespace std;#define MAX 50 int number = 0 ;mutex mtx; void funcA_num () { for (int i = 0 ; i < MAX; ++i) { lock_guard<mutex> lock (mtx) ; int cur = number; cur++; number = cur; cout << "Thread A, id = " << this_thread::get_id () << ", number = " << number << endl; } } void funcB_num () { for (int i = 0 ; i < MAX; ++i) { lock_guard<mutex> lock (mtx) ; int cur = number; cur++; number = cur; cout << "Thread B, id = " << this_thread::get_id () << ", number = " << number << endl; } } int main () { thread t1 (funcA_num) ; thread t2 (funcB_num) ; t1. join (); t2. join (); return 0 ; }
4.5 读写锁实现 (5.ReadWriteLock.cpp) C++11 标准库(直到 C++17 才有 shared_mutex)没有原生读写锁。这里展示了如何用 mutex + condition_variable 手动实现一个“写优先”的读写锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <vector> #include <chrono> using namespace std;#define MAX 50 int number = 0 ;class ReadWriteLock { public : void lock_read () { unique_lock<mutex> lock (mtx) ; cv.wait (lock, [this ]() { return !writer_active && waiting_writers == 0 ; }); ++reader_count; } void unlock_read () { unique_lock<mutex> lock (mtx) ; --reader_count; if (reader_count == 0 ) { cv.notify_all (); } } void lock_write () { unique_lock<mutex> lock (mtx) ; ++waiting_writers; cv.wait (lock, [this ]() { return !writer_active && reader_count == 0 ; }); --waiting_writers; writer_active = true ; } void unlock_write () { unique_lock<mutex> lock (mtx) ; writer_active = false ; cv.notify_all (); } private : mutex mtx; condition_variable cv; int reader_count = 0 ; int waiting_writers = 0 ; bool writer_active = false ; }; ReadWriteLock rwlock; void write_num () { for (int i = 0 ; i < MAX; ++i) { rwlock.lock_write (); int cur = number; cur++; number = cur; cout << "Write, id = " << this_thread::get_id () << ", number = " << number << endl; rwlock.unlock_write (); this_thread::sleep_for (chrono::milliseconds (5 )); } } void read_num () { for (int i = 0 ; i < MAX; ++i) { rwlock.lock_read (); cout << "Read, id = " << this_thread::get_id () << ", cur = " << number << endl; rwlock.unlock_read (); this_thread::sleep_for (chrono::milliseconds (5 )); } } int main () { vector<thread> writers; vector<thread> readers; for (int i = 0 ; i < 3 ; ++i) { writers.emplace_back (write_num); } for (int i = 0 ; i < 5 ; ++i) { readers.emplace_back (read_num); } for (auto & t : writers) t.join (); for (auto & t : readers) t.join (); return 0 ; }
4.6 条件变量生产者-消费者 (6.CondMutexThreadProducerConsumer.cpp) 最经典的生产者-消费者模型实现。使用 condition_variable 的 wait 和 notify 机制来协调线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> using namespace std;#define QUEUE_SIZE 5 #define PRODUCER_COUNT 2 #define CONSUMER_COUNT 2 #define PRODUCE_EACH 10 queue<int > buffer; mutex mtx; condition_variable not_empty; condition_variable not_full; void producer (int id) { for (int i = 0 ; i < PRODUCE_EACH; ++i) { unique_lock<mutex> lock (mtx) ; while (buffer.size () >= QUEUE_SIZE) { not_full.wait (lock); } int value = id * 100 + i; buffer.push (value); cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size () << endl; not_empty.notify_one (); lock.unlock (); this_thread::sleep_for (chrono::milliseconds (10 )); } } void consumer (int id) { for (int i = 0 ; i < PRODUCE_EACH; ++i) { unique_lock<mutex> lock (mtx) ; while (buffer.empty ()) { not_empty.wait (lock); } int value = buffer.front (); buffer.pop (); cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size () << endl; not_full.notify_one (); lock.unlock (); this_thread::sleep_for (chrono::milliseconds (15 )); } } int main () { thread producers[PRODUCER_COUNT]; thread consumers[CONSUMER_COUNT]; for (int i = 0 ; i < PRODUCER_COUNT; ++i) { producers[i] = thread (producer, i + 1 ); } for (int i = 0 ; i < CONSUMER_COUNT; ++i) { consumers[i] = thread (consumer, i + 1 ); } for (int i = 0 ; i < PRODUCER_COUNT; ++i) producers[i].join (); for (int i = 0 ; i < CONSUMER_COUNT; ++i) consumers[i].join (); cout << "所有生产和消费任务已完成。" << endl; return 0 ; }
4.7 信号量生产者-消费者 (7.SignalMutexThreadProducerConsumer.cpp) C++11 同样没有原生信号量。这里展示了如何封装一个 Semaphore 类,并用它来简化生产者-消费者的同步逻辑(无需显式管理锁和条件变量的 notify)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> using namespace std;class Semaphore { public : explicit Semaphore (int count) : count_(count) { } void wait () { unique_lock<mutex> lock (mtx_) ; while (count_ == 0 ) { cv_.wait (lock); } --count_; } void post () { unique_lock<mutex> lock (mtx_) ; ++count_; cv_.notify_one (); } private : mutex mtx_; condition_variable cv_; int count_; }; #define QUEUE_SIZE 5 #define PRODUCER_COUNT 2 #define CONSUMER_COUNT 2 #define PRODUCE_EACH 10 queue<int > buffer; mutex mtx; Semaphore sem_empty (QUEUE_SIZE) ;Semaphore sem_full (0 ) ;void producer (int id) { for (int i = 0 ; i < PRODUCE_EACH; ++i) { sem_empty.wait (); { lock_guard<mutex> lock (mtx) ; int value = id * 100 + i; buffer.push (value); cout << "生产者 " << id << " 放入数据: " << value << ", 当前队列大小: " << buffer.size () << endl; } sem_full.post (); this_thread::sleep_for (chrono::milliseconds (10 )); } } void consumer (int id) { for (int i = 0 ; i < PRODUCE_EACH; ++i) { sem_full.wait (); { lock_guard<mutex> lock (mtx) ; int value = buffer.front (); buffer.pop (); cout << "消费者 " << id << " 取走数据: " << value << ", 当前队列大小: " << buffer.size () << endl; } sem_empty.post (); this_thread::sleep_for (chrono::milliseconds (15 )); } } int main () { thread producers[PRODUCER_COUNT]; thread consumers[CONSUMER_COUNT]; for (int i = 0 ; i < PRODUCER_COUNT; ++i) { producers[i] = thread (producer, i + 1 ); } for (int i = 0 ; i < CONSUMER_COUNT; ++i) { consumers[i] = thread (consumer, i + 1 ); } for (int i = 0 ; i < PRODUCER_COUNT; ++i) producers[i].join (); for (int i = 0 ; i < CONSUMER_COUNT; ++i) consumers[i].join (); cout << "所有生产和消费任务已完成。" << endl; return 0 ; }