简介

线程池是一种并发编程的技术,用于有效地管理和复用线程资源。它由一组预先创建的线程组成,这些线程可以在需要时执行任务,并在任务完成后返回线程池中等待下一个任务。

线程池的主要目的是避免反复创建和销毁线程的开销,以及有效地控制并发线程的数量。通过使用线程池,可以降低系统的负载,并提高任务执行的效率。

以下是线程池的一些关键特点:

  1. 线程池包含一个线程队列和任务队列,任务队列用于存储待执行的任务。
  2. 线程池在启动时会创建一定数量的线程,并将它们放入线程队列中。
  3. 当有任务需要执行时,线程池从任务队列中获取任务,并将其分配给空闲的线程执行。
  4. 执行完任务的线程会继续等待下一个任务的到来,而不是被销毁。
  5. 如果任务队列为空,线程池中的线程可以进入睡眠状态,减少资源占用。
  6. 线程池可以限制同时执行的线程数量,避免过多的并发线程导致系统负载过高。

使用线程池有以下几个优点:

  1. 提高性能:通过复用线程,避免了线程创建和销毁的开销,提高了任务执行的效率。
  2. 资源控制:线程池可以限制并发线程的数量,避免系统负载过高,保护系统资源。
  3. 提高响应性:线程池可以在任务到来时立即进行处理,减少了任务等待的时间,提高了系统的响应速度。
  4. 简化编程:使用线程池可以将任务的提交和执行分离,简化了并发编程的复杂性。

需要注意的是,在使用线程池时,需要合理设置线程池的大小,避免线程过多导致资源浪费,或线程过少导致任务等待的时间过长。

线程池的实现

首先我不希望线程池被拷贝,我希望它能以单例的形式在需要的地方调用, 那么单例模式就需要删除拷贝构造和拷贝赋值,所以我设计一个基类

class NoneCopy {

public:
    ~NoneCopy(){}
protected:
    NoneCopy(){}
private:
    NoneCopy(const NoneCopy&) = delete;
    NoneCopy& operator=(const NoneCopy&) = delete;
};

然后让线程池ThreadPool类继承NoneCopy, 这样ThreadPool也就不支持拷贝构造和拷贝赋值了,拷贝构造和拷贝赋值的前提是其基类可以拷贝构造和赋值。

class ThreadPool : public NoneCopy {
public:
    ~ThreadPool();

    static ThreadPool& instance() {
        static ThreadPool ins;
        return ins;
    }
private:
    ThreadPool();
};

我们先实现了instance函数,该函数是一个静态成员函数,返回局部的静态实例ins.

我们之前在单例模式中讲过,函数内局部的静态变量,其生命周期和进程同步,但是可见度仅在函数内部。

局部静态变量只会在第一次调用这个函数时初始化一次。故可以作为单例模式。这种模式在C++ 11之前是不安全的,因为各平台编译器实现规则可能不统一导致多线程会生成多个实例。

但是C++ 11过后,语言层面对其优化保证了多个线程调用同一个函数只会生成一个实例,所以C++ 11过后我们可以放心使用。

接下来考虑构造函数,我们说过线程池需要线程队列和任务队列,所以这两个队列要在构造函数中完成构造,线程队列我们可以用一个vector存储,任务队列因为要保证先进先出,所以用queue结构即可。

因为任务队列要有通用性,所以我们规定任务队列中存储的类型为

using Task = std::packaged_task<void()>;

我们在ThreadPool中添加如下成员

std::atomic_int          thread_num_;
std::queue<Task>         tasks_;
std::vector<std::thread> pool_;
std::atomic_bool         stop_;

其中 tasks 表示任务队列, pool表示线程队列, threadnum表示空闲的线程数, stop_表示线程池是否退出。

那我们可以实现线程池的构造函数了

ThreadPool(unsigned int num = std::thread::hardware_concurrency())
    : stop_(false) {

    if (num <= 1)
        thread_num_ = 2;
    else
        thread_num_ = num;

    start();
}

我们在构造函数中初始化停止标记为false,初始化线程数默认为硬件允许的物理并行核数。然后调用了start函数。

start函数主要的功能为启动线程并且将线程放入vector中管理,线程的回调函数基本功能就是从任务队列中消费数据,如果队列中有任务则pop出任务并执行,否则线程需要挂起。在部分初学者实现的线程池当中会采用循环等待的方式(如果队列为空则继续循环),这种方式会造成线程忙等,进而引发资源的浪费。

