Halide
thread_pool_common.h
#define EXTENDED_DEBUG 0

#if EXTENDED_DEBUG
// This code is currently set up for Linux debugging. Switch to using pthread_self on e.g. Mac OS X.
extern "C" int syscall(int);

namespace {
int gettid() {
#ifdef BITS_32
    return syscall(224);
#else
    return syscall(186);
#endif
}
}  // namespace

// clang-format off
#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
// clang-format on

#else

// clang-format off
#define log_message(stuff) do { /*nothing*/ } while (0)
// clang-format on

#endif

namespace Halide {
namespace Runtime {
namespace Internal {

struct work {
    halide_parallel_task_t task;

    // If we come in to the task system via do_par_for we just have a
    // halide_task_t, not a halide_loop_task_t.
    halide_task_t task_fn;

    work *next_job;
    work *siblings;
    int sibling_count;
    work *parent_job;
    int threads_reserved;

    void *user_context;
    int active_workers;
    int exit_status;
    int next_semaphore;
    // Whether the owner of this job is sleeping on a condition variable,
    // waiting for the job to finish.
    bool owner_is_sleeping;

    ALWAYS_INLINE bool make_runnable() {
        for (; next_semaphore < task.num_semaphores; next_semaphore++) {
            if (!halide_default_semaphore_try_acquire(task.semaphores[next_semaphore].semaphore,
                                                      task.semaphores[next_semaphore].count)) {
                // Note that we don't release the semaphores already
                // acquired. We never have two consumers contending
                // over the same semaphore, so it's not helpful to do
                // so.
                return false;
            }
        }
        // Future iterations of this task need to acquire the semaphores from scratch.
        next_semaphore = 0;
        return true;
    }

    ALWAYS_INLINE bool running() const {
        return task.extent || active_workers;
    }
};
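
// Note: work items are not heap-allocated. halide_default_do_par_for and
// halide_default_do_parallel_tasks below create them on the enqueuing
// thread's stack; that frame stays live because the enqueuing thread joins
// the job (via worker_thread_already_locked) before returning.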

ALWAYS_INLINE int clamp_num_threads(int threads) {
    if (threads > MAX_THREADS) {
        return MAX_THREADS;
    } else if (threads < 1) {
        return 1;
    } else {
        return threads;
    }
}

WEAK int default_desired_num_threads() {
    char *threads_str = getenv("HL_NUM_THREADS");
    if (!threads_str) {
        // Legacy name for HL_NUM_THREADS
        threads_str = getenv("HL_NUMTHREADS");
    }
    return threads_str ?
               atoi(threads_str) :
               halide_host_cpu_count();
}
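
// For example, `HL_NUM_THREADS=8 ./pipeline` (where ./pipeline is any binary
// linking this runtime) yields 8 desired threads; with neither variable set,
// the default is one thread per logical core via halide_host_cpu_count().
// The value is clamped to [1, MAX_THREADS] by clamp_num_threads before use.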

// The work queue and thread pool is weak, so one big work queue is shared by all halide functions
struct work_queue_t {
    // All fields are protected by this mutex.
    halide_mutex mutex;

    // The desired number of threads doing work (HL_NUM_THREADS).
    int desired_threads_working;

    // All fields after this must be zero in the initial state. See assert_zeroed.
    // This field serves both to mark the offset in the struct and as layout padding.
    int zero_marker;

    // Singly linked list for the job stack
    work *jobs;

    // The number of threads created
    int threads_created;

    // Workers sleep on one of two condition variables, to make it
    // easier to wake up the right number if a small number of tasks
    // are enqueued. There are A-team workers and B-team workers. The
    // following variables track the current size and the desired size
    // of the A team.
    int a_team_size, target_a_team_size;

    // The condition variables that workers and owners sleep on. We
    // may want to wake them up independently. Any code that may
    // invalidate any of the reasons a worker or owner may have slept
    // must signal or broadcast the appropriate condition variable.
    halide_cond wake_a_team, wake_b_team, wake_owners;

    // The number of sleeping workers and owners. An over-estimate - a
    // waking-up thread may not have decremented this yet.
    int workers_sleeping, owners_sleeping;

    // Keep track of threads so they can be joined at shutdown
    halide_thread *threads[MAX_THREADS];

    // Global flags indicating the threadpool should shut down, and
    // whether the thread pool has been initialized.
    bool shutdown, initialized;

    // The number of threads that are currently committed to possibly block
    // via outstanding jobs queued or being actively worked on. Used to limit
    // the number of iterations of parallel for loops that are invoked so as
    // to prevent deadlock due to oversubscription of threads.
    int threads_reserved;

    ALWAYS_INLINE bool running() const {
        return !shutdown;
    }

    // Used to check that the initial state is correct.
    ALWAYS_INLINE void assert_zeroed() const {
        // Assert that all fields except the mutex and desired threads count are zeroed.
        const char *bytes = ((const char *)&this->zero_marker);
        const char *limit = ((const char *)this) + sizeof(work_queue_t);
        while (bytes < limit && *bytes == 0) {
            bytes++;
        }
        halide_abort_if_false(nullptr, bytes == limit && "Logic error in thread pool work queue initialization.\n");
    }

    // Return the work queue to the initial state. Must be called while locked,
    // and the queue will remain locked.
    ALWAYS_INLINE void reset() {
        // Ensure all fields except the mutex and desired threads count are zeroed.
        char *bytes = ((char *)&this->zero_marker);
        char *limit = ((char *)this) + sizeof(work_queue_t);
        memset(bytes, 0, limit - bytes);
    }
};

WEAK work_queue_t work_queue = {};

#if EXTENDED_DEBUG

WEAK void print_job(work *job, const char *indent, const char *prefix = nullptr) {
    if (prefix == nullptr) {
        prefix = indent;
    }
    const char *name = job->task.name ? job->task.name : "<no name>";
    const char *parent_name = job->parent_job ? (job->parent_job->task.name ? job->parent_job->task.name : "<no name>") : "<no parent job>";
    log_message(prefix << name << "[" << job << "] serial: " << job->task.serial << " active_workers: " << job->active_workers << " min: " << job->task.min << " extent: " << job->task.extent << " siblings: " << job->siblings << " sibling count: " << job->sibling_count << " min_threads " << job->task.min_threads << " next_semaphore: " << job->next_semaphore << " threads_reserved: " << job->threads_reserved << " parent_job: " << parent_name << "[" << job->parent_job << "]");
    for (int i = 0; i < job->task.num_semaphores; i++) {
        log_message(indent << " semaphore " << (void *)job->task.semaphores[i].semaphore << " count " << job->task.semaphores[i].count << " val " << *(int *)job->task.semaphores[i].semaphore);
    }
}

WEAK void dump_job_state() {
    log_message("Dumping job state, jobs in queue:");
    work *job = work_queue.jobs;
    while (job != nullptr) {
        print_job(job, " ");
        job = job->next_job;
    }
    log_message("Done dumping job state.");
}

#else

// clang-format off
#define print_job(job, indent, prefix) do { /*nothing*/ } while (0)
#define dump_job_state() do { /*nothing*/ } while (0)
// clang-format on

#endif

WEAK void worker_thread(void *);

WEAK void worker_thread_already_locked(work *owned_job) {
    int spin_count = 0;
    const int max_spin_count = 40;

    while (owned_job ? owned_job->running() : !work_queue.shutdown) {
        work *job = work_queue.jobs;
        work **prev_ptr = &work_queue.jobs;

        if (owned_job) {
            if (owned_job->exit_status != halide_error_code_success) {
                if (owned_job->active_workers == 0) {
                    while (job != owned_job) {
                        prev_ptr = &job->next_job;
                        job = job->next_job;
                    }
                    *prev_ptr = job->next_job;
                    job->task.extent = 0;
                    continue;  // So loop exit is always in the same place.
                }
            } else if (owned_job->parent_job && owned_job->parent_job->exit_status != halide_error_code_success) {
                owned_job->exit_status = owned_job->parent_job->exit_status;
                // The wakeup can likely only be done under certain conditions, but it is
                // only happening when an error has already occurred, and it seems more
                // important to ensure reliable termination than to optimize this path.
                halide_cond_broadcast(&work_queue.wake_owners);
                continue;
            }
        }

        dump_job_state();

        // Find a job to run, preferring things near the top of the stack.
        while (job) {
            print_job(job, "", "Considering job ");
            // Only schedule tasks with enough free worker threads
            // around to complete. They may get stolen later, but only
            // by tasks which can themselves use them to complete
            // work, so forward progress is made.
            bool enough_threads;

            work *parent_job = job->parent_job;

            int threads_available;
            if (parent_job == nullptr) {
                // The + 1 is because work_queue.threads_created does not include the main thread.
                threads_available = (work_queue.threads_created + 1) - work_queue.threads_reserved;
            } else {
                if (parent_job->active_workers == 0) {
                    threads_available = parent_job->task.min_threads - parent_job->threads_reserved;
                } else {
                    threads_available = parent_job->active_workers * parent_job->task.min_threads - parent_job->threads_reserved;
                }
            }
            enough_threads = threads_available >= job->task.min_threads;

            if (!enough_threads) {
                log_message("Not enough threads for job " << job->task.name << " available: " << threads_available << " min_threads: " << job->task.min_threads);
            }
            bool can_use_this_thread_stack = !owned_job || (job->siblings == owned_job->siblings) || job->task.min_threads == 0;
            if (!can_use_this_thread_stack) {
                log_message("Cannot run job " << job->task.name << " on this thread.");
            }
            bool can_add_worker = (!job->task.serial || (job->active_workers == 0));
            if (!can_add_worker) {
                log_message("Cannot add worker to job " << job->task.name);
            }

            if (enough_threads && can_use_this_thread_stack && can_add_worker) {
                if (job->make_runnable()) {
                    break;
                } else {
                    log_message("Cannot acquire semaphores for " << job->task.name);
                }
            }
            prev_ptr = &(job->next_job);
            job = job->next_job;
        }
282 
283  if (!job) {
284  // There is no runnable job. Go to sleep.
285  if (owned_job) {
286  if (spin_count++ < max_spin_count) {
287  // Give the workers a chance to finish up before sleeping
291  } else {
293  owned_job->owner_is_sleeping = true;
295  owned_job->owner_is_sleeping = false;
297  }
298  } else {
301  // Transition to B team
305  } else if (spin_count++ < max_spin_count) {
306  // Spin waiting for new work
310  } else {
312  }
314  }
315  continue;
316  } else {
317  spin_count = 0;
318  }

        log_message("Working on job " << job->task.name);

        // Increment the active_workers count so that other threads
        // are aware that this job is still in progress even
        // though there are no outstanding tasks for it.
        job->active_workers++;

        if (job->parent_job == nullptr) {
            work_queue.threads_reserved += job->task.min_threads;
            log_message("Reserved " << job->task.min_threads << " on work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
        } else {
            job->parent_job->threads_reserved += job->task.min_threads;
            log_message("Reserved " << job->task.min_threads << " on " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
        }

        int result = halide_error_code_success;

        if (job->task.serial) {
            // Remove it from the stack while we work on it
            *prev_ptr = job->next_job;

            // Release the lock and do the task.
            halide_mutex_unlock(&work_queue.mutex);
            int total_iters = 0;
            int iters = 1;
            while (result == halide_error_code_success) {
                // Claim as many iterations as possible
                while ((job->task.extent - total_iters) > iters &&
                       job->make_runnable()) {
                    iters++;
                }
                if (iters == 0) {
                    break;
                }

                // Do them
                result = halide_do_loop_task(job->user_context, job->task.fn,
                                             job->task.min + total_iters, iters,
                                             job->task.closure, job);
                total_iters += iters;
                iters = 0;
            }
            halide_mutex_lock(&work_queue.mutex);

            job->task.min += total_iters;
            job->task.extent -= total_iters;

            // Put it back on the job stack, if it hasn't failed.
            if (result != halide_error_code_success) {
                job->task.extent = 0;  // Force the job to be finished.
            } else if (job->task.extent > 0) {
                job->next_job = work_queue.jobs;
                work_queue.jobs = job;
            }
        } else {
            // Claim a task from it.
            work myjob = *job;
            job->task.min++;
            job->task.extent--;

            // If there were no more tasks pending for this job, remove it
            // from the stack.
            if (job->task.extent == 0) {
                *prev_ptr = job->next_job;
            }

            // Release the lock and do the task.
            halide_mutex_unlock(&work_queue.mutex);
            if (myjob.task_fn) {
                result = halide_do_task(myjob.user_context, myjob.task_fn,
                                        myjob.task.min, myjob.task.closure);
            } else {
                result = halide_do_loop_task(myjob.user_context, myjob.task.fn,
                                             myjob.task.min, 1,
                                             myjob.task.closure, job);
            }
            halide_mutex_lock(&work_queue.mutex);
        }

        if (result != halide_error_code_success) {
            log_message("Thread pool saw error from task: " << (int)result);
        }

        bool wake_owners = false;

        // If this task failed, set the exit status on the job.
        if (result != halide_error_code_success) {
            job->exit_status = result;
            // Mark all siblings as also failed.
            for (int i = 0; i < job->sibling_count; i++) {
                log_message("Marking " << job->sibling_count << " siblings ");
                if (job->siblings[i].exit_status == halide_error_code_success) {
                    job->siblings[i].exit_status = result;
                    wake_owners |= (job->active_workers == 0 && job->siblings[i].owner_is_sleeping);
                }
                log_message("Done marking siblings.");
            }
        }

        if (job->parent_job == nullptr) {
            work_queue.threads_reserved -= job->task.min_threads;
            log_message("Returned " << job->task.min_threads << " to work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
        } else {
            job->parent_job->threads_reserved -= job->task.min_threads;
            log_message("Returned " << job->task.min_threads << " to " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
        }

        // We are no longer active on this job
        job->active_workers--;

        log_message("Done working on job " << job->task.name);

        if (wake_owners ||
            (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
            // The job is done, or some owned job failed via sibling linkage. Wake up the owner.
            halide_cond_broadcast(&work_queue.wake_owners);
        }
    }
}

WEAK void worker_thread(void *arg) {
    halide_mutex_lock(&work_queue.mutex);
    worker_thread_already_locked((work *)arg);
    halide_mutex_unlock(&work_queue.mutex);
}

WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent) {
    if (!work_queue.initialized) {
        work_queue.assert_zeroed();

        // Compute the desired number of threads to use. Other code
        // can also mess with this value, but only when the work queue
        // is locked.
        if (!work_queue.desired_threads_working) {
            work_queue.desired_threads_working = default_desired_num_threads();
        }
        work_queue.desired_threads_working = clamp_num_threads(work_queue.desired_threads_working);
        work_queue.initialized = true;
    }

    // Gather some information about the work.

    // Some tasks require a minimum number of threads to make forward
    // progress. Also assume the blocking tasks need to run concurrently.
    int min_threads = 0;

    // Count how many workers to wake. Start at -1 because this thread
    // will contribute.
    int workers_to_wake = -1;

    // Could stalled owners of other tasks conceivably help with one
    // of these jobs?
    bool stealable_jobs = false;

    bool job_has_acquires = false;
    bool job_may_block = false;
    for (int i = 0; i < num_jobs; i++) {
        if (jobs[i].task.min_threads == 0) {
            stealable_jobs = true;
        } else {
            job_may_block = true;
            min_threads += jobs[i].task.min_threads;
        }
        if (jobs[i].task.num_semaphores != 0) {
            job_has_acquires = true;
        }

        if (jobs[i].task.serial) {
            workers_to_wake++;
        } else {
            workers_to_wake += jobs[i].task.extent;
        }
    }

    if (task_parent == nullptr) {
        // This is here because some top-level jobs may block, but are not accounted for
        // in any enclosing min_threads count. In order to handle extern stages and such
        // correctly, we likely need to make the total min_threads for an invocation of
        // a pipeline a property of the entire thing. This approach works because we use
        // the increased min_threads count to increase the size of the thread pool. It should
        // even be safe against reservation races, because this is happening under the work
        // queue lock and that lock will be held into running the job. However, that's many
        // lines of code from here to there, and it is not guaranteed this will be the first
        // job run.
        if (job_has_acquires || job_may_block) {
            log_message("enqueue_work_already_locked adding one to min_threads.");
            min_threads += 1;
        }

        // Spawn more threads if necessary.
        while (work_queue.threads_created < MAX_THREADS &&
               ((work_queue.threads_created < work_queue.desired_threads_working - 1) ||
                (work_queue.threads_created + 1) - work_queue.threads_reserved < min_threads)) {
            // We might need to make some new threads, if work_queue.desired_threads_working has
            // increased, or if there aren't enough threads to complete this new task.
            work_queue.a_team_size++;
            work_queue.threads[work_queue.threads_created++] =
                halide_spawn_thread(worker_thread, nullptr);
        }
        log_message("enqueue_work_already_locked top level job " << jobs[0].task.name << " with min_threads " << min_threads << " work_queue.threads_created " << work_queue.threads_created << " work_queue.threads_reserved " << work_queue.threads_reserved);
        if (job_has_acquires || job_may_block) {
            work_queue.threads_reserved++;
        }
    } else {
        log_message("enqueue_work_already_locked job " << jobs[0].task.name << " with min_threads " << min_threads << " task_parent " << task_parent->task.name << " task_parent->task.min_threads " << task_parent->task.min_threads << " task_parent->threads_reserved " << task_parent->threads_reserved);
        halide_abort_if_false(nullptr, (min_threads <= ((task_parent->task.min_threads * task_parent->active_workers) -
                                                        task_parent->threads_reserved)) &&
                                           "Logic error: thread over commit.\n");
        if (job_has_acquires || job_may_block) {
            task_parent->threads_reserved++;
        }
    }

    // Push the jobs onto the stack.
    for (int i = num_jobs - 1; i >= 0; i--) {
        // We could bubble it downwards based on some heuristics, but
        // it's not strictly necessary to do so.
        jobs[i].next_job = work_queue.jobs;
        jobs[i].siblings = &jobs[0];
        jobs[i].sibling_count = num_jobs;
        jobs[i].threads_reserved = 0;
        work_queue.jobs = jobs + i;
    }

    bool nested_parallelism =
        work_queue.owners_sleeping ||
        (work_queue.workers_sleeping < work_queue.threads_created);

    // Wake up an appropriate number of threads
    if (nested_parallelism || workers_to_wake > work_queue.workers_sleeping) {
        // If there's nested parallelism going on, we just wake up
        // everyone. TODO: make this more precise.
        work_queue.target_a_team_size = work_queue.threads_created;
    } else {
        work_queue.target_a_team_size = workers_to_wake;
    }

    halide_cond_broadcast(&work_queue.wake_a_team);
    if (work_queue.target_a_team_size > work_queue.a_team_size) {
        halide_cond_broadcast(&work_queue.wake_b_team);
        if (stealable_jobs) {
            halide_cond_broadcast(&work_queue.wake_owners);
        }
    }

    if (job_has_acquires || job_may_block) {
        if (task_parent != nullptr) {
            task_parent->threads_reserved--;
        } else {
            work_queue.threads_reserved--;
        }
    }
}

WEAK halide_do_task_t custom_do_task = halide_default_do_task;
WEAK halide_do_loop_task_t custom_do_loop_task = halide_default_do_loop_task;
WEAK halide_do_par_for_t custom_do_par_for = halide_default_do_par_for;
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks = halide_default_do_parallel_tasks;
WEAK halide_semaphore_init_t custom_semaphore_init = halide_default_semaphore_init;
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire = halide_default_semaphore_try_acquire;
WEAK halide_semaphore_release_t custom_semaphore_release = halide_default_semaphore_release;

}  // namespace Internal
}  // namespace Runtime
}  // namespace Halide

using namespace Halide::Runtime::Internal;

extern "C" {

namespace {
WEAK __attribute__((destructor)) void halide_thread_pool_cleanup() {
    halide_shutdown_thread_pool();
}
}  // namespace

WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx,
                                uint8_t *closure) {
    return f(user_context, idx, closure);
}

WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f,
                                     int min, int extent, uint8_t *closure,
                                     void *task_parent) {
    return f(user_context, min, extent, closure, task_parent);
}

WEAK int halide_default_do_par_for(void *user_context, halide_task_t f,
                                   int min, int size, uint8_t *closure) {
    if (size <= 0) {
        return halide_error_code_success;
    }

    work job;
    job.task.fn = nullptr;
    job.task.min = min;
    job.task.extent = size;
    job.task.serial = false;
    job.task.semaphores = nullptr;
    job.task.num_semaphores = 0;
    job.task.closure = closure;
    job.task.min_threads = 0;
    job.task.name = nullptr;
    job.task_fn = f;
    job.user_context = user_context;
    job.exit_status = halide_error_code_success;
    job.active_workers = 0;
    job.next_semaphore = 0;
    job.owner_is_sleeping = false;
    job.siblings = &job;  // guarantees no other job points to the same siblings.
    job.sibling_count = 0;
    job.parent_job = nullptr;
    halide_mutex_lock(&work_queue.mutex);
    enqueue_work_already_locked(1, &job, nullptr);
    worker_thread_already_locked(&job);
    halide_mutex_unlock(&work_queue.mutex);
    return job.exit_status;
}
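
// A minimal usage sketch (hypothetical caller, not part of this file):
//
//   int process_row(void *ctx, int y, uint8_t *closure) {
//       // ... do the work for row y using state in closure ...
//       return halide_error_code_success;
//   }
//
//   // Run rows [0, height) across the pool. The calling thread also
//   // participates, via worker_thread_already_locked above.
//   int err = halide_default_do_par_for(ctx, process_row, 0, height, closure);
//
// The return value is the job's exit_status: halide_error_code_success, or
// an error code returned by a failing iteration.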

WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks,
                                          struct halide_parallel_task_t *tasks,
                                          void *task_parent) {
    work *jobs = (work *)__builtin_alloca(sizeof(work) * num_tasks);

    for (int i = 0; i < num_tasks; i++) {
        if (tasks->extent <= 0) {
            // Skip extent zero jobs
            num_tasks--;
            continue;
        }
        jobs[i].task = *tasks++;
        jobs[i].task_fn = nullptr;
        jobs[i].user_context = user_context;
        jobs[i].exit_status = halide_error_code_success;
        jobs[i].active_workers = 0;
        jobs[i].next_semaphore = 0;
        jobs[i].owner_is_sleeping = false;
        jobs[i].parent_job = (work *)task_parent;
    }

    if (num_tasks == 0) {
        return halide_error_code_success;
    }

    halide_mutex_lock(&work_queue.mutex);
    enqueue_work_already_locked(num_tasks, jobs, (work *)task_parent);
    int exit_status = halide_error_code_success;
    for (int i = 0; i < num_tasks; i++) {
        // It doesn't matter what order we join the tasks in, because
        // we'll happily assist with siblings too.
        worker_thread_already_locked(jobs + i);
        if (jobs[i].exit_status != halide_error_code_success) {
            exit_status = jobs[i].exit_status;
        }
    }
    halide_mutex_unlock(&work_queue.mutex);
    return exit_status;
}

WEAK int halide_set_num_threads(int n) {
    if (n < 0) {
        halide_error(nullptr, "halide_set_num_threads: must be >= 0.");
    }
    // Don't make this an atomic swap - we don't want to be changing
    // the desired number of threads while another thread is in the
    // middle of a sequence of non-atomic operations.
    halide_mutex_lock(&work_queue.mutex);
    if (n == 0) {
        n = default_desired_num_threads();
    }
    int old = work_queue.desired_threads_working;
    work_queue.desired_threads_working = clamp_num_threads(n);
    halide_mutex_unlock(&work_queue.mutex);
    return old;
}
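
// For example, halide_set_num_threads(1) caps the desired pool size at one,
// so no new workers are spawned for parallel loops (already-created workers
// keep running, and the pool can still grow if a task's min_threads demands
// it for forward progress). halide_set_num_threads(0) restores the
// default_desired_num_threads() value. The previous setting is returned so
// callers can save and restore it.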

WEAK void halide_shutdown_thread_pool() {
    if (work_queue.initialized) {
        // Wake everyone up and tell them the party's over and it's time
        // to go home
        halide_mutex_lock(&work_queue.mutex);

        work_queue.shutdown = true;
        halide_cond_broadcast(&work_queue.wake_owners);
        halide_cond_broadcast(&work_queue.wake_a_team);
        halide_cond_broadcast(&work_queue.wake_b_team);
        halide_mutex_unlock(&work_queue.mutex);

        // Wait until they leave
        for (int i = 0; i < work_queue.threads_created; i++) {
            halide_join_thread(work_queue.threads[i]);
        }

        // Tidy up
        work_queue.reset();
    }
}

struct halide_semaphore_impl_t {
    int value;
};

WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n) {
    halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
    Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->value, &n);
    return n;
}

WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
    halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
    int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->value, n);
    // TODO(abadams|zvookin): Is this correct if an acquire can be for, say, a count of 2 and the releases are 1 each?
    if (old_val == 0 && n != 0) {  // Don't wake if nothing was released.
        // We may have just made a job runnable
        halide_mutex_lock(&work_queue.mutex);
        halide_cond_broadcast(&work_queue.wake_a_team);
        halide_cond_broadcast(&work_queue.wake_owners);
        halide_mutex_unlock(&work_queue.mutex);
    }
    return old_val + n;
}

WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n) {
    if (n == 0) {
        return true;
    }
    halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
    // Decrement and get the new value
    int expected;
    int desired;
    Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->value, &expected);
    do {
        desired = expected - n;
    } while (desired >= 0 &&
             !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->value, &expected, &desired));
    return desired >= 0;
}
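
// A minimal sketch of the protocol these three functions implement
// (hypothetical standalone use; the scheduler normally drives them through
// work::make_runnable above):
//
//   halide_semaphore_t sem;
//   halide_semaphore_init(&sem, 0);      // zero items available
//   halide_semaphore_release(&sem, 2);   // a producer publishes two items
//   if (halide_semaphore_try_acquire(&sem, 1)) {
//       // a consumer claimed one item; one remains
//   }
//
// try_acquire never blocks; it returns false instead, and the scheduler
// leaves the job on the stack to be retried later.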

WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f) {
    halide_do_task_t result = custom_do_task;
    custom_do_task = f;
    return result;
}

WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f) {
    halide_do_loop_task_t result = custom_do_loop_task;
    custom_do_loop_task = f;
    return result;
}

WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f) {
    halide_do_par_for_t result = custom_do_par_for;
    custom_do_par_for = f;
    return result;
}

WEAK void halide_set_custom_parallel_runtime(
    halide_do_par_for_t do_par_for,
    halide_do_task_t do_task,
    halide_do_loop_task_t do_loop_task,
    halide_do_parallel_tasks_t do_parallel_tasks,
    halide_semaphore_init_t semaphore_init,
    halide_semaphore_try_acquire_t semaphore_try_acquire,
    halide_semaphore_release_t semaphore_release) {

    custom_do_par_for = do_par_for;
    custom_do_task = do_task;
    custom_do_loop_task = do_loop_task;
    custom_do_parallel_tasks = do_parallel_tasks;
    custom_semaphore_init = semaphore_init;
    custom_semaphore_try_acquire = semaphore_try_acquire;
    custom_semaphore_release = semaphore_release;
}

WEAK int halide_do_task(void *user_context, halide_task_t f, int idx,
                        uint8_t *closure) {
    return (*custom_do_task)(user_context, f, idx, closure);
}

WEAK int halide_do_par_for(void *user_context, halide_task_t f,
                           int min, int size, uint8_t *closure) {
    return (*custom_do_par_for)(user_context, f, min, size, closure);
}

WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f,
                             int min, int size, uint8_t *closure, void *task_parent) {
    return custom_do_loop_task(user_context, f, min, size, closure, task_parent);
}

WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks,
                                  struct halide_parallel_task_t *tasks,
                                  void *task_parent) {
    return custom_do_parallel_tasks(user_context, num_tasks, tasks, task_parent);
}

WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count) {
    return custom_semaphore_init(sema, count);
}

WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count) {
    return custom_semaphore_release(sema, count);
}

WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count) {
    return custom_semaphore_try_acquire(sema, count);
}

}  // extern "C"
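
These dispatchers are the override point for the whole runtime. As a sketch
(the function and variable names below are illustrative, not part of this
file), a host application can route Halide's parallel loops through its own
scheduler by installing a replacement for do_par_for:

    // Illustrative override: run parallel loops serially on the calling thread.
    extern "C" int serial_do_par_for(void *user_context, halide_task_t f,
                                     int min, int size, uint8_t *closure) {
        for (int i = min; i < min + size; i++) {
            int result = f(user_context, i, closure);
            if (result != 0) {
                return result;  // Propagate the failure, as the pool does.
            }
        }
        return 0;
    }

    void install_serial_runtime() {
        // The previously installed handler is returned, so the override
        // can be undone later if needed.
        halide_do_par_for_t previous = halide_set_custom_do_par_for(serial_do_par_for);
        (void)previous;
    }

Note that this only redirects halide_do_par_for; pipelines that use
halide_do_parallel_tasks (e.g. for nested parallelism) still reach the
default pool, so fully replacing the runtime means supplying all seven hooks
through halide_set_custom_parallel_runtime.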