Halide
thread_pool_common.h
#define EXTENDED_DEBUG 0

#if EXTENDED_DEBUG
// This code is currently set up for Linux debugging. Switch to using pthread_self on e.g. Mac OS X.
extern "C" int syscall(int);

namespace {
int gettid() {
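    // 224 is __NR_gettid on 32-bit x86 Linux; 186 is __NR_gettid on x86-64.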
#ifdef BITS_32
    return syscall(224);
#else
    return syscall(186);
#endif
}
}  // namespace

#define log_message(stuff) print(NULL) << gettid() << ": " << stuff << "\n"
#else
#define log_message(stuff)
#endif

namespace Halide {
namespace Runtime {
namespace Internal {

struct work {
    halide_parallel_task_t task;

    // If we come into the task system via do_par_for we just have a
    // halide_task_t, not a halide_loop_task_t.
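    // (In that case task_fn is set and task.fn is NULL; the worker checks
    // task_fn first when it runs a claimed iteration.)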
    halide_task_t task_fn;

    work *next_job;
    work *siblings;
    int sibling_count;
    work *parent_job;
    int threads_reserved;

    void *user_context;
    int active_workers;
    int exit_status;
    int next_semaphore;
    // Whether the job's owner is currently sleeping on a condition
    // variable, waiting for this job to finish.
    bool owner_is_sleeping;

    ALWAYS_INLINE bool make_runnable() {
        for (; next_semaphore < task.num_semaphores; next_semaphore++) {
            if (!halide_default_semaphore_try_acquire(task.semaphores[next_semaphore].semaphore,
                                                      task.semaphores[next_semaphore].count)) {
                // Note that we don't release the semaphores already
                // acquired. We never have two consumers contending
                // over the same semaphore, so it's not helpful to do
                // so.
                return false;
            }
        }
        // Future iterations of this task need to acquire the semaphores from scratch.
        next_semaphore = 0;
        return true;
    }

    ALWAYS_INLINE bool running() const {
        return task.extent || active_workers;
    }
};

#define MAX_THREADS 256

WEAK int clamp_num_threads(int threads) {
    if (threads > MAX_THREADS) {
        threads = MAX_THREADS;
    } else if (threads < 1) {
        threads = 1;
    }
    return threads;
}

WEAK int default_desired_num_threads() {
    int desired_num_threads = 0;
    char *threads_str = getenv("HL_NUM_THREADS");
    if (!threads_str) {
        // Legacy name for HL_NUM_THREADS
        threads_str = getenv("HL_NUMTHREADS");
    }
    if (threads_str) {
        desired_num_threads = atoi(threads_str);
    } else {
        desired_num_threads = halide_host_cpu_count();
    }
    return desired_num_threads;
}
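
// For example, running a pipeline with HL_NUM_THREADS=8 makes the pool aim
// for eight worker threads; the value is clamped to [1, MAX_THREADS] before
// any threads are spawned.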

// The work queue and thread pool are weak, so one big work queue is shared by all halide functions
struct work_queue_t {
    // All fields are protected by this mutex.
    halide_mutex mutex;

    // The desired number of threads doing work (HL_NUM_THREADS).
    int desired_threads_working;

    // All fields after this must be zero in the initial state. See assert_zeroed.
    // Field serves both to mark the offset in struct and as layout padding.
    int zero_marker;

    // Singly linked list for job stack
    work *jobs;

    // The number of threads created
    int threads_created;

    // Workers sleep on one of two condition variables, to make it
    // easier to wake up the right number if a small number of tasks
    // are enqueued. There are A-team workers and B-team workers. The
    // following variables track the current size and the desired size
    // of the A team.
    int a_team_size, target_a_team_size;

    // The condition variables that workers and owners sleep on. We
    // may want to wake them up independently. Any code that may
    // invalidate any of the reasons a worker or owner may have slept
    // must signal or broadcast the appropriate condition variable.
    halide_cond wake_a_team, wake_b_team, wake_owners;

    // The number of sleeping workers and owners. An over-estimate - a
    // waking-up thread may not have decremented this yet.
    int workers_sleeping, owners_sleeping;

    // Keep track of threads so they can be joined at shutdown
    halide_thread *threads[MAX_THREADS];

    // Global flags indicating the threadpool should shut down, and
    // whether the thread pool has been initialized.
    bool shutdown, initialized;

    // The number of threads that are currently committed to possibly block
    // via outstanding jobs queued or being actively worked on. Used to limit
    // the number of iterations of parallel for loops that are invoked so as
    // to prevent deadlock due to oversubscription of threads.
    int threads_reserved;

    ALWAYS_INLINE bool running() const {
        return !shutdown;
    }

    // Used to check initial state is correct.
    ALWAYS_INLINE void assert_zeroed() const {
        // Assert that all fields except the mutex and desired threads count are zeroed.
        const char *bytes = ((const char *)&this->zero_marker);
        const char *limit = ((const char *)this) + sizeof(work_queue_t);
        while (bytes < limit && *bytes == 0) {
            bytes++;
        }
        halide_assert(NULL, bytes == limit && "Logic error in thread pool work queue initialization.\n");
    }

    // Return the work queue to initial state. Must be called while locked,
    // and the queue will remain locked.
    ALWAYS_INLINE void reset() {
        // Ensure all fields except the mutex and desired threads count are zeroed.
        char *bytes = ((char *)&this->zero_marker);
        char *limit = ((char *)this) + sizeof(work_queue_t);
        memset(bytes, 0, limit - bytes);
    }
};

WEAK work_queue_t work_queue = {};

#if EXTENDED_DEBUG
WEAK void print_job(work *job, const char *indent, const char *prefix = NULL) {
    if (prefix == NULL) {
        prefix = indent;
    }
    const char *name = job->task.name ? job->task.name : "<no name>";
    const char *parent_name = job->parent_job ? (job->parent_job->task.name ? job->parent_job->task.name : "<no name>") : "<no parent job>";
    log_message(prefix << name << "[" << job << "] serial: " << job->task.serial << " active_workers: " << job->active_workers << " min: " << job->task.min << " extent: " << job->task.extent << " siblings: " << job->siblings << " sibling count: " << job->sibling_count << " min_threads " << job->task.min_threads << " next_semaphore: " << job->next_semaphore << " threads_reserved: " << job->threads_reserved << " parent_job: " << parent_name << "[" << job->parent_job << "]");
    for (int i = 0; i < job->task.num_semaphores; i++) {
        log_message(indent << " semaphore " << (void *)job->task.semaphores[i].semaphore << " count " << job->task.semaphores[i].count << " val " << *(int *)job->task.semaphores[i].semaphore);
    }
}

WEAK void dump_job_state() {
    log_message("Dumping job state, jobs in queue:");
    work *job = work_queue.jobs;
    while (job != NULL) {
        print_job(job, " ");
        job = job->next_job;
    }
    log_message("Done dumping job state.");
}
#else
#define print_job(job, indent, prefix)
#define dump_job_state()
#endif

WEAK void worker_thread(void *);

WEAK void worker_thread_already_locked(work *owned_job) {
    while (owned_job ? owned_job->running() : !work_queue.shutdown) {
        work *job = work_queue.jobs;
        work **prev_ptr = &work_queue.jobs;

        if (owned_job) {
            if (owned_job->exit_status != 0) {
                if (owned_job->active_workers == 0) {
                    while (job != owned_job) {
                        prev_ptr = &job->next_job;
                        job = job->next_job;
                    }
                    *prev_ptr = job->next_job;
                    job->task.extent = 0;
                    continue;  // So loop exit is always in the same place.
                }
            } else if (owned_job->parent_job && owned_job->parent_job->exit_status != 0) {
                owned_job->exit_status = owned_job->parent_job->exit_status;
                // The wakeup likely only needs to be done under certain conditions, but it only
                // happens when an error has already occurred, and it seems more important to
                // ensure reliable termination than to optimize this path.
                halide_cond_broadcast(&work_queue.wake_owners);
                continue;
            }
        }

        dump_job_state();

        // Find a job to run, preferring things near the top of the stack.
        while (job) {
            print_job(job, "", "Considering job ");
            // Only schedule tasks with enough free worker threads
            // around to complete. They may get stolen later, but only
            // by tasks which can themselves use them to complete
            // work, so forward progress is made.
            bool enough_threads;

            work *parent_job = job->parent_job;

            int threads_available;
            if (parent_job == NULL) {
                // The + 1 is because work_queue.threads_created does not include the main thread.
                threads_available = (work_queue.threads_created + 1) - work_queue.threads_reserved;
            } else {
                if (parent_job->active_workers == 0) {
                    threads_available = parent_job->task.min_threads - parent_job->threads_reserved;
                } else {
                    threads_available = parent_job->active_workers * parent_job->task.min_threads - parent_job->threads_reserved;
                }
            }
            enough_threads = threads_available >= job->task.min_threads;

            if (!enough_threads) {
                log_message("Not enough threads for job " << job->task.name << " available: " << threads_available << " min_threads: " << job->task.min_threads);
            }
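            // An owner thread only helps with jobs from its own sibling
            // group (or jobs that reserve no threads, min_threads == 0);
            // picking up an unrelated job here could leave it stuck doing
            // work its own job does not depend on.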
            bool can_use_this_thread_stack = !owned_job || (job->siblings == owned_job->siblings) || job->task.min_threads == 0;
            if (!can_use_this_thread_stack) {
                log_message("Cannot run job " << job->task.name << " on this thread.");
            }
            bool can_add_worker = (!job->task.serial || (job->active_workers == 0));
            if (!can_add_worker) {
                log_message("Cannot add worker to job " << job->task.name);
            }

            if (enough_threads && can_use_this_thread_stack && can_add_worker) {
                if (job->make_runnable()) {
                    break;
                } else {
                    log_message("Cannot acquire semaphores for " << job->task.name);
                }
            }
            prev_ptr = &(job->next_job);
            job = job->next_job;
        }

        if (!job) {
            // There is no runnable job. Go to sleep.
            if (owned_job) {
                work_queue.owners_sleeping++;
                owned_job->owner_is_sleeping = true;
                halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex);
                owned_job->owner_is_sleeping = false;
                work_queue.owners_sleeping--;
            } else {
                work_queue.workers_sleeping++;
                if (work_queue.a_team_size > work_queue.target_a_team_size) {
                    // Transition to B team
                    work_queue.a_team_size--;
                    halide_cond_wait(&work_queue.wake_b_team, &work_queue.mutex);
                    work_queue.a_team_size++;
                } else {
                    halide_cond_wait(&work_queue.wake_a_team, &work_queue.mutex);
                }
                work_queue.workers_sleeping--;
            }
            continue;
        }

        log_message("Working on job " << job->task.name);

        // Increment the active_worker count so that other threads
        // are aware that this job is still in progress even
        // though there are no outstanding tasks for it.
        job->active_workers++;

        if (job->parent_job == NULL) {
            work_queue.threads_reserved += job->task.min_threads;
            log_message("Reserved " << job->task.min_threads << " on work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
        } else {
            job->parent_job->threads_reserved += job->task.min_threads;
            log_message("Reserved " << job->task.min_threads << " on " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
        }

        int result = 0;

        if (job->task.serial) {
            // Remove it from the stack while we work on it.
            *prev_ptr = job->next_job;

            // Release the lock and do the task.
            halide_mutex_unlock(&work_queue.mutex);
            int total_iters = 0;
            int iters = 1;
            while (result == 0) {
                // Claim as many iterations as possible
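                // (each successful make_runnable() call acquires the
                // task's semaphores for one more iteration)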
                while ((job->task.extent - total_iters) > iters &&
                       job->make_runnable()) {
                    iters++;
                }
                if (iters == 0) break;

                // Do them
                result = halide_do_loop_task(job->user_context, job->task.fn,
                                             job->task.min + total_iters, iters,
                                             job->task.closure, job);
                total_iters += iters;
                iters = 0;
            }
            halide_mutex_lock(&work_queue.mutex);

            job->task.min += total_iters;
            job->task.extent -= total_iters;

            // Put it back on the job stack, if it hasn't failed.
            if (result != 0) {
                job->task.extent = 0;  // Force job to be finished.
            } else if (job->task.extent > 0) {
                job->next_job = work_queue.jobs;
                work_queue.jobs = job;
            }
        } else {
            // Claim a task from it.
            work myjob = *job;
            job->task.min++;
            job->task.extent--;

            // If there were no more tasks pending for this job, remove it
            // from the stack.
            if (job->task.extent == 0) {
                *prev_ptr = job->next_job;
            }

            // Release the lock and do the task.
            halide_mutex_unlock(&work_queue.mutex);
            if (myjob.task_fn) {
                result = halide_do_task(myjob.user_context, myjob.task_fn,
                                        myjob.task.min, myjob.task.closure);
            } else {
                result = halide_do_loop_task(myjob.user_context, myjob.task.fn,
                                             myjob.task.min, 1,
                                             myjob.task.closure, job);
            }
            halide_mutex_lock(&work_queue.mutex);
        }

        if (result != 0) {
            log_message("Thread pool saw error from task: " << result);
        }

        bool wake_owners = false;

        // If this task failed, set the exit status on the job.
        if (result != 0) {
            job->exit_status = result;
            // Mark all siblings as also failed.
            for (int i = 0; i < job->sibling_count; i++) {
                log_message("Marking " << job->sibling_count << " siblings ");
                if (job->siblings[i].exit_status == 0) {
                    job->siblings[i].exit_status = result;
                    wake_owners |= (job->active_workers == 0 && job->siblings[i].owner_is_sleeping);
                }
                log_message("Done marking siblings.");
            }
        }

        if (job->parent_job == NULL) {
            work_queue.threads_reserved -= job->task.min_threads;
            log_message("Returned " << job->task.min_threads << " to work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
        } else {
            job->parent_job->threads_reserved -= job->task.min_threads;
            log_message("Returned " << job->task.min_threads << " to " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
        }

        // We are no longer active on this job.
        job->active_workers--;

        log_message("Done working on job " << job->task.name);

        if (wake_owners ||
            (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != 0) && job->owner_is_sleeping)) {
            // The job is done or some owned job failed via sibling linkage. Wake up the owner.
            halide_cond_broadcast(&work_queue.wake_owners);
        }
    }
}

WEAK void worker_thread(void *arg) {
    halide_mutex_lock(&work_queue.mutex);
    worker_thread_already_locked((work *)arg);
    halide_mutex_unlock(&work_queue.mutex);
}

WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent) {
    if (!work_queue.initialized) {
        work_queue.assert_zeroed();

        // Compute the desired number of threads to use. Other code
        // can also mess with this value, but only when the work queue
        // is locked.
        if (!work_queue.desired_threads_working) {
            work_queue.desired_threads_working = default_desired_num_threads();
        }
        work_queue.desired_threads_working = clamp_num_threads(work_queue.desired_threads_working);
        work_queue.initialized = true;
    }

    // Gather some information about the work.

    // Some tasks require a minimum number of threads to make forward
    // progress. Also assume the blocking tasks need to run concurrently.
    int min_threads = 0;

    // Count how many workers to wake. Start at -1 because this thread
    // will contribute.
    int workers_to_wake = -1;

    // Could stalled owners of other tasks conceivably help with one
    // of these jobs?
    bool stealable_jobs = false;

    bool job_has_acquires = false;
    bool job_may_block = false;
    for (int i = 0; i < num_jobs; i++) {
        if (jobs[i].task.min_threads == 0) {
            stealable_jobs = true;
        } else {
            job_may_block = true;
            min_threads += jobs[i].task.min_threads;
        }
        if (jobs[i].task.num_semaphores != 0) {
            job_has_acquires = true;
        }

        if (jobs[i].task.serial) {
            workers_to_wake++;
        } else {
            workers_to_wake += jobs[i].task.extent;
        }
    }

    if (task_parent == NULL) {
        // This is here because some top-level jobs may block, but are not accounted for
        // in any enclosing min_threads count. In order to handle extern stages and such
        // correctly, we likely need to make the total min_threads for an invocation of
        // a pipeline a property of the entire thing. This approach works because we use
        // the increased min_threads count to increase the size of the thread pool. It should
        // even be safe against reservation races because this is happening under the work
        // queue lock and that lock will be held into running the job. However that's many
        // lines of code from here to there and it is not guaranteed this will be the first
        // job run.
        if (job_has_acquires || job_may_block) {
            log_message("enqueue_work_already_locked adding one to min_threads.");
            min_threads += 1;
        }

        // Spawn more threads if necessary.
        while (work_queue.threads_created < MAX_THREADS &&
               ((work_queue.threads_created < work_queue.desired_threads_working - 1) ||
                (work_queue.threads_created + 1) - work_queue.threads_reserved < min_threads)) {
            // We might need to make some new threads, if work_queue.desired_threads_working has
            // increased, or if there aren't enough threads to complete this new task.
            work_queue.a_team_size++;
            work_queue.threads[work_queue.threads_created++] =
                halide_spawn_thread(worker_thread, NULL);
        }
        log_message("enqueue_work_already_locked top level job " << jobs[0].task.name << " with min_threads " << min_threads << " work_queue.threads_created " << work_queue.threads_created << " work_queue.threads_reserved " << work_queue.threads_reserved);
        if (job_has_acquires || job_may_block) {
            work_queue.threads_reserved++;
        }
    } else {
        log_message("enqueue_work_already_locked job " << jobs[0].task.name << " with min_threads " << min_threads << " task_parent " << task_parent->task.name << " task_parent->task.min_threads " << task_parent->task.min_threads << " task_parent->threads_reserved " << task_parent->threads_reserved);
        halide_assert(NULL, (min_threads <= ((task_parent->task.min_threads * task_parent->active_workers) -
                                             task_parent->threads_reserved)) &&
                                "Logic error: thread over commit.\n");
        if (job_has_acquires || job_may_block) {
            task_parent->threads_reserved++;
        }
    }

    // Push the jobs onto the stack.
    for (int i = num_jobs - 1; i >= 0; i--) {
        // We could bubble it downwards based on some heuristics, but
        // it's not strictly necessary to do so.
        jobs[i].next_job = work_queue.jobs;
        jobs[i].siblings = &jobs[0];
        jobs[i].sibling_count = num_jobs;
        jobs[i].threads_reserved = 0;
        work_queue.jobs = jobs + i;
    }

    bool nested_parallelism =
        work_queue.owners_sleeping ||
        (work_queue.workers_sleeping < work_queue.threads_created);

    // Wake up an appropriate number of threads
    if (nested_parallelism || workers_to_wake > work_queue.workers_sleeping) {
        // If there's nested parallelism going on, we just wake up
        // everyone. TODO: make this more precise.
        work_queue.target_a_team_size = work_queue.threads_created;
    } else {
        work_queue.target_a_team_size = workers_to_wake;
    }

    halide_cond_broadcast(&work_queue.wake_a_team);
    if (work_queue.target_a_team_size > work_queue.a_team_size) {
        halide_cond_broadcast(&work_queue.wake_b_team);
        if (stealable_jobs) {
            halide_cond_broadcast(&work_queue.wake_owners);
        }
    }
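
    // Drop the temporary reservation taken above for jobs that may block
    // or acquire semaphores, now that the jobs are queued and the pool has
    // been sized.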
    if (job_has_acquires || job_may_block) {
        if (task_parent != NULL) {
            task_parent->threads_reserved--;
        } else {
            work_queue.threads_reserved--;
        }
    }
}

WEAK halide_do_task_t custom_do_task = halide_default_do_task;
WEAK halide_do_loop_task_t custom_do_loop_task = halide_default_do_loop_task;
WEAK halide_do_par_for_t custom_do_par_for = halide_default_do_par_for;
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks = halide_default_do_parallel_tasks;
WEAK halide_semaphore_init_t custom_semaphore_init = halide_default_semaphore_init;
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire = halide_default_semaphore_try_acquire;
WEAK halide_semaphore_release_t custom_semaphore_release = halide_default_semaphore_release;

}  // namespace Internal
}  // namespace Runtime
}  // namespace Halide

using namespace Halide::Runtime::Internal;

extern "C" {

namespace {
WEAK __attribute__((destructor)) void halide_thread_pool_cleanup() {
    halide_shutdown_thread_pool();
}
}  // namespace

WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx,
                                uint8_t *closure) {
    return f(user_context, idx, closure);
}

WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f,
                                     int min, int extent, uint8_t *closure,
                                     void *task_parent) {
    return f(user_context, min, extent, closure, task_parent);
}

WEAK int halide_default_do_par_for(void *user_context, halide_task_t f,
                                   int min, int size, uint8_t *closure) {
    if (size <= 0) {
        return 0;
    }

    work job;
    job.task.fn = NULL;
    job.task.min = min;
    job.task.extent = size;
    job.task.serial = false;
    job.task.semaphores = NULL;
    job.task.num_semaphores = 0;
    job.task.closure = closure;
    job.task.min_threads = 0;
    job.task.name = NULL;
    job.task_fn = f;
    job.user_context = user_context;
    job.exit_status = 0;
    job.active_workers = 0;
    job.next_semaphore = 0;
    job.owner_is_sleeping = false;
    job.siblings = &job;  // guarantees no other job points to the same siblings.
    job.sibling_count = 0;
    job.parent_job = NULL;
    halide_mutex_lock(&work_queue.mutex);
    enqueue_work_already_locked(1, &job, NULL);
    worker_thread_already_locked(&job);
    halide_mutex_unlock(&work_queue.mutex);
    return job.exit_status;
}
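
// Note that the calling thread enqueues the job and then immediately joins
// in as a worker: worker_thread_already_locked only returns once the job is
// no longer running, so exit_status is final here.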

WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks,
                                          struct halide_parallel_task_t *tasks,
                                          void *task_parent) {
    work *jobs = (work *)__builtin_alloca(sizeof(work) * num_tasks);

    for (int i = 0; i < num_tasks; i++) {
        if (tasks->extent <= 0) {
            // Skip extent zero jobs
            num_tasks--;
            continue;
        }
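        // (Note: the skip above does not advance tasks, so this loop only
        // behaves correctly when zero-extent tasks do not precede live
        // ones in the array.)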
        jobs[i].task = *tasks++;
        jobs[i].task_fn = NULL;
        jobs[i].user_context = user_context;
        jobs[i].exit_status = 0;
        jobs[i].active_workers = 0;
        jobs[i].next_semaphore = 0;
        jobs[i].owner_is_sleeping = false;
        jobs[i].parent_job = (work *)task_parent;
    }

    if (num_tasks == 0) {
        return 0;
    }

    halide_mutex_lock(&work_queue.mutex);
    enqueue_work_already_locked(num_tasks, jobs, (work *)task_parent);
    int exit_status = 0;
    for (int i = 0; i < num_tasks; i++) {
        // It doesn't matter what order we join the tasks in, because
        // we'll happily assist with siblings too.
        worker_thread_already_locked(jobs + i);
        if (jobs[i].exit_status != 0) {
            exit_status = jobs[i].exit_status;
        }
    }
    halide_mutex_unlock(&work_queue.mutex);
    return exit_status;
}

WEAK int halide_set_num_threads(int n) {
    if (n < 0) {
        halide_error(NULL, "halide_set_num_threads: must be >= 0.");
    }
    // Don't make this an atomic swap - we don't want to be changing
    // the desired number of threads while another thread is in the
    // middle of a sequence of non-atomic operations.
    halide_mutex_lock(&work_queue.mutex);
    if (n == 0) {
        n = default_desired_num_threads();
    }
    int old = work_queue.desired_threads_working;
    work_queue.desired_threads_working = clamp_num_threads(n);
    halide_mutex_unlock(&work_queue.mutex);
    return old;
}
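
// Passing n == 0 restores the default (HL_NUM_THREADS, or the host CPU
// count); the previous desired thread count is returned either way.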

WEAK void halide_shutdown_thread_pool() {
    if (work_queue.initialized) {
        // Wake everyone up and tell them the party's over and it's time
        // to go home.
        halide_mutex_lock(&work_queue.mutex);

        work_queue.shutdown = true;
        halide_cond_broadcast(&work_queue.wake_owners);
        halide_cond_broadcast(&work_queue.wake_a_team);
        halide_cond_broadcast(&work_queue.wake_b_team);
        halide_mutex_unlock(&work_queue.mutex);

        // Wait until they leave
        for (int i = 0; i < work_queue.threads_created; i++) {
            halide_join_thread(work_queue.threads[i]);
        }

        // Tidy up
        work_queue.reset();
    }
}

struct halide_semaphore_impl_t {
    int value;
};

WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n) {
    halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
    Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->value, &n);
    return n;
}

WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
    halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
    int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->value, n);
    // TODO(abadams|zvookin): Is this correct if an acquire can be for say count of 2 and the releases are 1 each?
    if (old_val == 0 && n != 0) {  // Don't wake if nothing released.
        // We may have just made a job runnable
        halide_mutex_lock(&work_queue.mutex);
        halide_cond_broadcast(&work_queue.wake_a_team);
        halide_cond_broadcast(&work_queue.wake_owners);
        halide_mutex_unlock(&work_queue.mutex);
    }
    return old_val + n;
}

WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n) {
    if (n == 0) {
        return true;
    }
    halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
    // Decrement and get new value
    int expected;
    int desired;
    Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->value, &expected);
    do {
        desired = expected - n;
    } while (desired >= 0 &&
             !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->value, &expected, &desired));
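    // (The weak CAS may fail spuriously; each retry recomputes the
    // decrement from the freshly observed count, and the loop gives up
    // once the count would go negative.)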
    return desired >= 0;
}

WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f) {
    halide_do_task_t result = custom_do_task;
    custom_do_task = f;
    return result;
}

WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f) {
    halide_do_loop_task_t result = custom_do_loop_task;
    custom_do_loop_task = f;
    return result;
}

WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f) {
    halide_do_par_for_t result = custom_do_par_for;
    custom_do_par_for = f;
    return result;
}

WEAK void halide_set_custom_parallel_runtime(
    halide_do_par_for_t do_par_for,
    halide_do_task_t do_task,
    halide_do_loop_task_t do_loop_task,
    halide_do_parallel_tasks_t do_parallel_tasks,
    halide_semaphore_init_t semaphore_init,
    halide_semaphore_try_acquire_t semaphore_try_acquire,
    halide_semaphore_release_t semaphore_release) {

    custom_do_par_for = do_par_for;
    custom_do_task = do_task;
    custom_do_loop_task = do_loop_task;
    custom_do_parallel_tasks = do_parallel_tasks;
    custom_semaphore_init = semaphore_init;
    custom_semaphore_try_acquire = semaphore_try_acquire;
    custom_semaphore_release = semaphore_release;
}
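
// A sketch of overriding just the parallel-for hook while keeping the rest
// of the default runtime (my_do_par_for is a hypothetical user function
// with the halide_do_par_for_t signature; here it simply runs serially):
//
//   int my_do_par_for(void *ctx, halide_task_t f, int min, int size,
//                     uint8_t *closure) {
//       for (int i = min; i < min + size; i++) {
//           if (int err = f(ctx, i, closure)) return err;
//       }
//       return 0;
//   }
//   ...
//   halide_set_custom_do_par_for(my_do_par_for);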

WEAK int halide_do_task(void *user_context, halide_task_t f, int idx,
                        uint8_t *closure) {
    return (*custom_do_task)(user_context, f, idx, closure);
}

WEAK int halide_do_par_for(void *user_context, halide_task_t f,
                           int min, int size, uint8_t *closure) {
    return (*custom_do_par_for)(user_context, f, min, size, closure);
}

WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f,
                             int min, int size, uint8_t *closure, void *task_parent) {
    return custom_do_loop_task(user_context, f, min, size, closure, task_parent);
}

WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks,
                                  struct halide_parallel_task_t *tasks,
                                  void *task_parent) {
    return custom_do_parallel_tasks(user_context, num_tasks, tasks, task_parent);
}

WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count) {
    return custom_semaphore_init(sema, count);
}

WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count) {
    return custom_semaphore_release(sema, count);
}

WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count) {
    return custom_semaphore_try_acquire(sema, count);
}

}  // extern "C"