所以我们现在还需要给ThreadPool添加两个成员

std::mutex               cv_mt_;
std::condition_variable  cv_lock_;

分别表示互斥量和条件变量,用来控制线程的休眠和唤醒。

那我们实现start函数

void start() {
    for (int i = 0; i < thread_num_; ++i) {
        pool_.emplace_back([this]() {
            while (!this->stop_.load()) {
                Task task;
                {
                    std::unique_lock<std::mutex> cv_mt(cv_mt_);
                    this->cv_lock_.wait(cv_mt, [this] {
                        return this->stop_.load() || !this->tasks_.empty();
                            });
                     if (this->tasks_.empty())
                        return;

                    task = std::move(this->tasks_.front());
                    this->tasks_.pop();
                }
                    this->thread_num_--;
                    task();
                    this->thread_num_++;
            }
        });
    }
}

pool_为线程队列,在线程队列中我们采用emplace_back直接调用线程的构造函数,将线程要处理的逻辑写成lambda表达式,从而构造线程并且将线程插入线程队列中。

lambda表达式内的逻辑先判断是否停止,如果停止则退出循环, 否则继续循环。

循环的逻辑就是每次从队列中取任务,先调用条件变量等待队列不为空,或者收到退出信号,二者只要满足其一,条件变量的wait就返回,并且继续向下走。否则条件变量wait不会返回,线程将挂起。

如果条件变量判断条件满足(队列不为空或者发现停止信号),线程继续向下执行,判断如果任务队列为空则说明是因为收到停止信号所以直接返回退出,否则就说明任务队列有数据,我们取出任务队列头部的task,将空闲线程数减少1,执行task,再将空闲线程数+1.

接下来我们实现析构函数

~ThreadPool() {
    stop();
}

析构函数中的stop就是要向线程发送停止信号,避免线程一直处于挂起状态(因为任务队列为空会导致线程挂起)

void stop() {
    stop_.store(true);
    cv_lock_.notify_all();
    for (auto& td : pool_) {
        if (td.joinable()) {
            std::cout << "join thread " << td.get_id() << std::endl;
            td.join();
        }
    }
}

stop函数中我们将停止标记设置为true,并且调用条件变量的notify_all唤醒所有线程,并且等待所有线程退出后线程池才析构完成。

我们再实现一个函数提供给外部查询当前空闲的线程数,这个功能可有可无,主要是方便外部根据空闲线程数是否达到阈值派发任务。

int idleThreadCount() {
    return thread_num_;
}

我们实现了线程池处理任务的逻辑,接下来我们要封装一个接口提供给外部,支持其投递任务给线程池。

因为我们要投递任务给线程池,任务的功能和参数都不同,而之前我们设置的线程池执行的task类型为void(void),返回值为void,参数为void的任务。那我们可用用参数绑定的方式将一个函数绑定为void(void)类型, 比如我们用如下操作

int functionint(int param) {
    std::cout << "param is " << param << std::endl;
    return 0;
}

void bindfunction() {
    std::function<int(void)> functionv = std::bind(functionint, 3);
    functionv();
}

假设我们希望任务队列里的任务要调用functionint,以及参数为3,因为在投递任务时我们就知道任务要执行的函数和参数,所以我们可以将执行的函数和参数绑定生成参数为void的函数。

我们通过bindfunction将functionint绑定为一个返回值为int,参数为void的新函数functionv。而我们的任务队列要放入返回值为void,参数也为void的函数,该怎么办呢?

其实很简单,我们可以利用lambda表达式生成一个返回值和参数都为void的函数,函数内部调用functionv即可,有点类似于go,python等语言的闭包,但是C++的闭包是一种伪闭包,需要用值的方式捕获用到的变量。

比如我们将上面的函数functionint和调用的参数3打包放入队列,可以这么写

void pushtasktoque() {
    std::function<int(void)> functionv = std::bind(functionint, 3);
    using Task = std::packaged_task<void()>;
    std::queue<Task> taskque;
    taskque.emplace([functionv]() {
        functionv();
        });
}

我们先将functionint绑定为functionv,然后定义一个队列存储的类型为std::packaged_task<void()>, 为了防止拷贝构造的开销,我们调用队列的emplace函数,该函数接受lambda表达式直接构造任务放入了队列里。因为lambda表达式捕获了functionv的值,所以可以在内部调用functionv。

