1 #ifndef HALIDE_THREAD_POOL_H
2 #define HALIDE_THREAD_POOL_H
4 #include <condition_variable>
42 std::function<T()> func;
43 std::promise<T> result;
45 void run_unlocked(std::unique_lock<std::mutex> &unique_lock);
55 std::condition_variable wakeup_threads;
58 std::vector<std::thread> threads;
61 bool shutting_down{
false};
63 void worker_thread() {
64 std::unique_lock<std::mutex> unique_lock(mutex);
65 while (!shutting_down) {
68 wakeup_threads.wait(unique_lock);
71 Job cur_job = std::move(jobs.front());
73 cur_job.run_unlocked(unique_lock);
81 char *num_cores =
getenv(
"NUMBER_OF_PROCESSORS");
82 return num_cores ?
atoi(num_cores) : 8;
84 return sysconf(_SC_NPROCESSORS_ONLN);
92 assert(desired_num_threads > 0);
94 std::lock_guard<std::mutex> lock(mutex);
97 for (
size_t i = 0; i < desired_num_threads; ++i) {
98 threads.emplace_back([
this] { worker_thread(); });
105 std::lock_guard<std::mutex> lock(mutex);
106 shutting_down =
true;
107 wakeup_threads.notify_all();
111 for (
auto &t : threads) {
116 template<
typename Func,
typename... Args>
118 std::lock_guard<std::mutex> lock(mutex);
129 job.func = std::bind(func, args...);
130 jobs.emplace(std::move(job));
131 std::future<T> result = jobs.back().result.get_future();
134 wakeup_threads.notify_all();
141 inline void ThreadPool<T>::Job::run_unlocked(std::unique_lock<std::mutex> &unique_lock) {
142 unique_lock.unlock();
145 result.set_value(std::move(r));
149 inline void ThreadPool<void>::Job::run_unlocked(std::unique_lock<std::mutex> &unique_lock) {
150 unique_lock.unlock();
ThreadPool(size_t desired_num_threads=num_processors_online())
std::future< T > async(Func func, Args... args)
static size_t num_processors_online()
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
@ Internal
Not visible externally, similar to 'static' linkage in C.
char * getenv(const char *)