简介

前文介绍了几种数据划分的方式,包括按照线程数量划分,按照递归方式划分,以及按照任务类型划分等。

本文结合之前的划分方式,基于stl的find, for_each以及partial_sum等算法实现并行版本。

并行版本for_each

实现并行的for_each,最简单的方式就是将数据划分,每个线程分别处理一段连续的数据即可。

在介绍并行版本之前,我们先实现一个管理线程 的类join_threads,用来管控线程防止线程过早退出

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_)
    {}
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); ++i)
        {
            if (threads[i].joinable())
                threads[i].join();
        }
    }
};

接下来我们实现第一种方式

template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
    unsigned long const length = std::distance(first, last);
    if (!length)
        return;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::vector<std::future<void>> futures(num_threads - 1);   //⇽-- - 1
        std::vector<std::thread> threads(num_threads - 1);
    join_threads joiner(threads);
    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        std::packaged_task<void(void)> task( // ⇽-- - 2
            [=]()
        {
            std::for_each(block_start, block_end, f);
        });
        futures[i] = task.get_future();
        threads[i] = std::thread(std::move(task));    //⇽-- - 3
            block_start = block_end;
    }
    std::for_each(block_start, last, f);
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        futures[i].get();   // ⇽-- - 4
    }
}

1 我们规定如果处理的数量不超过25个则用单线程。否则根据处理的数量划分任务,计算开辟的线程数,如果要开辟的线程数大于内核线程的数量,则以内核线程数为准。

2 根据实际开辟的线程数num_threads计算每个线程处理的块大小。并且初始化两个vector,分别用来存储处理结果的future和处理任务的线程。

3 我们在(2处)代码生成了一个任务task,然后获取future赋值给vector对应下标为i的future元素,并且把任务绑定给对应下标为i的thread。

4 numthreads-1个线程并行处理for_each,剩下的主线程处理余下的for_each,最后通过futures.get汇总

第二种划分方式是我们采取递归的方式,我们知道采用递归的方式无法提前开辟准确数量的线程,我们采用async帮我们完成这个任务

template<typename Iterator, typename Func>
void async_for_each(Iterator first, Iterator last, Func f)
{
    unsigned long const length = std::distance(first, last);
    if (!length)
        return;
    unsigned long const min_per_thread = 25;
    if (length < (2 * min_per_thread))
    {
        std::for_each(first, last, f);    //⇽-- - 1
    }
    else
    {
        Iterator const mid_point = first + length / 2;
        //⇽-- - 2
        std::future<void> first_half =   std::async(&async_for_each<Iterator, Func>,
                first, mid_point, f);
        //⇽-- - 3
        async_for_each(mid_point, last, f); 
        // ⇽-- - 4
        first_half.get();   
    }
}

async可以帮助我们判断是否需要开启线程还是自动串行执行。每次我们将要处理的数据一分为2,前半部分交给一个async开辟线程处理,后半部分在本线程处理。而所谓的本线程不一定是主线程,因为我们通过async递归执行parallel_for_each,也就相当于在一个线程里独立执行了。

find的并行实现

find 的并行查找方式还是分两种,一种是将要查找的区间划分为几个段,每段交给一个线程查找。

另一种是采用递归的方式每次折半,前半部分交给一个线程查找,后半部分留在本线程查找。

我们先说第一种

find比较特殊,我们要防止线程忙等待,也要防止线程在其他线程已经查找到值后做无谓的浪费。可以用一个共享的全局atomic变量表示是否找到目标。

因为主线程要获取某个线程查找到的迭代器位置,所以我们用promise 设置 value为迭代器