lambda表达式返回值为void参数也为void,所以可以直接放入任务队列。

接下来要一个问题,一个问题是我们投递任务,有时候投递方需要获取任务是否完成, 那我们可以利用packaged_task返回一个future给调用方,调用方在外部就可以通过future判断任务是否返回了。我们修改上面的函数,实现commit任务的函数

std::future<int> committask() {
    std::function<int(void)> functionv = std::bind(functionint, 3);
    auto taskf = std::make_shared<std::packaged_task<int(void)>>(functionv);
    auto res = taskf->get_future();
    using Task = std::packaged_task<void()>;
    std::queue<Task> taskque;
    taskque.emplace([taskf]() {
        (*taskf)();
        });

    return res;
}

我们将functionv传递给packaged_task构造函数,构造了一个packaged_task类型的智能指针,每个人的编程风格不同,大家也可以不用智能指针,直接使用packaged_task对象,比如下面的

std::packaged_task<int(void)> taskf(functionv);

我构造的是packaged_task类型的智能指针,所以通过taskf->get_future()获取future对象res,这个res作为参数返回给外部,外部就可以通过res判断任务是否完成。

接下来我们定义了一个任务队列,任务队列调用emplace直接构造任务插入队列中,避免拷贝开销。参数为lambda表达式,lamba捕获taskf对象的值,在内部调用(*taskf)()完成任务调用。

上面只是通过具体的函数和参数实现了投递任务的功能,而实际情况是我们要投递各种类型的任务,以及多种类型和多个参数,该怎么实现committask函数更通用呢?

对于更通用的设计我们通常采用模板

template <class F, class... Args>
std::future<int> commit(F&& f, Args&&... args){
    //....
    return std::future<int>();
}

上面的模板定义了两个类型,F表示可调用对象类型,可以是lambda表达式,函数,function类等, Args为可变参数模板,可以是任意种类的类型,任意数量。commit函数参数采用F和Args的右值引用,这种模板类型的右值引用也被称作万能引用类型,可以接受左值引用,也可接受右值引用,利用引用折叠技术,可以推断出f和args的最终类型。我在基础课程里讲过,这里再给大家复习一下折叠规则,假设T为模板类型,推到规则如下:

T& & => T&

T& && => T&

T&& & => T&

T&& && => T&&

总结一下,就是只要出现了左值引用最后折叠的结果都是左值引用,只有右值应用和右值引用折叠才能变成右值引用。

template<typename T>
void Function(T&& t){
    //...
}

int main(){
    int a = 3;
    Function(a);
    Function(3);
    return 0;
}

当我们把一个int类型的左值a传递给 Function的 T&& 参数t时(T为模板类型), T被推导为int & , 那么参数t整体的类型就变为int & && => int &类型,也就是左值引用类型。

当我们把一个右值3传递给Function的T&& 参数t时,T被推导为int类型。t被推导为int && 类型,也就是右值引用类型。

如果大家熟悉boost库,可以用boost库的type_id_with_cvr打印具体类型,比如我们下面的代码

#include <boost/type_index.hpp>
using boost::typeindex::type_id_with_cvr;

int functionint(int param) {
    std::cout << "param is " << param << std::endl;
    return 0;
}

template <class F, class... Args>
std::future<int> commit(F&& f, Args&&... args) {
    //....
        // 利用Boost库打印模板推导出来的 T 类型
    std::cout << "F type:" << type_id_with_cvr<F>().pretty_name() << std::endl;

    // 利用Boost库打印形参的类型
    std::cout << "f type:" << type_id_with_cvr<decltype(f)>().pretty_name() << std::endl;

    std::cout << "Args type:" << type_id_with_cvr<Args...>().pretty_name() << std::endl;

    std::cout << "args type:" << type_id_with_cvr<decltype(args)...>().pretty_name() << std::endl;

    return std::future<int>();
}

void reference_collapsing(){
    int a = 3;
    commit(functionint, a);
}

调用reference_collapsing函数输出如下

F type:int (__cdecl&)(int)
f type:int (__cdecl&)(int)
Args type:int & __ptr64
args type:int & __ptr64

