Halide
ThreadPool.h
Go to the documentation of this file.
1 #ifndef HALIDE_THREAD_POOL_H
2 #define HALIDE_THREAD_POOL_H
3 
4 #include <condition_variable>
5 #include <future>
6 #include <mutex>
7 #include <queue>
8 #include <thread>
9 #include <utility>
10 
11 #ifdef _MSC_VER
12 #else
13 #include <unistd.h>
14 #endif
15 
16 /** \file
17  * Define a simple thread pool utility that is modeled on the api of
18  * C++11 std::async(); since implementation details of std::async
19  * can vary considerably, with no control over thread spawning, this class
20  * allows us to use the same model but with precise control over thread usage.
21  *
22  * A ThreadPool is created with a specific number of threads, which will never
23  * vary over the life of the ThreadPool. (If created without a specific number
24  * of threads, it will attempt to use threads == number-of-cores.)
25  *
26  * Each async request will go into a queue, and will be serviced by the next
27  * available thread from the pool.
28  *
29  * The ThreadPool's dtor will block until all currently-executing tasks
30  * have finished (but it won't start any tasks that are still waiting in the queue).
31  *
32  * Note that this is a fairly simpleminded ThreadPool, meant for tasks
33  * that are fairly coarse (e.g. different tasks in a test); it is specifically
34  * *not* intended to be the underlying implementation for Halide runtime threads
35  */
36 namespace Halide {
37 namespace Internal {
38 
/** A fixed-size pool of worker threads that executes queued jobs returning T.
 *
 * Jobs are submitted with async(), which hands back a std::future<T> for the
 * result. Jobs are taken from a FIFO queue by whichever worker is free.
 *
 * The destructor joins every worker. Workers stop picking up jobs as soon as
 * shutdown begins, so jobs still sitting in the queue at that point are
 * destroyed without being run.
 *
 * NOTE(review): nothing here catches exceptions thrown by a job, so a
 * throwing job would escape the worker thread's entry function.
 */
template<typename T>
class ThreadPool {
    /** One unit of queued work: the callable plus the promise that feeds
     * the future returned by async(). */
    struct Job {
        std::function<T()> func;
        std::promise<T> result;

        /** Run func with the pool mutex released, then publish its result.
         * Must be entered with unique_lock held; it is held again on return. */
        void run_unlocked(std::unique_lock<std::mutex> &unique_lock);
    };

    // Guards every other field of the pool.
    std::mutex mutex;

    // Pending jobs, oldest first.
    std::queue<Job> jobs;

    // Signalled when a job is enqueued and when shutdown begins.
    std::condition_variable wakeup_threads;

    // The workers; kept so the destructor can join them.
    std::vector<std::thread> threads;

    // Set (under the mutex) once the destructor has started.
    bool shutting_down{false};

    /** Body of each worker thread: run jobs until shutdown is requested. */
    void worker_thread() {
        std::unique_lock<std::mutex> unique_lock(mutex);
        while (!shutting_down) {
            if (jobs.empty()) {
                // Nothing to do; sleep until async() or the dtor notifies us.
                // The loop re-checks both conditions after every wakeup.
                wakeup_threads.wait(unique_lock);
            } else {
                // Take ownership of the oldest job before releasing the lock.
                Job cur_job = std::move(jobs.front());
                jobs.pop();
                cur_job.run_unlocked(unique_lock);
            }
        }
    }

public:
    /** Best-effort count of available processors; always at least 1.
     *
     * On Windows this reads the NUMBER_OF_PROCESSORS environment variable
     * (assuming 8 if it is unset); elsewhere it asks sysconf(). A failed or
     * nonsensical (zero/negative) answer is reported as 1 rather than being
     * converted to an enormous unsigned value.
     */
    static size_t num_processors_online() {
#ifdef _WIN32
        const char *num_cores = getenv("NUMBER_OF_PROCESSORS");
        const long count = num_cores ? atoi(num_cores) : 8;
#else
        // sysconf() reports failure as -1.
        const long count = sysconf(_SC_NPROCESSORS_ONLN);
#endif
        return count > 0 ? static_cast<size_t>(count) : 1;
    }

    /** Create the pool and start desired_num_threads workers.
     * Defaults to one worker per available processor. Must be > 0. */
    ThreadPool(size_t desired_num_threads = num_processors_online()) {
        // This file doesn't depend on anything else in libHalide, so
        // we'll use assert, not internal_assert.
        assert(desired_num_threads > 0);

        // Workers block on this mutex until construction has finished.
        std::lock_guard<std::mutex> lock(mutex);

        threads.reserve(desired_num_threads);
        for (size_t i = 0; i < desired_num_threads; ++i) {
            threads.emplace_back([this] { worker_thread(); });
        }
    }

    /** Request shutdown and wait for every worker to exit. Jobs that are
     * already running are allowed to finish; queued jobs are not started. */
    ~ThreadPool() {
        // Wake everyone up and tell them the party's over and it's time to go home
        {
            std::lock_guard<std::mutex> lock(mutex);
            shutting_down = true;
            wakeup_threads.notify_all();
        }

        // Wait until they leave
        for (auto &t : threads) {
            t.join();
        }
    }

    /** Queue func(args...) for execution on a worker thread.
     *
     * func and args are taken (and stored) by value, so the job never refers
     * to the caller's objects unless the caller explicitly passes pointers
     * or reference wrappers.
     *
     * \return a future that receives the value produced by func.
     */
    template<typename Func, typename... Args>
    std::future<T> async(Func func, Args... args) {
        std::lock_guard<std::mutex> lock(mutex);

        Job job;
        // Don't use std::forward here: we never want args passed by reference,
        // since they will be accessed from an arbitrary thread.
        //
        // Some versions of GCC won't allow capturing variadic arguments in a lambda;
        //
        //     job.func = [func, args...]() -> T { return func(args...); };  // Nope, sorry
        //
        // fortunately, we can use std::bind() to accomplish the same thing.
        job.func = std::bind(func, args...);
        jobs.emplace(std::move(job));
        std::future<T> result = jobs.back().result.get_future();

        // Wake up our threads.
        wakeup_threads.notify_all();

        return result;
    }
};

/** Generic case: run the job without holding the pool mutex, then re-take
 * the mutex and store the returned value in the promise. */
template<typename T>
inline void ThreadPool<T>::Job::run_unlocked(std::unique_lock<std::mutex> &unique_lock) {
    // Release the pool lock so other workers and async() callers can make
    // progress while the (possibly long-running) job executes.
    unique_lock.unlock();
    T r = func();
    unique_lock.lock();
    result.set_value(std::move(r));
}

/** void case: identical, except there is no value to hand to the promise. */
template<>
inline void ThreadPool<void>::Job::run_unlocked(std::unique_lock<std::mutex> &unique_lock) {
    unique_lock.unlock();
    func();
    unique_lock.lock();
    result.set_value();
}
155 
156 } // namespace Internal
157 } // namespace Halide
158 
159 #endif // HALIDE_THREAD_POOL_H
Halide::Internal::ThreadPool::~ThreadPool
~ThreadPool()
Definition: ThreadPool.h:102
getenv
char * getenv(const char *)
Halide::Internal::ThreadPool::ThreadPool
ThreadPool(size_t desired_num_threads=num_processors_online())
Definition: ThreadPool.h:89
Halide
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
Definition: AddAtomicMutex.h:21
atoi
int atoi(const char *)
Halide::LinkageType::Internal
@ Internal
Not visible externally, similar to 'static' linkage in C.
Halide::Internal::ThreadPool::async
std::future< T > async(Func func, Args... args)
Definition: ThreadPool.h:117
Halide::Internal::ThreadPool::num_processors_online
static size_t num_processors_online()
Definition: ThreadPool.h:79
Halide::Func
A halide function.
Definition: Func.h:667
Halide::Internal::ThreadPool
Definition: ThreadPool.h:40