template<typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match)
{
    struct find_element    //⇽-- - 1
    {
        void operator()(Iterator begin,Iterator end,
                        MatchType match,
                        std::promise<Iterator>*result,
                        std::atomic<bool>*done_flag)
        {
            try
            {
                for (; (begin != end) && !done_flag->load(); ++begin)    //⇽-- - 2
                {
                    if (*begin == match)
                    {
                        result->set_value(begin);    //⇽-- - 3
                        done_flag->store(true);    //⇽-- - 4
                        return;
                    }
                }
            }
            catch (...)    //⇽-- - 5
            {
                try
                {
                    result->set_exception(std::current_exception());    //⇽-- - 6
                    done_flag->store(true);
                }
                catch (...)    //⇽-- - 7
                {}
            }
        }
    };
    unsigned long const length = std::distance(first, last);
    if (!length)
        return last;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::promise<Iterator> result;    //⇽-- - 8
    std::atomic<bool> done_flag(false);     //⇽-- - 9
    std::vector<std::thread> threads(num_threads - 1); //⇽-- - 10
    {    
        join_threads joiner(threads);
        Iterator block_start = first;
        for (unsigned long i = 0; i < (num_threads - 1); ++i)
        {
            Iterator block_end = block_start;
            std::advance(block_end, block_size);
            // ⇽-- - 11
            threads[i] = std::thread(find_element(),  block_start, block_end, match, &result, &done_flag);
            block_start = block_end;
        }
        // ⇽-- - 12
        find_element()(block_start, last, match, &result, &done_flag);   
    }

    // ⇽-- - 13
    if (!done_flag.load())   
    {
        return last;
    }
    //⇽-- - 14
    return result.get_future().get();    
}

1 find_element重载了()运算符,接受四个参数,分别是迭代器的开始,迭代起的结束,要查找的数值,以及用来通知外部的promise,还有线程之间用来检测是否有某个线程完成查找的原子变量。

2 find_element重载()的逻辑就是查找这个区间内满足某个值的位置,并将这个位置的迭代起设置到promise中,然后将完成的原子变量标记为true。

说第二种方式,利用递归折半查找,我们可以用async帮助我们完成并行任务。

template<typename Iterator, typename MatchType>
Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match,
    std::atomic<bool>& done)   // ⇽-- - 1
{
    try
    {
        unsigned long const length = std::distance(first,last);
        unsigned long const min_per_thread = 25;   // ⇽-- - 2
        if (length < (2 * min_per_thread))    //⇽-- - 3
        {
            for (; (first != last) && !done.load(); ++first)     //⇽-- - 4
            {
                if (*first == match)
                {
                    done = true;    //⇽-- - 5
                    return first;
                }
            }
            return last;    //⇽-- - 6
        }
        else
        {
            //⇽-- - 7
            Iterator const mid_point = first + (length / 2);   
            //⇽-- - 8
            std::future<Iterator> async_result = std::async(&parallel_find_impl<Iterator,MatchType>,    
                           mid_point,last,match,std::ref(done));
            //⇽-- - 9
            Iterator const direct_result = parallel_find_impl(first,mid_point,match,done); 
            //⇽-- - 10
            return (direct_result == mid_point) ?async_result.get() : direct_result;    
        }
    }
    catch (...)
    {
        // ⇽-- - 11
        done = true;   
        throw;
    }
}
template<typename Iterator, typename MatchType>
Iterator parallel_find_async(Iterator first, Iterator last, MatchType match)
{
    std::atomic<bool> done(false);
    //⇽-- - 12
    return parallel_find_impl(first, last, match, done);    
}

1 并行查找的方式种我们先根据长度是否小于50决定是否开启并行任务,如果小于50则采取单线程方式。

2 如果采用并行的方式,我们将长度折半,前半部分交给async,后半部分交给本线程。

3 最后我们在主线程中汇合,获取结果。

partial_sum并行版本

C++ 提供了累计计算求和的功能,比如一个vector中存储的数据为{1,2,3},那么经过计算,第一个元素仍然为1,第二个元素为1+2, 第三个元素为1+2+3,结果为{1,3,6}.

关于并行版本我们可以这么思考,假设元数组为{1,2,3,4,5,6,7},那我们可以划分为三个部分,第一部分为{1,2,3}交给第一个线程处理, 第二部分{4,5,6}交给第二个线程处理,7交给本线程处理。