可以看出F和f的类型都为函数对象的左值引用类型int (__cdecl&)(int),因为可变参数列表只有一个int左值类型,所以Args被推导为int &类型, 同样的道理args也是int &类型。

那如果我们换一种方式调用

void reference_collapsing2(){
    commit(std::move(functionint), 3);
}

调用reference_collapsing2输出如下

F type:int __cdecl(int)
f type:int (__cdecl&&)(int)
Args type:int
args type:int && __ptr64

F为函数对象类型int __cdecl(int), f被对段位函数对象的右值引用类型int (__cdecl&&)(int)

Args 被推断为int类型, args被推断为int && 类型。

所以我们就可以得出之前给大家的结论,对于模板类型参数T && , 编译器会根据传入的类型为左值还是右值,将T 推断为不同的类型, 如果传入的类型为int类型的左值,则T为int&类型,如果传入的类型为int类型的右值,则T为int类型。

模板参数介绍完了,还要介绍一下原样转发, 熟悉我视频风格的读者都知道在介绍正确做法前我会先介绍错误示范,我们先看下面的例子

void use_rightref(int && rparam) {
    //....
}

template<typename T>
void use_tempref(T&& tparam) {
    use_rightref(tparam);
}

void test_tempref() {
    use_tempref(3);
}

我先给大家介绍下上面代码的调用流程,我们在test_tempref里调用use_tempref, 参数3是一个右值,所以use_tempref中T被推断为int类型, tparam为int && 类型。我们接着将tparam传递给use_rightref,tparam是int && 类型,刚好可以传递给use_rightref,然而上面的代码会报错。

“void use_rightref(int &&)”: 无法将参数 1 从“T”转换为“int &&”

报错的原因是我们将tparam传递给use_rightref的时候参数类型不匹配。在use_tempref中,tparam为int && 类型,即int 的右值引用类型。但是将tparam传递给use_rightref时,tparam是作为左值传递的, 他的类型是int && 类型,但是在函数use_tempref中tparam可以作为左值使用。这么说大家有点难理解

我们分开理解,左值和右值的区别

左值(lvalue) 是指表达式结束后依然存在的、可被取地址的数据。通俗地说,左值就是可以放在赋值符号左边的值。

右值(rvalue) 是指表达式结束后就不再存在的临时数据。通常是不可被取地址的临时值,例如常量、函数返回值、表达式计算结果等。在 C++11 之后,右值引用的引入使得我们可以直接操作右值。

我们看下面的代码

template<typename T>
void use_tempref(T&& tparam) {
    int a = 4;
    tparam = a;
    tparam = std::move(a);
}

void test_tempref() {
    use_tempref(3);
}

上述代码编译没有问题可以运行,tparam可以作为左值被赋值。所以当它作为参数传递给其他函数的时候,它也是作为左值使用的,那么传递给use_rightref时,就会出现int&& 绑定左值的情况,这在编译阶段是不允许的。

下面这种tparam也是被作为左值使用

void use_tempref(int && tparam) {
    int a = 4;
    tparam = a;
    tparam = std::move(a);
}

void test_tempref() {
    use_tempref(3);
}

上面代码编译也会通过的。

那么我们接下来要解决tparam作为左值传递给use_rightref报错的问题,C++ 给我们提供了原样转发功能,这个在基础中也给大家介绍过, C++ 源码对于forward的实现有两个版本,分别是将一个左值转化为一个左值或者右值,以及将一个右值转化为一个右值。

template <class _Ty>
_NODISCARD constexpr _Ty&& forward(
    remove_reference_t<_Ty>& _Arg) noexcept { // forward an lvalue as either an lvalue or an rvalue
    return static_cast<_Ty&&>(_Arg);
}

template <class _Ty>
_NODISCARD constexpr _Ty&& forward(remove_reference_t<_Ty>&& _Arg) noexcept { // forward an rvalue as an rvalue
    static_assert(!is_lvalue_reference_v<_Ty>, "bad forward call");
    return static_cast<_Ty&&>(_Arg);
}

因为实现了两个版本,所以forward会根据传递的是左值调用第一个版本,传递的是右值调用第二个版本。

我们看看remove_reference_t<_Ty>的源码

template <class _Ty>
struct remove_reference<_Ty&> {
    using type                 = _Ty;
    using _Const_thru_ref_type = const _Ty&;
};

