C++11 一些 task-based thread 的取值用法 (async, packaged_task, and promise)

目標

計算 4 個 vector int 總和的結果,也就是抓取int long_time_sum(const std::vector<int>& vec) function 的回傳值。

主要的計算 task

模擬一個需要花上 1 second 的 procedure。實際上他會加總 vector 內 int 的 和。
int long_time_sum(const std::vector<int>& vec)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return std::accumulate(vec.begin(), vec.end(), 0, [](int a, int b) {
        return a + b;
    });
}

計算的 data set

準備 4 個含有 100 個 int 的 psudeo 隨機產生的 vector,置於 src
std::vector<std::vector<int>> src = {
  tool::gen_vector(100),
  tool::gen_vector(100),
  tool::gen_vector(100),
  tool::gen_vector(100)
};

...

std::vector<int> gen_vector(std::size_t s)
{
  std::random_device eng;
  std::uniform_int_distribution<> gen(0, 100);
  std::vector<int> vec(s);
  std::generate(vec.begin(), vec.end(), [&]() { return gen(eng); });
  return vec;
}
再傳入 data set 到這五種的發派方式
sequential(src);
concurrent_async(src);
concurrent_byref(src);
concurrent_packaged(src);
concurrent_promise(src);

發派方式

1. Sequential

void sequential(const std::vector<std::vector<int>>& src)
{
    tool::ScopeTimer timer;
    for (const auto& vec : src)
    {
        std::cout << long_time_sum(vec) << std::endl;
    }
}

2. Pass by Reference

void concurrent_byref(const std::vector<std::vector<int>>& src)
{
    tool::ScopeTimer timer;
    auto lambda = [](const std::vector<int>& vec, int& result) {
        result = long_time_sum(vec);
    };
    
    // Dispatch
    std::vector<std::thread> ts;
    std::vector<int> rets(src.size());
    for (std::size_t i = 0; i < src.size(); i++)
    {
        ts.emplace_back(lambda, std::cref(src[i]), std::ref(rets[i]));
    }
    
    // Output
    for (std::size_t i = 0; i < src.size(); i++)
    {
        ts[i].join();
        std::cout << rets[i] << std::endl;
    }
}

3. std::async

void concurrent_async(const std::vector<std::vector<int>>& src)
{
    tool::ScopeTimer timer;
    
    // Dispatch
    std::vector<std::future<int>> fs;
    for (const auto& vec : src)
    {
        fs.push_back(std::async(long_time_sum, std::cref(vec)));
    }
    // Output
    for (auto& f : fs)
    {
        std::cout << f.get() << std::endl;
    }
}

4. std::packaged_task

void concurrent_packaged(const std::vector<std::vector<int>>& src)
{
    tool::ScopeTimer timer;
    
    // Dispatch
    std::vector<std::future<int>> fs;
    for (const auto& vec: src)
    {
        std::packaged_task<decltype(long_time_sum)> task(long_time_sum);
        fs.push_back(task.get_future());
        std::thread(std::move(task), std::cref(vec)).detach();
    }
    // Output
    for (auto& f : fs)
    {
        std::cout << f.get() << std::endl;
    }
}

5. std::promise

void concurrent_promise(const std::vector<std::vector<int>>& src)
{
    tool::ScopeTimer timer;
    
    auto lambda = [](const std::vector<int>& vec, std::promise<int>& p) {
        try {
            auto ret = long_time_sum(vec);
            p.set_value(ret);
        }
        catch (...)
        {
           p.set_exception(std::current_exception());
        }
    };
    
    // Dispatch
    std::vector<std::promise<int>> ps(src.size());
    std::vector<std::future<int>> fs;
    for (std::size_t i = 0; i < src.size(); i++)
    {
        fs.push_back(ps[i].get_future());
        std::thread(lambda, std::cref(src[i]), std::ref(ps[i])).detach();
    }
    // Output
    for (auto& f : fs)
    {
            std::cout << f.get() << std::endl;
    }
}

完整程式碼

https://gist.github.com/ot32em/e677b9724af12859beb9

comments powered by Disqus