cpp多线程内容扫盲

最近写多线程的代码,闹了个笑话。这里做个简单归纳,详细细节直接看c++多线程实战这本书即可

1. 多线程

1.1 thread的基本用法

void do_some_work();
std::thread my_thread(do_some_work);
my_thread.join();

// 当然也可以传匿名函数  这里写个返回智能指针版本
std::unique_ptr<std::thread> thread_ = std::make_unique<std::thread>([&]() { fun(); });

cpp线程池

1.2 thread的封装

相关内容:

1.2.1 std::async 与 std::future

std::async会自动创建一个线程去调用 线程函数,它返回一个std::future,这个future中存储了线程函数返回的结果,当我们需要线程函数的结果时,直接从future中获取,解耦了线程的创建和执行。

async其实相当于封装了std::promise std::packaged_task std::thread。

即 async(Callback) 等价将输入的Callback 传入 packaged_task, 调用get_guture返回future,

而future的get()依赖promise set_value,该promise设置的值为callback的返回值

获取future结果有三种方式:get、wait、wait_for,其中get等待异步操作结束并返回结果,wait只是等待异步操作完成,没有返回值,wait_for是超时等待返回结果。返回结果:std::future_status::deferred、timeout、ready

#include <future>
#include <string>
#include <iostream>


int find_the_answer_to_ltuae();
void do_other_stuff();
int main() {
  std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
  do_other_stuff();
  std::cout<<"The answer is "<<the_answer.get()<<std::endl;
  return 0;
}

async带参数示例

struct X {
  void foo(int,std::string const&);
  std::string bar(std::string const&);
};
struct Y {
  double operator()(double);
};
class move_only {
  public:
  move_only();
  move_only(move_only&&);
  move_only(move_only const&) = delete;
  move_only& operator=(move_only&&);
  move_only& operator=(move_only const&) = delete;

  void operator()();
};

int main () {
  X x;
  auto f1=std::async(&X::foo,&x,42,"hello");  // 调用p->foo(42, "hello"),p是指向x的指针
  auto f2=std::async(&X::bar,x,"goodbye");  // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本

  Y y;
  auto f3=std::async(Y(),3.141);  // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
  auto f4=std::async(std::ref(y),2.718);  // 调用y(2.718)
  X baz(X&);
  std::async(baz,std::ref(x));  // 调用baz(x)

  auto f5=std::async(move_only());  // 调用tmp(),tmp是通过std::move(move_only())构造得到

  auto f6=std::async(std::launch::async,Y(),1.2);  // 在新线程上执行
  auto f7=std::async(std::launch::deferred,baz,std::ref(x));  // 在wait()或get()调用时执行
  auto f8=std::async(
              std::launch::deferred | std::launch::async,
              baz,std::ref(x));  // 实现选择执行方式
  auto f9=std::async(baz,std::ref(x));
  f7.wait();  //  调用延迟函数
}

注意:std::async 的析构函数里会等待任务完成,如果没有接住返回值,将创建完future对象就开始析构,导致没有异步效果。

// async 可能产生的堵塞问题
// 当直接async不取出future 则 @1 @2 堵塞等待future ready才会析构完毕 执行后面语句。

void funasync() {
  std::cout << "Test start" << std::endl;
  // std::async(std::launch::async, [] {
  auto fut1 = std::async(std::launch::async, [] {
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    std::cout << "work done 1!\n";
    return 1;
  });
  std::cout << "This shold show before work done 1" << std::endl; //@1
  // std::async(std::launch::async, [] {
  auto fut2 = std::async(std::launch::async, [] {
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    std::cout << "work done 2!" << std::endl;
  });
  std::cout << "This shold show before work done 2" << std::endl; //@2
}

int funpack() {
  std::thread th([]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
    std::cout << "funpack start" << std::endl;
    return 1;
  });
  th.detach();
  return 0;
}

1.2.2 std::promise

promise 用于future对象交互数据 案例如下:

#include <iostream>
#include <thread>
#include <future>

// 异步执行任务并获取返回值的函数
std::future<int> asyncTask() {
    std::promise<int> p;
    std::future<int> f = p.get_future();

    std::thread t([&] {
        int result = 42;
        p.set_value(result);
    });

    t.join();
    return f;
}