template <class _Ty>
struct remove_reference<_Ty&&> {
    using type                 = _Ty;
    using _Const_thru_ref_type = const _Ty&&;
};

template <class _Ty>
using remove_reference_t = typename remove_reference<_Ty>::type;

我们通过观察就会发现remove_reference_t<_Ty>其实是去除了_Ty中的引用返回内部的type.

所以我们forward(3)时,执行forward(remove_reference_t<_Ty>&& _Arg), _Ty为int && 类型,remove_reference_t<_Ty>为int类型. 返回的为static_cast<_Ty&&>(_Arg)类型,即int && &&类型,折叠一下变为int &&类型。

同样当我们forward(a),比如a是一个int类型的左值,则执行_Ty&& forward(remove_reference_t<_Ty>& _Arg), _Ty为int &类型, remove_reference_t<_Ty>为int类型, 返回值为static_cast<_Ty&&>(_Arg) ,即int & && 类型折叠为int &类型。

所以有了这些知识,我们解决上面的编译错误可以这么干

void use_rightref(int && rparam) {
    //....
}

template<typename T>
void use_tempref(T&& tparam) {
    use_rightref(std::forward<T>(tparam));
}

void test_tempref() {
    use_tempref(3);
}

接下来我们回到线程池的话题,commit函数需要返回future对象,但是我们又无法在函数定义的时候提前写好返回值future的类型,那怎么办呢?

可以用到C++ 11的一个技术就是尾置推导

template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> 
        std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {
        using RetType = decltype(std::forward<F>(f)(std::forward<Args>(args)...));

        return std::future<RetType>{};
}

我们在commit函数返回值写成了auto,告诉编译器具体的返回类型在其后,这样编译器在加载完函数的参数f和args之后,可以推导返回值类型.

推导也很简单,我们通过decltype(std::forward<F>(f)(std::forward<Args>(args)...)), decltype会根据根据表达式推断表达式的结果类型,我们用future存储这个类型,这个future就是返回值类型。

decltype中我们用了forward原样转发f和args,其实f不用转发,因为我们调用f是按照左值调用的,至于args原样转发是考虑f接受的参数可能是一个右值,但是这种情况其实不多,所以对于普通情形,我们写成decltype(f(args...))没问题的。

因为推导的类型我们以后还会用到,所以用了RetType来记录这个类型。

接下来我们给出commit的完整代码

template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> 
std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {
    using RetType = decltype(std::forward<F>(f)(std::forward<Args>(args)...));
    if (stop_.load())
        return std::future<RetType>{};

    auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<RetType> ret = task->get_future();
    {
        std::lock_guard<std::mutex> cv_mt(cv_mt_);
        tasks_.emplace([task] { (*task)(); });
    }
    cv_lock_.notify_one();
    return ret;
}

在commit中我们生成一个packaged_task类型的智能指针task,通过task获取future.

接下来我们加锁并且将task放入队列,但是因为task的返回类型为RetType,所以我们采用了lambda表达式捕获task,内部调用task,将这个lambda表达式放入任务队列。

然后通知其他线程唤醒,并且返回future。

测试

为了测试线程池,我们可以用前文实现的快速排序的方法,将任务分段递归投递给线程池,让线程池排序

template<typename T>
std::list<T>pool_thread_quick_sort(std::list<T> input) {
    if (input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& partition_val = *result.begin();
    typename std::list<T>::iterator divide_point =
        std::partition(input.begin(), input.end(),
            [&](T const& val) {return val < partition_val; });
    std::list<T> new_lower_chunk;
    new_lower_chunk.splice(new_lower_chunk.end(),
        input, input.begin(),
        divide_point);

    std::future<std::list<T> > new_lower = ThreadPool::instance().commit(pool_thread_quick_sort<T>, new_lower_chunk);

    std::list<T> new_higher(pool_thread_quick_sort(input));
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}

我们再写一个测试用例

void TestThreadPoolSort() {
    std::list<int> nlist = { 6,1,0,5,2,9,11 };

    auto sortlist = pool_thread_quick_sort<int>(nlist);

    for (auto& value : sortlist) {
        std::cout << value << " ";
    }

    std::cout << std::endl;
}

结果输出

0 1 2 5 6 9 11

总结

本文介绍线程池的原理,并实现了线程池

源码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day22-ThreadPool

视频链接:

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

results matching ""

    No results matching ""