简介
前几篇文章陆续介绍了线程池(ThreadPool),可汇合线程(join_thread)等技术,其中也用到了当管理类要退出时会通过条件变量唤醒挂起的线程,然后等待其执行完退出。本文按照作者的思路补充设计可中断的线程。
可中断线程
一个可中断的线程大体的实现是这个样子的
class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;
public:
template<typename FunctionType>
interruptible_thread(FunctionType f)
{
//⇽-- - 2
std::promise<interrupt_flag*> p;
//⇽-- - 3
internal_thread = std::thread([f, &p] {
p.set_value(&this_thread_interrupt_flag);
//⇽-- - 4
f();
});
//⇽-- - 5
flag = p.get_future().get();
}
void join() {
internal_thread.join();
}
void interrupt()
{
if (flag)
{
//⇽-- - 6
flag->set();
}
}
};
- interrupt_flag 为中断标记,其set操作用来标记中断
- internal_thread为内部线程,其回调函数内部先设置
interrupt_flag*
类型的promise值,再执行回调函数。 - 在interruptible_thread构造函数中等待internal_thread回调函数内部设置好flag的promise值后再退出。
- this_thread_interrupt_flag是我们定义的线程变量
thread_local interrupt_flag this_thread_interrupt_flag;
中断标记
中断标记interrupt_flag类,主要是用来设置中断标记和判断是否已经中断,有可能挂起在条件变量的wait操作上,此时中断就需要唤醒挂起的线程。
为了扩充功能,我们希望设计接口支持在任何锁上等待,那我们使用condition_variable_any
支持任意类型的条件变量。
class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag() :
thread_cond(0), thread_cond_any(0)
{}
void set()
{
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if (thread_cond)
{
thread_cond->notify_all();
}
else if (thread_cond_any) {
thread_cond_any->notify_all();
}
}
bool is_set() const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable& cv)
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond = &cv;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond = 0;
}
template<typename Lockable>
void wait(std::condition_variable_any& cv, Lockable& lk) {
struct custom_lock {
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) :
self(self_), lk(lk_) {
self->set_clear_mutex.lock();
self->thread_cond_any = &cond;
}
void unlock() {
lk.unlock();
self->set_clear_mutex.unlock();
}
void lock() {
std::lock(self->set_clear_mutex, lk);
}
~custom_lock() {
self->thread_cond_any = 0;
self->set_clear_mutex.unlock();
}
};
custom_lock cl(this, cv, lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
};
- set函数将停止标记设置为true,然后用条件变量通知挂起的线程。
- set_condition_variable 设置flag关联的条件变量,因为需要用指定的条件变量通知挂起的线程。
- clear_condition_variable清除关联的条件变量
- wait操作封装了接受任意锁的等待操作,wait函数内部定义了custom_lock,封装了加锁,解锁等操作。
- wait操作内部构造了custom_lock对象cl主要是对set_clear_mutex加锁,然后在调用cv.wait,这样能和set函数中的通知条件变量构成互斥,这么做的好处就是要么先将flag设置为true并发送通知,要么先wait,然后再发送通知。这样避免了线程在wait处卡死(线程不会错过发送的通知)
interruption_point函数内部判断flag是否为true,如果为true则抛出异常,这里作者处理的突兀了一些。读者可将这个函数改为bool返回值,调用者根据返回值判断是否继续等都可以。
void interruption_point()
{
if (this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();
}
}
thread_interrupted为我们自定义的异常
class thread_interrupted : public std::exception
{
public:
thread_interrupted() : message("thread interrupted.") {}
~thread_interrupted() throw () {
}
virtual const char* what() const throw () {
return message.c_str();
}
private:
std::string message;
};
接下来定义一个类clear_cv_on_destruct
struct clear_cv_on_destruct {
~clear_cv_on_destruct(){
this_thread_interrupt_flag.clear_condition_variable();
}
};
clear_cv_on_destruct 这个类主要是用来在析构时释放和flag关联的条件变量。
除此之外,我们还可以封装几个不同版本的等待 支持普通条件变量的等待
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk, std::chrono::milliseconds(1));
interruption_point();
}
支持谓词的等待
template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk,
Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
clear_cv_on_destruct guard;
while (!this_thread_interrupt_flag.is_set() && !pred())
{
cv.wait_for(lk, std::chrono::milliseconds(1));
}
interruption_point();
}
上面两个版本采用wait_for而不用wait是因为如果等待之前条件变量的通知已经发送,线程之后才调用wait就会发生死等,所以这里采用的wait_for
支持future的等待
template<typename T>
void interruptible_wait(std::future<T>& uf)
{
while (!this_thread_interrupt_flag.is_set())
{
if (uf.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::ready)
break;
}
interruption_point();
}
接下来我们用案例测试上面的案例
#include <iostream>
#include "interupthread.h"
std::vector<interruptible_thread> background_threads;
std::mutex mtx1;
std::mutex mtx2;
std::condition_variable cv1;
std::condition_variable_any cv2;
void start_background_processing() {
background_threads.push_back([]() {
try {
std::unique_lock<std::mutex> lock(mtx1);
interruptible_wait(cv1, lock);
}
catch (std::exception& ex) {
std::cout << "catch exception is " << ex.what() << std::endl;
}
});
background_threads.push_back([]() {
try {
std::unique_lock<std::mutex> lock(mtx2);
this_thread_interrupt_flag.wait(cv2, mtx2);
}
catch (std::exception& ex) {
std::cout << "catch exception is " << ex.what() << std::endl;
}
});
}
int main()
{
start_background_processing();
for (unsigned i = 0; i < background_threads.size(); i++) {
background_threads[i].interrupt();
}
for (unsigned i = 0; i < background_threads.size(); i++) {
background_threads[i].join();
}
}
上面的案例中启动了两个线程,每个线程回调函数中调用我们封装的可中断的等待。在主函数中断两个线程,并测试两个线程能否在等待中中断。
程序输出
catch exception is thread interrupted.
catch exception is thread interrupted.
总结
本文介绍了中断线程的设计,说简单点还是设置终止标记为true,利用条件变量通知挂起的线程唤醒。
源码链接:
https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day23-interupthread
视频链接:
https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290