int main() {
    // 异步执行任务并获取返回值
    std::future<int> future = asyncTask();
    // 等待任务完成并获取返回值
    int result = future.get();
    // 输出任务的返回值
    std::cout << "Task result: " << result << std::endl;
    return 0;
}

1.2.3 packaged_task

它和promise在某种程度上有点像,promise保存了一个共享状态的值,而packaged_task保存的是函数,通过get_future 得到future,该函数返回值调用get来获取,当然可以使用wait get等方式等待并获取返回结果。它的基本用法:

std::packaged_task<int()> task([](){ return 7; });
    std::thread t1(std::ref(task)); 
    std::future<int> f1 = task.get_future(); 
    auto r1 = f1.get();

简单案例:

#include <chrono>
#include <future>
#include <iostream>
#include <string>

int main() {
  std::packaged_task<int(void)> pt(funpack);
  std::future<int> fu = pt.get_future();
  // std::thread ts(std::move(pt));
  pt();
  std::future_status st = fu.wait_for(std::chrono::seconds(6));
  // wait_for可设置超时时间,如果在超时时间之内任务完成则返回
  // std::future_status::ready状态;如果在超时时间之内任务尚未完成,
  // 则返回std::future_status::timeout状态

  if (st == std::future_status::ready) {
    std::cout << "~~~~" << std::endl;
  } else if (st == std::future_status::timeout) {
    std::cout << "·····" << std::endl;
  } else if (st == std::future_status::deferred) {
    std::cout << "11111" << std::endl;
  }

  int a = fu.get();
  std::cout << a << std::endl;
  std::this_thread::sleep_for(std::chrono::milliseconds(5000));
  std::cout << "end main" << std::endl;
  return 0;
}

2. 互斥锁

多线程可以通过锁来进行同步,分为独占锁(mutex)和共享锁(shared_mutex),共享锁就是读写锁,lock与unlock都是不能被多次上锁解锁的。

  • 避免死锁的几个准则:
    • 避免嵌套加锁
    • 避免持有锁时执行未确定代码
    • 使用固定顺序获取锁 (std::lock 可以同时获取多个锁)
    • 实现并使用多层级的锁
    • 锁的粒度要控制住,使用unique_lock提前解锁尽量避免获取锁后长时间处理其他部分。
    • 只用于初始化的锁 单例模式使用std::once_flag和std::call_once

若另一线程已锁定互斥,则lock的调用线程将阻塞执行,直至获得锁。除了普通锁的lock tye_lock ublock 还具有共享锁定lock_shared try_lock_shared unlock_shared,即共享锁定可以被多个线程获取。 若已以任何模式(共享或排他性)占有 mutex 的线程调用 lock ,则行为未定义。也就是说,已经获得读模式锁或者写模式锁的线程再次调用lock的话,行为是未定义的。 std::recursive_mutex 才允许嵌套上锁,且需要unlock同等次数。

通常不直接调用 lock() unlock() 而是用 std::unique_lock std::lock_guard std::shared_lock 管理排他性锁定。析构自动解锁。所以使用上述类来包装锁时,千万不要自行调用锁的lock unlock接口。使用如下所示:

std::mutex mtx;
// lock_guard 类似简化的 unique 无法提前解锁,
// 即不能拷贝、赋值、移动,只能通过构造函数初始化和析构函数销毁,
// unique_lock 是可移动的,可以拷贝、赋值、移动。
{
  std::unique_lock<std::mutex> lck(mutex);
  lck.unlock(); // 可提前解锁
} //析构时会判断是否已解锁来进行解锁
{
  std::unique_lock<std::mutex> lck(mutex);
}

std::shared_mutex mutex_;
{
  //读者, 获取共享锁, 使用shared_lock
  std::shared_lock<std::shared_mutex> lck(mutex_);//执行mutex_.lock_shared();
}//lck 析构, 执行mutex_.unlock_shared();

{
  //写者, 获取独占锁, 使用unique_lock
  std::unique_lock<std::shared_mutex> lck(mutex_);//执行mutex_.lock();
}//lck 析构, 执行mutex_.unlock();

3 条件变量