但是我们要考虑的一个问题是线程2要用到线程1最后计算的结果,线程1计算后{1,3,6},线程2需要用到6做累加,我们可以先让线程1计算出第3个元素值6,再将这个6传递给线程2,剩下的就可以并行计算了。同样的道理本线程要处理最后一个元素的累加结果,他需要等到线程2处理完第6个元素的值。

所以基本思路是每个线程优先处理分区的最后一个元素,通过promise设置给其他线程,在这个阶段线程之间是串行的,等到所有线程都开始计算其他位置后就是并行了。

template<typename Iterator>
void parallel_partial_sum(Iterator first, Iterator last)
{
    typedef typename Iterator::value_type value_type;

    struct process_chunk    //⇽-- - 1
    {
        void operator()(Iterator begin, Iterator last,
            std::future<value_type>* previous_end_value,
            std::promise<value_type>* end_value)
        {
            try
            {
                Iterator end = last;
                ++end;
                std::partial_sum(begin, end, begin);    //⇽-- - 2
                if (previous_end_value)    //⇽-- - 3
                {
                    value_type addend = previous_end_value->get();   // ⇽-- - 4
                    *last += addend;   // ⇽-- - 5
                    if (end_value)
                    {
                        end_value->set_value(*last);    //⇽-- - 6
                    }
                    // ⇽-- - 7
                    std::for_each(begin, last, [addend](value_type& item)
                        {
                            item += addend;
                        });
                }
                else if (end_value)
                {
                    // ⇽-- - 8
                    end_value->set_value(*last);
                }
            }
            catch (...)  // ⇽-- - 9
            {
                if (end_value)
                {
                    end_value->set_exception(std::current_exception());   // ⇽-- - 10
                }
                else
                {
                    throw;   // ⇽-- - 11
                }

            }
        }
    };
        unsigned long const length = std::distance(first, last);

        if (!length) {
            return;
        }
        unsigned long const min_per_thread = 25;     //⇽-- - 12
        unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
        unsigned long const hardware_threads = std::thread::hardware_concurrency();
        unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
        unsigned long const block_size = length / num_threads;
        typedef typename Iterator::value_type value_type;

        std::vector<std::thread> threads(num_threads - 1);   // ⇽-- - 13

        std::vector<std::promise<value_type> > end_values(num_threads - 1);   // ⇽-- - 14

        std::vector<std::future<value_type> > previous_end_values;   // ⇽-- - 15
        previous_end_values.reserve(num_threads - 1);   // ⇽-- - 16
        join_threads joiner(threads);
        Iterator block_start = first;
        for (unsigned long i = 0; i < (num_threads - 1); ++i)
        {
            Iterator block_last = block_start;
            std::advance(block_last, block_size - 1);   // ⇽-- - 17
            // ⇽-- - 18
            threads[i] = std::thread(process_chunk(), block_start, block_last,
                (i != 0) ? &previous_end_values[i - 1] : 0,
                &end_values[i]);
            block_start = block_last;
            ++block_start;   // ⇽-- - 19
            previous_end_values.push_back(end_values[i].get_future());   // ⇽-- - 20
        }
        Iterator final_element = block_start;
        std::advance(final_element, std::distance(block_start, last) - 1);   // ⇽-- - 21
        // ⇽-- - 22
        process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0,
            0);

}

1 定义了process_chunk类,重载了()运算符,在重载的逻辑里我们先计算区间内的partial_sum累计求和(2处)

2 因为我们处理的区间不一定是首个区间,也就是他还需要加上前面区间处理得出的最后一个元素的值,所以我们通过previouse_end_value判断本区间不是首个区间,并且加上前面处理的结果。优先将最后一个值计算出来设置给promise。然后在利用for_each遍历计算其他位置的值。

总结

本文介绍了如何并行设计stl的相关算法,读者有好的思路可以互相交流一下。

测试代码和项目代码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day21-ParallenAlgorithm

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

results matching ""

    No results matching ""