简介

前几篇文章陆续介绍了线程池(ThreadPool),可汇合线程(join_thread)等技术,其中也用到了当管理类要退出时会通过条件变量唤醒挂起的线程,然后等待其执行完退出。本文按照作者的思路补充设计可中断的线程。

可中断线程

一个可中断的线程大体的实现是这个样子的

class interruptible_thread
{
    std::thread internal_thread;
    interrupt_flag* flag;
public:
    template<typename FunctionType>
    interruptible_thread(FunctionType f)
    {
        //⇽-- - 2
        std::promise<interrupt_flag*> p;  
        //⇽-- - 3
        internal_thread = std::thread([f, &p] {    
            p.set_value(&this_thread_interrupt_flag);
            //⇽-- - 4
            f();    
        });
        //⇽-- - 5
        flag = p.get_future().get();    
    }

    void join() {
        internal_thread.join();
    }
    void interrupt()
    {
        if (flag)
        {
            //⇽-- - 6
            flag->set();    
        }
    }
};
  1. interrupt_flag 为中断标记,其set操作用来标记中断
  2. internal_thread为内部线程,其回调函数内部先设置interrupt_flag*类型的promise值,再执行回调函数。
  3. 在interruptible_thread构造函数中等待internal_thread回调函数内部设置好flag的promise值后再退出。
  4. this_thread_interrupt_flag是我们定义的线程变量thread_local interrupt_flag this_thread_interrupt_flag;

中断标记

中断标记interrupt_flag类,主要是用来设置中断标记和判断是否已经中断,有可能挂起在条件变量的wait操作上,此时中断就需要唤醒挂起的线程。

为了扩充功能,我们希望设计接口支持在任何锁上等待,那我们使用condition_variable_any支持任意类型的条件变量。

class interrupt_flag
{
    std::atomic<bool> flag;
    std::condition_variable* thread_cond;
    std::condition_variable_any* thread_cond_any;
    std::mutex set_clear_mutex;
public:
    interrupt_flag() :
        thread_cond(0), thread_cond_any(0)
    {}
    void set()
    {
        flag.store(true, std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if (thread_cond)
        {
            thread_cond->notify_all();
        }
        else if (thread_cond_any) {
            thread_cond_any->notify_all();
        }
    }
    bool is_set() const
    {
        return flag.load(std::memory_order_relaxed);
    }
    void set_condition_variable(std::condition_variable& cv)
    {
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        thread_cond = &cv;
    }
    void clear_condition_variable()
    {
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        thread_cond = 0;
    }


    template<typename Lockable>
    void wait(std::condition_variable_any& cv, Lockable& lk) {
        struct custom_lock {
            interrupt_flag* self;
            Lockable& lk;
            custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) :
                self(self_), lk(lk_) {
                self->set_clear_mutex.lock();
                self->thread_cond_any = &cond;
            }

            void unlock() {
                lk.unlock();
                self->set_clear_mutex.unlock();
            }

            void lock() {
                std::lock(self->set_clear_mutex, lk);
            }

            ~custom_lock() {
                self->thread_cond_any = 0;
                self->set_clear_mutex.unlock();
            }
        };

        custom_lock cl(this, cv, lk);
        interruption_point();
        cv.wait(cl);
        interruption_point();
    }
};
  1. set函数将停止标记设置为true,然后用条件变量通知挂起的线程。
  2. set_condition_variable 设置flag关联的条件变量,因为需要用指定的条件变量通知挂起的线程。
  3. clear_condition_variable清除关联的条件变量
  4. wait操作封装了接受任意锁的等待操作,wait函数内部定义了custom_lock,封装了加锁,解锁等操作。
  5. wait操作内部构造了custom_lock对象cl主要是对set_clear_mutex加锁,然后在调用cv.wait,这样能和set函数中的通知条件变量构成互斥,这么做的好处就是要么先将flag设置为true并发送通知,要么先wait,然后再发送通知。这样避免了线程在wait处卡死(线程不会错过发送的通知)

interruption_point函数内部判断flag是否为true,如果为true则抛出异常,这里作者处理的突兀了一些。读者可将这个函数改为bool返回值,调用者根据返回值判断是否继续等都可以。

void interruption_point()
{
    if (this_thread_interrupt_flag.is_set())
    {
        throw thread_interrupted();
    }
}

thread_interrupted为我们自定义的异常

class thread_interrupted : public std::exception
{
public:
    thread_interrupted() : message("thread interrupted.") {}
    ~thread_interrupted() throw () {
    }

    virtual const char* what() const throw () {
        return message.c_str();
    }

private:
    std::string message;
};

接下来定义一个类clear_cv_on_destruct

struct clear_cv_on_destruct {
    ~clear_cv_on_destruct(){
        this_thread_interrupt_flag.clear_condition_variable();
    }
};

clear_cv_on_destruct 这个类主要是用来在析构时释放和flag关联的条件变量。

除此之外,我们还可以封装几个不同版本的等待 支持普通条件变量的等待

void interruptible_wait(std::condition_variable& cv,
    std::unique_lock<std::mutex>& lk)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    clear_cv_on_destruct guard;
    interruption_point();
    cv.wait_for(lk, std::chrono::milliseconds(1));
    interruption_point();
}

支持谓词的等待

template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
    std::unique_lock<std::mutex>& lk,
    Predicate pred)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    clear_cv_on_destruct guard;
    while (!this_thread_interrupt_flag.is_set() && !pred())
    {
        cv.wait_for(lk, std::chrono::milliseconds(1));
    }
    interruption_point();
}

上面两个版本采用wait_for而不用wait是因为如果等待之前条件变量的通知已经发送,线程之后才调用wait就会发生死等,所以这里采用的wait_for

支持future的等待

template<typename T>
void interruptible_wait(std::future<T>& uf)
{
    while (!this_thread_interrupt_flag.is_set())
    {
        if (uf.wait_for(std::chrono::milliseconds(1)) ==
            std::future_status::ready)
            break;
    }
    interruption_point();
}

接下来我们用案例测试上面的案例

#include <iostream>
#include "interupthread.h"
std::vector<interruptible_thread> background_threads;
std::mutex mtx1;
std::mutex mtx2;
std::condition_variable cv1;
std::condition_variable_any cv2;
void start_background_processing() {
    background_threads.push_back([]() {
        try {
            std::unique_lock<std::mutex> lock(mtx1);
            interruptible_wait(cv1, lock);
        }
        catch (std::exception& ex) {
            std::cout << "catch exception is " << ex.what() << std::endl;
        }

    });

    background_threads.push_back([]() {
        try {
            std::unique_lock<std::mutex> lock(mtx2);
            this_thread_interrupt_flag.wait(cv2, mtx2);
        }
        catch (std::exception& ex) {
            std::cout << "catch exception is " << ex.what() << std::endl;
        }

    });
}

int main()
{
    start_background_processing();
    for (unsigned i = 0; i < background_threads.size(); i++) {
        background_threads[i].interrupt();
    }

    for (unsigned i = 0; i < background_threads.size(); i++) {
        background_threads[i].join();
    }
}

上面的案例中启动了两个线程,每个线程回调函数中调用我们封装的可中断的等待。在主函数中断两个线程,并测试两个线程能否在等待中中断。

程序输出

catch exception is thread interrupted.
catch exception is thread interrupted.

总结

本文介绍了中断线程的设计,说简单点还是设置终止标记为true,利用条件变量通知挂起的线程唤醒。

源码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day23-interupthread

视频链接:

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

results matching ""

    No results matching ""