简介
前文介绍了几种数据划分的方式,包括按照线程数量划分,按照递归方式划分,以及按照任务类型划分等。
本文结合之前的划分方式,基于stl的find, for_each以及partial_sum等算法实现并行版本。
并行版本for_each
实现并行的for_each,最简单的方式就是将数据划分,每个线程分别处理一段连续的数据即可。
在介绍并行版本之前,我们先实现一个管理线程 的类join_threads,用来管控线程防止线程过早退出
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_)
{}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); ++i)
{
if (threads[i].joinable())
threads[i].join();
}
}
};
接下来我们实现第一种方式
template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<void>> futures(num_threads - 1); //⇽-- - 1
std::vector<std::thread> threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void(void)> task( // ⇽-- - 2
[=]()
{
std::for_each(block_start, block_end, f);
});
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task)); //⇽-- - 3
block_start = block_end;
}
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
futures[i].get(); // ⇽-- - 4
}
}
1 我们规定如果处理的数量不超过25个则用单线程。否则根据处理的数量划分任务,计算开辟的线程数,如果要开辟的线程数大于内核线程的数量,则以内核线程数为准。
2 根据实际开辟的线程数num_threads计算每个线程处理的块大小。并且初始化两个vector,分别用来存储处理结果的future和处理任务的线程。
3 我们在(2处)代码生成了一个任务task,然后获取future赋值给vector对应下标为i的future元素,并且把任务绑定给对应下标为i的thread。
4 numthreads-1个线程并行处理for_each,剩下的主线程处理余下的for_each,最后通过futures.get汇总
第二种划分方式是我们采取递归的方式,我们知道采用递归的方式无法提前开辟准确数量的线程,我们采用async帮我们完成这个任务
template<typename Iterator, typename Func>
void async_for_each(Iterator first, Iterator last, Func f)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
if (length < (2 * min_per_thread))
{
std::for_each(first, last, f); //⇽-- - 1
}
else
{
Iterator const mid_point = first + length / 2;
//⇽-- - 2
std::future<void> first_half = std::async(&async_for_each<Iterator, Func>,
first, mid_point, f);
//⇽-- - 3
async_for_each(mid_point, last, f);
// ⇽-- - 4
first_half.get();
}
}
async可以帮助我们判断是否需要开启线程还是自动串行执行。每次我们将要处理的数据一分为2,前半部分交给一个async开辟线程处理,后半部分在本线程处理。而所谓的本线程不一定是主线程,因为我们通过async递归执行parallel_for_each,也就相当于在一个线程里独立执行了。
find的并行实现
find 的并行查找方式还是分两种,一种是将要查找的区间划分为几个段,每段交给一个线程查找。
另一种是采用递归的方式每次折半,前半部分交给一个线程查找,后半部分留在本线程查找。
我们先说第一种
find比较特殊,我们要防止线程忙等待,也要防止线程在其他线程已经查找到值后做无谓的浪费。可以用一个共享的全局atomic变量表示是否找到目标。
因为主线程要获取某个线程查找到的迭代器位置,所以我们用promise 设置 value为迭代器
template<typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match)
{
struct find_element //⇽-- - 1
{
void operator()(Iterator begin,Iterator end,
MatchType match,
std::promise<Iterator>*result,
std::atomic<bool>*done_flag)
{
try
{
for (; (begin != end) && !done_flag->load(); ++begin) //⇽-- - 2
{
if (*begin == match)
{
result->set_value(begin); //⇽-- - 3
done_flag->store(true); //⇽-- - 4
return;
}
}
}
catch (...) //⇽-- - 5
{
try
{
result->set_exception(std::current_exception()); //⇽-- - 6
done_flag->store(true);
}
catch (...) //⇽-- - 7
{}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::promise<Iterator> result; //⇽-- - 8
std::atomic<bool> done_flag(false); //⇽-- - 9
std::vector<std::thread> threads(num_threads - 1); //⇽-- - 10
{
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size);
// ⇽-- - 11
threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
block_start = block_end;
}
// ⇽-- - 12
find_element()(block_start, last, match, &result, &done_flag);
}
// ⇽-- - 13
if (!done_flag.load())
{
return last;
}
//⇽-- - 14
return result.get_future().get();
}
1 find_element重载了()运算符,接受四个参数,分别是迭代器的开始,迭代起的结束,要查找的数值,以及用来通知外部的promise,还有线程之间用来检测是否有某个线程完成查找的原子变量。
2 find_element重载()的逻辑就是查找这个区间内满足某个值的位置,并将这个位置的迭代起设置到promise中,然后将完成的原子变量标记为true。
说第二种方式,利用递归折半查找,我们可以用async帮助我们完成并行任务。
template<typename Iterator, typename MatchType>
Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match,
std::atomic<bool>& done) // ⇽-- - 1
{
try
{
unsigned long const length = std::distance(first,last);
unsigned long const min_per_thread = 25; // ⇽-- - 2
if (length < (2 * min_per_thread)) //⇽-- - 3
{
for (; (first != last) && !done.load(); ++first) //⇽-- - 4
{
if (*first == match)
{
done = true; //⇽-- - 5
return first;
}
}
return last; //⇽-- - 6
}
else
{
//⇽-- - 7
Iterator const mid_point = first + (length / 2);
//⇽-- - 8
std::future<Iterator> async_result = std::async(¶llel_find_impl<Iterator,MatchType>,
mid_point,last,match,std::ref(done));
//⇽-- - 9
Iterator const direct_result = parallel_find_impl(first,mid_point,match,done);
//⇽-- - 10
return (direct_result == mid_point) ?async_result.get() : direct_result;
}
}
catch (...)
{
// ⇽-- - 11
done = true;
throw;
}
}
template<typename Iterator, typename MatchType>
Iterator parallel_find_async(Iterator first, Iterator last, MatchType match)
{
std::atomic<bool> done(false);
//⇽-- - 12
return parallel_find_impl(first, last, match, done);
}
1 并行查找的方式种我们先根据长度是否小于50决定是否开启并行任务,如果小于50则采取单线程方式。
2 如果采用并行的方式,我们将长度折半,前半部分交给async,后半部分交给本线程。
3 最后我们在主线程中汇合,获取结果。
partial_sum并行版本
C++ 提供了累计计算求和的功能,比如一个vector中存储的数据为{1,2,3},那么经过计算,第一个元素仍然为1,第二个元素为1+2, 第三个元素为1+2+3,结果为{1,3,6}.
关于并行版本我们可以这么思考,假设元数组为{1,2,3,4,5,6,7},那我们可以划分为三个部分,第一部分为{1,2,3}交给第一个线程处理, 第二部分{4,5,6}交给第二个线程处理,7交给本线程处理。
但是我们要考虑的一个问题是线程2要用到线程1最后计算的结果,线程1计算后{1,3,6},线程2需要用到6做累加,我们可以先让线程1计算出第3个元素值6,再将这个6传递给线程2,剩下的就可以并行计算了。同样的道理本线程要处理最后一个元素的累加结果,他需要等到线程2处理完第6个元素的值。
所以基本思路是每个线程优先处理分区的最后一个元素,通过promise设置给其他线程,在这个阶段线程之间是串行的,等到所有线程都开始计算其他位置后就是并行了。
template<typename Iterator>
void parallel_partial_sum(Iterator first, Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_chunk //⇽-- - 1
{
void operator()(Iterator begin, Iterator last,
std::future<value_type>* previous_end_value,
std::promise<value_type>* end_value)
{
try
{
Iterator end = last;
++end;
std::partial_sum(begin, end, begin); //⇽-- - 2
if (previous_end_value) //⇽-- - 3
{
value_type addend = previous_end_value->get(); // ⇽-- - 4
*last += addend; // ⇽-- - 5
if (end_value)
{
end_value->set_value(*last); //⇽-- - 6
}
// ⇽-- - 7
std::for_each(begin, last, [addend](value_type& item)
{
item += addend;
});
}
else if (end_value)
{
// ⇽-- - 8
end_value->set_value(*last);
}
}
catch (...) // ⇽-- - 9
{
if (end_value)
{
end_value->set_exception(std::current_exception()); // ⇽-- - 10
}
else
{
throw; // ⇽-- - 11
}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length) {
return;
}
unsigned long const min_per_thread = 25; //⇽-- - 12
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
typedef typename Iterator::value_type value_type;
std::vector<std::thread> threads(num_threads - 1); // ⇽-- - 13
std::vector<std::promise<value_type> > end_values(num_threads - 1); // ⇽-- - 14
std::vector<std::future<value_type> > previous_end_values; // ⇽-- - 15
previous_end_values.reserve(num_threads - 1); // ⇽-- - 16
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_last = block_start;
std::advance(block_last, block_size - 1); // ⇽-- - 17
// ⇽-- - 18
threads[i] = std::thread(process_chunk(), block_start, block_last,
(i != 0) ? &previous_end_values[i - 1] : 0,
&end_values[i]);
block_start = block_last;
++block_start; // ⇽-- - 19
previous_end_values.push_back(end_values[i].get_future()); // ⇽-- - 20
}
Iterator final_element = block_start;
std::advance(final_element, std::distance(block_start, last) - 1); // ⇽-- - 21
// ⇽-- - 22
process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0,
0);
}
1 定义了process_chunk类,重载了()运算符,在重载的逻辑里我们先计算区间内的partial_sum累计求和(2处)
2 因为我们处理的区间不一定是首个区间,也就是他还需要加上前面区间处理得出的最后一个元素的值,所以我们通过previouse_end_value判断本区间不是首个区间,并且加上前面处理的结果。优先将最后一个值计算出来设置给promise。然后在利用for_each遍历计算其他位置的值。
总结
本文介绍了如何并行设计stl的相关算法,读者有好的思路可以互相交流一下。
测试代码和项目代码链接:
https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day21-ParallenAlgorithm
视频链接
https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290