上述条件变量的执行时会先申请获取锁,执行完毕后自动解锁。

以wait为例,条件变量的wait(unique_lock& lck, Predicate pred);条件变量会被notify唤醒后确保lck无锁再执行pred函数为true时才解除堵塞。注意当最开始执行wait时会直接执行pred,如果为true不需要被notify就通过(相当于认为此时已经被notify)。 这就是为什么用std::unique_lock而不使用std::lock_guard——等待中的线程必须在等待期间解锁互斥量, 并在这之后对互斥量再次上锁,而std::lock_guard没有这么灵活。

3.1 简易的可中断多线程sleep示例

class InterruptableSleeper {
 public:
  template <typename... Duration>
  // duration will be passed to condition_variable::sleep_for
  void SleepFor(Duration&&... duration) {
    std::unique_lock lock(mutex_);

    if (!interrupt_) {
      condition_.wait_for(lock, std::forward<Duration>(duration)...);
    }

    interrupt_ = false;
  }

  void Interrupt() {
    std::unique_lock lock(mutex_);
    interrupt_ = true;
    condition_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable condition_;
  bool interrupt_ = false;
};

4 原子

4.1 原子变量

C++中原子变量(atomic)是一种多线程编程中常用的同步机制,它能够确保对共享变量的操作在执行时不会被其他线程的操作干扰,从而避免竞态条件(race condition)和死锁(deadlock)等问题。

原子变量可以看作是一种特殊的类型,它具有类似于普通变量的操作,但是这些操作都是原子级别的,即要么全部完成,要么全部未完成。C++标准库提供了丰富的原子类型,包括整型、指针、布尔值等,使用方法也非常简单,只需要通过std::atomic定义一个原子变量即可,其中T表示变量的类型。

在普通的变量中,并发的访问它可能会导致数据竞争,竞争的后果会导致操作过程不会按照正确的顺序进行操作。

4.2 六类原子同步

std::memory_order_seq_cst比起其他内存序要简单的多,因为所有操作都将其作为总序。本章的所有例子,都是从std::memory_order_seq_cst开始,只有当基本操作正常工作的时候,才放宽内存序的选择。在这种情况下,使用其他内存序就是进行优化(早起可以不用这样做)。通常,当你看整套代码对数据结构的操作后,才能决定是否要放宽该操作的内存序选择。所以,尝试放宽选择,可能会让你轻松一些。在测试后的时候,工作的代码可能会很复杂(不过,不能完全保证内存序正确)。除非你有一个算法检查器,可以系统的测试,线程能看到的所有可能性组合,这样就能保证指定内存序的正确性(这样的测试的确存在),仅是执行实现代码是远远不够的。

内存原子执行模型共6种,默认是memory_order_seq_cst;这是最严格的可用选项。

  • 顺序一致: 当前线程完全同步其他线程的操作,这就需要处理期间进行大量并且费时的信息交换.
    • memory_order_seq_cst
  • 获得-释放: 操作仍然没有总的顺序,但比较松散引入了一些同步,不同线程的同一变量先 acq 再release
    • memory_order_consume; 为数据添加依赖 可通过std::kill_dependency()显式取消
    • memory_order_acquire; load 获取 (常用同步手段)当前线程中的其他读写操作不允许重排到W之后,
    • memory_order_release; store 释放 (常用同步手段)当前线程中的其他读写操作不允许重排到R之前,
    • memory_order_acq_rel; 结合获取与释放,兼具上述功能
  • 松散: 完全不同步,执行顺序不确定交由系统执行。其他线程可能读到新值,也可能读到旧值。一般联合其他约束
    • memory_order_relaxed;

5. 其他

  • thread_localthread_local 有点像是该线程的static变量,线程独占

  • std::thread 传参为引用时,使用std::ref (需要注意变量生命周期), 否则编译器不通过,编译器通过时会造成thread构造函数盲目的拷贝已提供变即传递给函数的参数是传参内部拷贝的引用,而非数据本身的引用。

  • std:🧵:hardware_concurrency() 返回当前进程并发线程数量,线程标识类型是std:🧵:id,可以通过调用对象的get_id()或std::this_thread::get_id()获取。默认值为std:🧵:type id允许排序比较。