Halide
thread_pool_common.h
Go to the documentation of this file.
1 
2 namespace Halide { namespace Runtime { namespace Internal {
3 
// A single parallel job: a task function to be invoked over the index
// range [next, max), plus bookkeeping for the threads working on it.
struct work {
    work *next_job;                    // Next job in the singly-linked job stack.
    int (*f)(void *, int, uint8_t *);  // Task function: (user_context, task index, closure).
    void *user_context;                // Opaque pointer passed through to every task.
    int next, max;                     // Tasks in [next, max) remain unclaimed.
    uint8_t *closure;                  // Captured state handed to each task invocation.
    int exit_status;                   // Non-zero if any task of this job has failed.
    int active_workers;                // Threads currently executing a task of this job.
    // A job is still running while unclaimed tasks remain or while any
    // worker is mid-task (claimed but not yet finished).
    bool running() { return next < max || active_workers > 0; }
};
14 
15 // The work queue and thread pool is weak, so one big work queue is shared by all halide functions
16 #define MAX_THREADS 64
17 struct work_queue_t {
18  // all fields are protected by this mutex.
20 
21  // Singly linked list for job stack
23 
24  // Worker threads are divided into an 'A' team and a 'B' team. The
25  // B team sleeps on the wakeup_b_team condition variable. The A
26  // team does work. Threads transition to the B team if they wake
27  // up and find that a_team_size > target_a_team_size. Threads
28  // move into the A team whenever they wake up and find that
29  // a_team_size < target_a_team_size.
30  int a_team_size, target_a_team_size;
31 
32  // Broadcast when a job completes.
34 
35  // Broadcast whenever items are added to the work queue.
37 
38  // May also be broadcast when items are added to the work queue if
39  // more threads are required than are currently in the A team.
41 
42  // Keep track of threads so they can be joined at shutdown
43  halide_thread *threads[MAX_THREADS];
44 
45  // The number threads created
47 
48  // The desired number threads doing work.
50 
51  // Global flags indicating the threadpool should shut down, and
52  // whether the thread pool has been initialized.
53  bool shutdown, initialized;
54 
55  bool running() {
56  return !shutdown;
57  }
58 
59 };
61 
62 WEAK int clamp_num_threads(int desired_num_threads) {
63  if (desired_num_threads > MAX_THREADS) {
64  desired_num_threads = MAX_THREADS;
65  } else if (desired_num_threads < 1) {
66  desired_num_threads = 1;
67  }
68  return desired_num_threads;
69 }
70 
72  int desired_num_threads = 0;
73  char *threads_str = getenv("HL_NUM_THREADS");
74  if (!threads_str) {
75  // Legacy name for HL_NUM_THREADS
76  threads_str = getenv("HL_NUMTHREADS");
77  }
78  if (threads_str) {
79  desired_num_threads = atoi(threads_str);
80  } else {
81  desired_num_threads = halide_host_cpu_count();
82  }
83  return desired_num_threads;
84 }
85 
87  // If I'm a job owner, then I was the thread that called
88  // do_par_for, and I should only stay in this function until my
89  // job is complete. If I'm a lowly worker thread, I should stay in
90  // this function as long as the work queue is running.
91  while (owned_job != NULL ? owned_job->running()
92  : work_queue.running()) {
93 
94  if (work_queue.jobs == NULL) {
95  if (owned_job) {
96  // There are no jobs pending. Wait for the last worker
97  // to signal that the job is finished.
98  halide_cond_wait(&work_queue.wakeup_owners, &work_queue.mutex);
99  } else if (work_queue.a_team_size <= work_queue.target_a_team_size) {
100  // There are no jobs pending. Wait until more jobs are enqueued.
101  halide_cond_wait(&work_queue.wakeup_a_team, &work_queue.mutex);
102  } else {
103  // There are no jobs pending, and there are too many
104  // threads in the A team. Transition to the B team
105  // until the wakeup_b_team condition is fired.
106  work_queue.a_team_size--;
107  halide_cond_wait(&work_queue.wakeup_b_team, &work_queue.mutex);
108  work_queue.a_team_size++;
109  }
110  } else {
111  // Grab the next job.
112  work *job = work_queue.jobs;
113 
114  // Claim a task from it.
115  work myjob = *job;
116  job->next++;
117 
118  // If there were no more tasks pending for this job,
119  // remove it from the stack.
120  if (job->next == job->max) {
121  work_queue.jobs = job->next_job;
122  }
123 
124  // Increment the active_worker count so that other threads
125  // are aware that this job is still in progress even
126  // though there are no outstanding tasks for it.
127  job->active_workers++;
128 
129  // Release the lock and do the task.
130  halide_mutex_unlock(&work_queue.mutex);
131  int result = halide_do_task(myjob.user_context, myjob.f, myjob.next,
132  myjob.closure);
133  halide_mutex_lock(&work_queue.mutex);
134 
135  // If this task failed, set the exit status on the job.
136  if (result) {
137  job->exit_status = result;
138  }
139 
140  // We are no longer active on this job
141  job->active_workers--;
142 
143  // If the job is done and I'm not the owner of it, wake up
144  // the owner.
145  if (!job->running() && job != owned_job) {
146  halide_cond_broadcast(&work_queue.wakeup_owners);
147  }
148  }
149  }
150 }
151 
152 WEAK void worker_thread(void *) {
153  halide_mutex_lock(&work_queue.mutex);
155  halide_mutex_unlock(&work_queue.mutex);
156 }
157 
158 }}} // namespace Halide::Runtime::Internal
159 
160 using namespace Halide::Runtime::Internal;
161 
162 extern "C" {
163 
165  uint8_t *closure) {
166  return f(user_context, idx, closure);
167 }
168 
170  int min, int size, uint8_t *closure) {
171  // Our for loops are expected to gracefully handle sizes <= 0
172  if (size <= 0) {
173  return 0;
174  }
175 
176  // Grab the lock. If it hasn't been initialized yet, then the
177  // field will be zero-initialized because it's a static global.
179 
180  if (!work_queue.initialized) {
181  work_queue.shutdown = false;
185  work_queue.jobs = NULL;
186 
187  // Compute the desired number of threads to use. Other code
188  // can also mess with this value, but only when the work queue
189  // is locked.
192  }
195 
196  // Everyone starts on the a team.
198 
199  work_queue.initialized = true;
200  }
201 
203  // We might need to make some new threads, if work_queue.desired_num_threads has
204  // increased.
207  }
208 
209  // Make the job.
210  work job;
211  job.f = f; // The job should call this function. It takes an index and a closure.
213  job.next = min; // Start at this index.
214  job.max = min + size; // Keep going until one less than this index.
215  job.closure = closure; // Use this closure.
216  job.exit_status = 0; // The job hasn't failed yet
217  job.active_workers = 0; // Nobody is working on this yet
218 
220  // If there's no nested parallelism happening and there are
221  // fewer tasks to do than threads, then set the target A team
222  // size so that some threads will put themselves to sleep
223  // until a larger job arrives.
225  } else {
226  // Otherwise the target A team size is
227  // desired_num_threads. This may still be less than
228  // threads_created if desired_num_threads has been reduced by
229  // other code.
231  }
232 
233  // Push the job onto the stack.
234  job.next_job = work_queue.jobs;
235  work_queue.jobs = &job;
236 
237  // Wake up our A team.
239 
240  // If there are fewer threads than we would like on the a team,
241  // wake up the b team too.
244  }
245 
246  // Do some work myself.
248 
250 
251  // Return zero if the job succeeded, otherwise return the exit
252  // status of one of the failing jobs (whichever one failed last).
253  return job.exit_status;
254 }
255 
257  if (n < 0) {
258  halide_error(NULL, "halide_set_num_threads: must be >= 0.");
259  }
260  // Don't make this an atomic swap - we don't want to be changing
261  // the desired number of threads while another thread is in the
262  // middle of a sequence of non-atomic operations.
264  if (n == 0) {
266  }
270  return old;
271 }
272 
274  if (!work_queue.initialized) return;
275 
276  // Wake everyone up and tell them the party's over and it's time
277  // to go home
279  work_queue.shutdown = true;
284 
285  // Wait until they leave
286  for (int i = 0; i < work_queue.threads_created; i++) {
288  }
289 
290  // Tidy up
295  work_queue.initialized = false;
296 }
297 
298 }
void halide_error(void *user_context, const char *)
Halide calls this function on runtime errors (for example bounds checking failures).
struct halide_thread * halide_spawn_thread(void(*f)(void *), void *closure)
Spawn a thread.
Cross-platform mutex.
Definition: HalideRuntime.h:97
int(* f)(void *, int, uint8_t *)
#define MAX_THREADS
WEAK int halide_host_cpu_count()
#define WEAK
WEAK void halide_shutdown_thread_pool()
Defines methods for manipulating and analyzing boolean expressions.
int(* halide_task_t)(void *user_context, int task_number, uint8_t *closure)
Define halide_do_par_for to replace the default thread pool implementation.
#define NULL
unsigned __INT8_TYPE__ uint8_t
Expr min(FuncRef a, FuncRef b)
Explicit overloads of min and max for FuncRef.
Definition: Func.h:418
void halide_mutex_lock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
WEAK void halide_cond_init(struct halide_cond *cond)
int atoi(const char *)
WEAK int clamp_num_threads(int desired_num_threads)
WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
If you use the default do_par_for, you can still set a custom handler to perform each individual task...
halide_thread * threads[MAX_THREADS]
WEAK int halide_set_num_threads(int n)
Set the number of threads used by Halide&#39;s thread pool.
WEAK void worker_thread(void *)
WEAK void halide_cond_destroy(struct halide_cond *cond)
WEAK work_queue_t work_queue
void halide_join_thread(struct halide_thread *)
Join a thread.
WEAK void halide_cond_broadcast(struct halide_cond *cond)
void halide_mutex_unlock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
WEAK int halide_default_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
The default versions of do_task and do_par_for.
WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
WEAK void worker_thread_already_locked(work *owned_job)
char * getenv(const char *)
void halide_mutex_destroy(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...