36、多线程入门

厨子大约 5 分钟C++C++基础编程程序厨

并发编程的重要性不用多说。本文主要介绍C++多线程相关的知识点。

如何创建线程?

C++11之前你可能使用pthread_xxx来创建线程,繁琐且不易读,C++11引入了std::thread来创建线程,支持对线程join或者detach。直接看代码:

#include <iostream>
#include <thread>
using namespace std;

int main() {
 auto func = {
  for (int i = 0; i < 10; ++i) {
   cout << i << " ";
  }
  cout << endl;
 };
 
 std::thread t(func);
 if (t.joinable()) {
  t.detach();
 }
 
 auto func1 = [](int k) {
  for (int i = 0; i < k; ++i) {
   cout << i << " ";
  }
  cout << endl;
 };
 
 std::thread tt(func1, 20);
 if (tt.joinable()) { // 检查线程可否被join
  tt.join();
 }
 return 0;
}

上述代码中,函数funcfunc1运行在线程对象ttt中,从刚创建对象开始就会新建一个线程用于执行函数,调用**join****函数将会阻塞主线程,直到线程函数执行结束,线程函数的返回值将会被忽略。如果不希望线程被阻塞执行,可以调用线程对象的detach**函数,表示将线程和线程对象分离,新的线程与主线程没有任何关联,线程资源在任务结束后会由操作系统自动回收。

如果没有调用****join或者detach**函数,假如线程函数执行时间较长,此时线程对象的生命周期结束调用析构函数清理资源,这时可能会发生****crash**,这里有两种解决办法,一个是调用join(),保证线程函数的生命周期和线程对象的生命周期相同,另一个是调用detach(),将线程和线程对象分离,这里需要注意,如果线程已经和对象分离,那我们就再也无法控制线程什么时候结束了,不能再通过join来等待线程执行完。

C++11还提供了获取线程id,或者系统cpu个数,获取thread native_handle,让线程休眠等功能:

std::thread t(func);
cout << "当前线程ID " << t.get_id() << endl;
cout << "当前cpu个数 " << std::thread::hardware_concurrency() << endl;
auto handle = t.native_handle(); // handle可用于pthread相关操作
std::this_thread::sleep_for(std::chrono::seconds(1));

如何加锁?

C++11中,加锁可以使用std::mutexmutex主要有四种:

  • std::mutex:独占的互斥量,不能递归使用,不带超时功能
  • std::recursive_mutex:递归互斥量,可重入,不带超时功能
  • std::timed_mutex:带超时的互斥量,不能递归
  • std::recursive_timed_mutex:带超时的互斥量,可以递归使用

最常用的就是**std::mutex**,其它三种我也没用过:

std::mutex mutex_;
int main() {
 auto func1 = [](int k) {
  mutex_.lock();
  for (int i = 0; i < k; ++i) {
   cout << i << " ";
  }
  cout << endl;
  mutex_.unlock();
 };
 
 std::thread threads[5];
 for (int i = 0; i < 5; ++i) {
  threads[i] = std::thread(func1, 200);
 }
 
 for (auto& th : threads) {
  th.join();
 }
 
 return 0;
}

mutex还可以搭配RAII方式的锁封装类一起使用,可以动态的释放锁资源,防止线程由于编码失误导致始终持有锁。

C++11主要有std::lock_guardstd::unique_lock两种RAII方式,使用方式类似:

auto func1 = [](int k) {
 // std::lock_guard<std::mutex> lock(mutex_);
 std::unique_lock<std::mutex> lock(mutex_);
 for (int i = 0; i < k; ++i) {
  cout << i << " ";
 }
 cout << endl;
};

std::lock_gurad相比于std::unique_lock更加轻量级,少了一些成员函数,std::unique_lock类有unlock函数,可以手动释放锁,所以条件变量都配合std::unique_lock使用,而不是std::lock_guard,因为条件变量在wait时需要有手动释放锁的能力,具体关于条件变量后面会讲到。

如何使用原子操作?

C++11提供了原子类型std::atomic<T>,理论上这个T可以是任意类型,但是我平时只存放整型,别的还真的没用过,整型有这种原子变量已经足够方便,就不需要使用std::mutex来保护该变量啦。看一个带锁计数器的代码:

struct OriginCounter { // 普通的计数器
 int count;
 std::mutex mutex_;
 
 void add() {
  std::lock_guard<std::mutex> lock(mutex_);
  ++count;
 }
 
 void sub() {
  std::lock_guard<std::mutex> lock(mutex_);
  --count;
 }
 
 int get() {
  std::lock_guard<std::mutex> lock(mutex_);
  return count;
 }
 
};

而用原子变量就方便的多:

struct NewCounter { // 使用原子变量的计数器
 std::atomic<int> count;
 void add() {
  ++count;
 }
 
 void sub() {
  --count;
 }
 
 int get() { return count.load(); }
};

如何使用条件变量?

条件变量是C++11引入的一种同步机制,它可以阻塞一个线程或者个线程,直到有线程通知或者超时才会唤醒正在阻塞的线程,条件变量需要和锁配合使用,这里的锁就是上面介绍的std::unique_lock

这里使用条件变量实现一个CountDownLatch

class CountDownLatch {
public:
 explicit CountDownLatch(uint32_t count) : count_(count);
 
 void CountDown() {
  std::unique_lock<std::mutex> lock(mutex_);
  --count_;
  if (count_ == 0) {
   cv_.notify_all();
  }
 }
 
 void Await(uint32_t time_ms = 0) {
  std::unique_lock<std::mutex> lock(mutex_);
  while (count_ > 0) {
   if (time_ms > 0) {
    cv_.wait_for(lock, std::chrono::milliseconds(time_ms));
   } else {
    cv_.wait(lock);
   }
  }
 }
 
 uint32_t GetCount() const {
  std::unique_lock<std::mutex> lock(mutex_);
  return count_;
 }
 
private:
 std::condition_variable cv_;
 mutable std::mutex mutex_;
 uint32_t count_ = 0;
};

关于条件变量其实还涉及到通知丢失和虚假唤醒问题,可以看这篇文章:条件变量open in new window

如何优雅的执行异步任务?

你可能已经猜到了,我要介绍的就是async,关于异步操作可以优先使用async,看这段代码:

#include <functional>
#include <future>
#include <iostream>
#include <thread>
using namespace std;

int func(int in) { return in + 1; }

int main() {
 auto res = std::async(func, 5);
 // res.wait();
 cout << res.get() << endl; // 阻塞直到函数返回
 return 0;
}

使用async异步执行函数是不是方便多啦。

async具体语法如下:

async(std::launch::async | std::launch::deferred, func, args...);

第一个参数是创建策略:

  • std::launch::async表示任务执行在另一线程。
  • std::launch::deferred表示延迟执行任务,调用get或者wait时才会执行,不会创建线程,惰性执行在当前线程。

如果不明确指定创建策略,以上两个都不是async的默认策略,而是未定义,它是一个基于任务的程序设计,内部有一个调度器(线程池),会根据实际情况决定采用哪种策略。

若从 std::async 获得的 std::future 未被移动或绑定到引用,则在完整表达式结尾,std::future的析构函数将阻塞直至异步计算完成,实际上相当于同步操作:

std::async(std::launch::async, [] { f(); }); // 临时量的析构函数等待 f()
std::async(std::launch::async, [] { g(); }); // f() 完成前不开始

关于多线程还有很多其它的知识点,比如futurepackage_taskpromisecall_once等等。