Halide 19.0.0
Halide compiler and libraries
Loading...
Searching...
No Matches
thread_pool_common.h
Go to the documentation of this file.
1#define EXTENDED_DEBUG 0
2
3#if EXTENDED_DEBUG
4// This code is currently setup for Linux debugging. Switch to using pthread_self on e.g. Mac OS X.
5extern "C" int syscall(int);
6
7namespace {
8int gettid() {
9#ifdef BITS_32
10 return syscall(224);
11#else
12 return syscall(186);
13#endif
14}
15} // namespace
16
17// clang-format off
18#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
19// clang-format on
20
21#else
22
23// clang-format off
24#define log_message(stuff) do { /*nothing*/ } while (0)
25// clang-format on
26
27#endif
28
29namespace Halide {
30namespace Runtime {
31namespace Internal {
32
33struct work {
35
36 // If we come in to the task system via do_par_for we just have a
37 // halide_task_t, not a halide_loop_task_t.
39
45
50 // which condition variable is the owner sleeping on. nullptr if it isn't sleeping.
52
57 // Note that we don't release the semaphores already
58 // acquired. We never have two consumers contending
59 // over the same semaphore, so it's not helpful to do
60 // so.
61 return false;
62 }
63 }
64 // Future iterations of this task need to acquire the semaphores from scratch.
66 return true;
67 }
68
69 ALWAYS_INLINE bool running() const {
70 return task.extent || active_workers;
71 }
72};
73
75 if (threads > MAX_THREADS) {
76 return MAX_THREADS;
77 } else if (threads < 1) {
78 return 1;
79 } else {
80 return threads;
81 }
82}
83
85 char *threads_str = getenv("HL_NUM_THREADS");
86 if (!threads_str) {
87 // Legacy name for HL_NUM_THREADS
88 threads_str = getenv("HL_NUMTHREADS");
89 }
90 return threads_str ?
91 atoi(threads_str) :
93}
94
95// The work queue and thread pool is weak, so one big work queue is shared by all halide functions
97 // all fields are protected by this mutex.
99
100 // The desired number threads doing work (HL_NUM_THREADS).
102
103 // All fields after this must be zero in the initial state. See assert_zeroed
104 // Field serves both to mark the offset in struct and as layout padding.
106
107 // Singly linked list for job stack
109
110 // The number threads created
112
113 // Workers sleep on one of two condition variables, to make it
114 // easier to wake up the right number if a small number of tasks
115 // are enqueued. There are A-team workers and B-team workers. The
116 // following variables track the current size and the desired size
117 // of the A team.
119
120 // The condition variables that workers and owners sleep on. We
121 // may want to wake them up independently. Any code that may
122 // invalidate any of the reasons a worker or owner may have slept
123 // must signal or broadcast the appropriate condition variable.
125
126 // The number of sleeping workers and owners. An over-estimate - a
127 // waking-up thread may not have decremented this yet.
129
130 // Keep track of threads so they can be joined at shutdown
131 halide_thread *threads[MAX_THREADS];
132
133 // Global flags indicating the threadpool should shut down, and
134 // whether the thread pool has been initialized.
136
137 // The number of threads that are currently commited to possibly block
138 // via outstanding jobs queued or being actively worked on. Used to limit
139 // the number of iterations of parallel for loops that are invoked so as
140 // to prevent deadlock due to oversubscription of threads.
142
143 ALWAYS_INLINE bool running() const {
144 return !shutdown;
145 }
146
147 // Used to check initial state is correct.
149 // Assert that all fields except the mutex and desired threads count are zeroed.
150 const char *bytes = ((const char *)&this->zero_marker);
151 const char *limit = ((const char *)this) + sizeof(work_queue_t);
152 while (bytes < limit && *bytes == 0) {
153 bytes++;
154 }
155 halide_abort_if_false(nullptr, bytes == limit && "Logic error in thread pool work queue initialization.\n");
156 }
157
158 // Return the work queue to initial state. Must be called while locked
159 // and queue will remain locked.
161 // Ensure all fields except the mutex and desired hreads count are zeroed.
162 char *bytes = ((char *)&this->zero_marker);
163 char *limit = ((char *)this) + sizeof(work_queue_t);
164 memset(bytes, 0, limit - bytes);
165 }
166};
167
169
170#if EXTENDED_DEBUG
171
172WEAK void print_job(work *job, const char *indent, const char *prefix = nullptr) {
173 if (prefix == nullptr) {
174 prefix = indent;
175 }
176 const char *name = job->task.name ? job->task.name : "<no name>";
177 const char *parent_name = job->parent_job ? (job->parent_job->task.name ? job->parent_job->task.name : "<no name>") : "<no parent job>";
178 log_message(prefix << name << "[" << job << "] serial: " << job->task.serial << " active_workers: " << job->active_workers << " min: " << job->task.min << " extent: " << job->task.extent << " siblings: " << job->siblings << " sibling count: " << job->sibling_count << " min_threads " << job->task.min_threads << " next_sempaphore: " << job->next_semaphore << " threads_reserved: " << job->threads_reserved << " parent_job: " << parent_name << "[" << job->parent_job << "]");
179 for (int i = 0; i < job->task.num_semaphores; i++) {
180 log_message(indent << " semaphore " << (void *)job->task.semaphores[i].semaphore << " count " << job->task.semaphores[i].count << " val " << *(int *)job->task.semaphores[i].semaphore);
181 }
182}
183
184WEAK void dump_job_state() {
185 log_message("Dumping job state, jobs in queue:");
186 work *job = work_queue.jobs;
187 while (job != nullptr) {
188 print_job(job, " ");
189 job = job->next_job;
190 }
191 log_message("Done dumping job state.");
192}
193
194#else
195
196// clang-format off
197#define print_job(job, indent, prefix) do { /*nothing*/ } while (0)
198#define dump_job_state() do { /*nothing*/ } while (0)
199// clang-format on
200
201#endif
202
203WEAK void worker_thread(void *);
204
206 int spin_count = 0;
207 const int max_spin_count = 40;
208
209 while (owned_job ? owned_job->running() : !work_queue.shutdown) {
210 work *job = work_queue.jobs;
211 work **prev_ptr = &work_queue.jobs;
212
213 if (owned_job) {
214 if (owned_job->exit_status != halide_error_code_success) {
215 if (owned_job->active_workers == 0) {
216 while (job != owned_job) {
217 prev_ptr = &job->next_job;
218 job = job->next_job;
219 }
220 *prev_ptr = job->next_job;
221 job->task.extent = 0;
222 continue; // So loop exit is always in the same place.
223 }
224 } else if (owned_job->parent_job && owned_job->parent_job->exit_status != halide_error_code_success) {
225 owned_job->exit_status = owned_job->parent_job->exit_status;
226 // The wakeup can likely be only done under certain conditions, but it is only happening
227 // in when an error has already occured and it seems more important to ensure reliable
228 // termination than to optimize this path.
230 continue;
231 }
232 }
233
235
236 // Find a job to run, prefering things near the top of the stack.
237 while (job) {
238 print_job(job, "", "Considering job ");
239 // Only schedule tasks with enough free worker threads
240 // around to complete. They may get stolen later, but only
241 // by tasks which can themselves use them to complete
242 // work, so forward progress is made.
243 bool enough_threads;
244
245 work *parent_job = job->parent_job;
246
247 int threads_available;
248 if (parent_job == nullptr) {
249 // The + 1 is because work_queue.threads_created does not include the main thread.
250 threads_available = (work_queue.threads_created + 1) - work_queue.threads_reserved;
251 } else {
252 if (parent_job->active_workers == 0) {
253 threads_available = parent_job->task.min_threads - parent_job->threads_reserved;
254 } else {
255 threads_available = parent_job->active_workers * parent_job->task.min_threads - parent_job->threads_reserved;
256 }
257 }
258 enough_threads = threads_available >= job->task.min_threads;
259
260 if (!enough_threads) {
261 log_message("Not enough threads for job " << job->task.name << " available: " << threads_available << " min_threads: " << job->task.min_threads);
262 }
263 bool can_use_this_thread_stack = !owned_job || (job->siblings == owned_job->siblings) || job->task.min_threads == 0;
264 if (!can_use_this_thread_stack) {
265 log_message("Cannot run job " << job->task.name << " on this thread.");
266 }
267 bool can_add_worker = (!job->task.serial || (job->active_workers == 0));
268 if (!can_add_worker) {
269 log_message("Cannot add worker to job " << job->task.name);
270 }
271
272 if (enough_threads && can_use_this_thread_stack && can_add_worker) {
273 if (job->make_runnable()) {
274 break;
275 } else {
276 log_message("Cannot acquire semaphores for " << job->task.name);
277 }
278 }
279 prev_ptr = &(job->next_job);
280 job = job->next_job;
281 }
282
283 if (!job) {
284 // There is no runnable job. Go to sleep.
285 if (owned_job) {
286 if (spin_count++ < max_spin_count) {
287 // Give the workers a chance to finish up before sleeping
291 } else {
293 owned_job->owner_is_sleeping = true;
295 owned_job->owner_is_sleeping = false;
297 }
298 } else {
301 // Transition to B team
305 } else if (spin_count++ < max_spin_count) {
306 // Spin waiting for new work
310 } else {
312 }
314 }
315 continue;
316 } else {
317 spin_count = 0;
318 }
319
320 log_message("Working on job " << job->task.name);
321
322 // Increment the active_worker count so that other threads
323 // are aware that this job is still in progress even
324 // though there are no outstanding tasks for it.
325 job->active_workers++;
326
327 if (job->parent_job == nullptr) {
329 log_message("Reserved " << job->task.min_threads << " on work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
330 } else {
332 log_message("Reserved " << job->task.min_threads << " on " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
333 }
334
335 int result = halide_error_code_success;
336
337 if (job->task.serial) {
338 // Remove it from the stack while we work on it
339 *prev_ptr = job->next_job;
340
341 // Release the lock and do the task.
343 int total_iters = 0;
344 int iters = 1;
345 while (result == halide_error_code_success) {
346 // Claim as many iterations as possible
347 while ((job->task.extent - total_iters) > iters &&
348 job->make_runnable()) {
349 iters++;
350 }
351 if (iters == 0) {
352 break;
353 }
354
355 // Do them
356 result = halide_do_loop_task(job->user_context, job->task.fn,
357 job->task.min + total_iters, iters,
358 job->task.closure, job);
359 total_iters += iters;
360 iters = 0;
361 }
363
364 job->task.min += total_iters;
365 job->task.extent -= total_iters;
366
367 // Put it back on the job stack, if it hasn't failed.
368 if (result != halide_error_code_success) {
369 job->task.extent = 0; // Force job to be finished.
370 } else if (job->task.extent > 0) {
371 job->next_job = work_queue.jobs;
372 work_queue.jobs = job;
373 }
374 } else {
375 // Claim a task from it.
376 work myjob = *job;
377 job->task.min++;
378 job->task.extent--;
379
380 // If there were no more tasks pending for this job, remove it
381 // from the stack.
382 if (job->task.extent == 0) {
383 *prev_ptr = job->next_job;
384 }
385
386 // Release the lock and do the task.
388 if (myjob.task_fn) {
389 result = halide_do_task(myjob.user_context, myjob.task_fn,
390 myjob.task.min, myjob.task.closure);
391 } else {
392 result = halide_do_loop_task(myjob.user_context, myjob.task.fn,
393 myjob.task.min, 1,
394 myjob.task.closure, job);
395 }
397 }
398
399 if (result != halide_error_code_success) {
400 log_message("Saw thread pool saw error from task: " << (int)result);
401 }
402
403 bool wake_owners = false;
404
405 // If this task failed, set the exit status on the job.
406 if (result != halide_error_code_success) {
407 job->exit_status = result;
408 // Mark all siblings as also failed.
409 for (int i = 0; i < job->sibling_count; i++) {
410 log_message("Marking " << job->sibling_count << " siblings ");
412 job->siblings[i].exit_status = result;
413 wake_owners |= (job->active_workers == 0 && job->siblings[i].owner_is_sleeping);
414 }
415 log_message("Done marking siblings.");
416 }
417 }
418
419 if (job->parent_job == nullptr) {
421 log_message("Returned " << job->task.min_threads << " to work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
422 } else {
424 log_message("Returned " << job->task.min_threads << " to " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
425 }
426
427 // We are no longer active on this job
428 job->active_workers--;
429
430 log_message("Done working on job " << job->task.name);
431
432 if (wake_owners ||
433 (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
434 // The job is done or some owned job failed via sibling linkage. Wake up the owner.
436 }
437 }
438}
439
445
446WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent) {
447 if (!work_queue.initialized) {
449
450 // Compute the desired number of threads to use. Other code
451 // can also mess with this value, but only when the work queue
452 // is locked.
455 }
457 work_queue.initialized = true;
458 }
459
460 // Gather some information about the work.
461
462 // Some tasks require a minimum number of threads to make forward
463 // progress. Also assume the blocking tasks need to run concurrently.
464 int min_threads = 0;
465
466 // Count how many workers to wake. Start at -1 because this thread
467 // will contribute.
468 int workers_to_wake = -1;
469
470 // Could stalled owners of other tasks conceivably help with one
471 // of these jobs.
472 bool stealable_jobs = false;
473
474 bool job_has_acquires = false;
475 bool job_may_block = false;
476 for (int i = 0; i < num_jobs; i++) {
477 if (jobs[i].task.min_threads == 0) {
478 stealable_jobs = true;
479 } else {
480 job_may_block = true;
481 min_threads += jobs[i].task.min_threads;
482 }
483 if (jobs[i].task.num_semaphores != 0) {
484 job_has_acquires = true;
485 }
486
487 if (jobs[i].task.serial) {
488 workers_to_wake++;
489 } else {
490 workers_to_wake += jobs[i].task.extent;
491 }
492 }
493
494 if (task_parent == nullptr) {
495 // This is here because some top-level jobs may block, but are not accounted for
496 // in any enclosing min_threads count. In order to handle extern stages and such
497 // correctly, we likely need to make the total min_threads for an invocation of
498 // a pipeline a property of the entire thing. This approach works because we use
499 // the increased min_threads count to increase the size of the thread pool. It should
500 // even be safe against reservation races because this is happening under the work
501 // queue lock and that lock will be held into running the job. However that's many
502 // lines of code from here to there and it is not guaranteed this will be the first
503 // job run.
504 if (job_has_acquires || job_may_block) {
505 log_message("enqueue_work_already_locked adding one to min_threads.");
506 min_threads += 1;
507 }
508
509 // Spawn more threads if necessary.
510 while (work_queue.threads_created < MAX_THREADS &&
512 (work_queue.threads_created + 1) - work_queue.threads_reserved < min_threads)) {
513 // We might need to make some new threads, if work_queue.desired_threads_working has
514 // increased, or if there aren't enough threads to complete this new task.
518 }
519 log_message("enqueue_work_already_locked top level job " << jobs[0].task.name << " with min_threads " << min_threads << " work_queue.threads_created " << work_queue.threads_created << " work_queue.threads_reserved " << work_queue.threads_reserved);
520 if (job_has_acquires || job_may_block) {
522 }
523 } else {
524 log_message("enqueue_work_already_locked job " << jobs[0].task.name << " with min_threads " << min_threads << " task_parent " << task_parent->task.name << " task_parent->task.min_threads " << task_parent->task.min_threads << " task_parent->threads_reserved " << task_parent->threads_reserved);
525 halide_abort_if_false(nullptr, (min_threads <= ((task_parent->task.min_threads * task_parent->active_workers) -
526 task_parent->threads_reserved)) &&
527 "Logic error: thread over commit.\n");
528 if (job_has_acquires || job_may_block) {
529 task_parent->threads_reserved++;
530 }
531 }
532
533 // Push the jobs onto the stack.
534 for (int i = num_jobs - 1; i >= 0; i--) {
535 // We could bubble it downwards based on some heuristics, but
536 // it's not strictly necessary to do so.
537 jobs[i].next_job = work_queue.jobs;
538 jobs[i].siblings = &jobs[0];
539 jobs[i].sibling_count = num_jobs;
540 jobs[i].threads_reserved = 0;
541 work_queue.jobs = jobs + i;
542 }
543
544 bool nested_parallelism =
547
548 // Wake up an appropriate number of threads
549 if (nested_parallelism || workers_to_wake > work_queue.workers_sleeping) {
550 // If there's nested parallelism going on, we just wake up
551 // everyone. TODO: make this more precise.
553 } else {
554 work_queue.target_a_team_size = workers_to_wake;
555 }
556
560 if (stealable_jobs) {
562 }
563 }
564
565 if (job_has_acquires || job_may_block) {
566 if (task_parent != nullptr) {
567 task_parent->threads_reserved--;
568 } else {
570 }
571 }
572}
573
581
582} // namespace Internal
583} // namespace Runtime
584} // namespace Halide
585
586using namespace Halide::Runtime::Internal;
587
588extern "C" {
589
590namespace {
591WEAK __attribute__((destructor)) void halide_thread_pool_cleanup() {
593}
594} // namespace
595
596WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx,
597 uint8_t *closure) {
598 return f(user_context, idx, closure);
599}
600
602 int min, int extent, uint8_t *closure,
603 void *task_parent) {
604 return f(user_context, min, extent, closure, task_parent);
605}
606
608 int min, int size, uint8_t *closure) {
609 if (size <= 0) {
611 }
612
613 work job;
614 job.task.fn = nullptr;
615 job.task.min = min;
616 job.task.extent = size;
617 job.task.serial = false;
618 job.task.semaphores = nullptr;
619 job.task.num_semaphores = 0;
620 job.task.closure = closure;
621 job.task.min_threads = 0;
622 job.task.name = nullptr;
623 job.task_fn = f;
624 job.user_context = user_context;
626 job.active_workers = 0;
627 job.next_semaphore = 0;
628 job.owner_is_sleeping = false;
629 job.siblings = &job; // guarantees no other job points to the same siblings.
630 job.sibling_count = 0;
631 job.parent_job = nullptr;
633 enqueue_work_already_locked(1, &job, nullptr);
636 return job.exit_status;
637}
638
639WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks,
640 struct halide_parallel_task_t *tasks,
641 void *task_parent) {
642 work *jobs = (work *)__builtin_alloca(sizeof(work) * num_tasks);
643
644 for (int i = 0; i < num_tasks; i++) {
645 if (tasks->extent <= 0) {
646 // Skip extent zero jobs
647 num_tasks--;
648 continue;
649 }
650 jobs[i].task = *tasks++;
651 jobs[i].task_fn = nullptr;
652 jobs[i].user_context = user_context;
654 jobs[i].active_workers = 0;
655 jobs[i].next_semaphore = 0;
656 jobs[i].owner_is_sleeping = false;
657 jobs[i].parent_job = (work *)task_parent;
658 }
659
660 if (num_tasks == 0) {
662 }
663
665 enqueue_work_already_locked(num_tasks, jobs, (work *)task_parent);
666 int exit_status = halide_error_code_success;
667 for (int i = 0; i < num_tasks; i++) {
668 // It doesn't matter what order we join the tasks in, because
669 // we'll happily assist with siblings too.
671 if (jobs[i].exit_status != halide_error_code_success) {
672 exit_status = jobs[i].exit_status;
673 }
674 }
676 return exit_status;
677}
678
680 if (n < 0) {
681 halide_error(nullptr, "halide_set_num_threads: must be >= 0.");
682 }
683 // Don't make this an atomic swap - we don't want to be changing
684 // the desired number of threads while another thread is in the
685 // middle of a sequence of non-atomic operations.
687 if (n == 0) {
689 }
693 return old;
694}
695
698 // Wake everyone up and tell them the party's over and it's time
699 // to go home
701
702 work_queue.shutdown = true;
707
708 // Wait until they leave
709 for (int i = 0; i < work_queue.threads_created; i++) {
711 }
712
713 // Tidy up
715 }
716}
717
721
724 Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->value, &n);
725 return n;
726}
727
730 int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->value, n);
731 // TODO(abadams|zvookin): Is this correct if an acquire can be for say count of 2 and the releases are 1 each?
732 if (old_val == 0 && n != 0) { // Don't wake if nothing released.
733 // We may have just made a job runnable
738 }
739 return old_val + n;
740}
741
743 if (n == 0) {
744 return true;
745 }
747 // Decrement and get new value
748 int expected;
749 int desired;
750 Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->value, &expected);
751 do {
752 desired = expected - n;
753 } while (desired >= 0 &&
754 !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->value, &expected, &desired));
755 return desired >= 0;
756}
757
763
769
775
777 halide_do_par_for_t do_par_for,
778 halide_do_task_t do_task,
779 halide_do_loop_task_t do_loop_task,
780 halide_do_parallel_tasks_t do_parallel_tasks,
781 halide_semaphore_init_t semaphore_init,
782 halide_semaphore_try_acquire_t semaphore_try_acquire,
783 halide_semaphore_release_t semaphore_release) {
784
785 custom_do_par_for = do_par_for;
786 custom_do_task = do_task;
787 custom_do_loop_task = do_loop_task;
788 custom_do_parallel_tasks = do_parallel_tasks;
789 custom_semaphore_init = semaphore_init;
790 custom_semaphore_try_acquire = semaphore_try_acquire;
791 custom_semaphore_release = semaphore_release;
792}
793
794WEAK int halide_do_task(void *user_context, halide_task_t f, int idx,
795 uint8_t *closure) {
796 return (*custom_do_task)(user_context, f, idx, closure);
797}
798
799WEAK int halide_do_par_for(void *user_context, halide_task_t f,
800 int min, int size, uint8_t *closure) {
801 return (*custom_do_par_for)(user_context, f, min, size, closure);
802}
803
805 int min, int size, uint8_t *closure, void *task_parent) {
806 return custom_do_loop_task(user_context, f, min, size, closure, task_parent);
807}
808
809WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks,
810 struct halide_parallel_task_t *tasks,
811 void *task_parent) {
812 return custom_do_parallel_tasks(user_context, num_tasks, tasks, task_parent);
813}
814
815WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count) {
816 return custom_semaphore_init(sema, count);
817}
818
820 return custom_semaphore_release(sema, count);
821}
822
824 return custom_semaphore_try_acquire(sema, count);
825}
826}
int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int(* halide_semaphore_release_t)(struct halide_semaphore_t *, int)
int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
bool halide_default_semaphore_try_acquire(struct halide_semaphore_t *, int n)
void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
int(* halide_do_par_for_t)(void *, halide_task_t, int, int, uint8_t *)
Set a custom method for performing a parallel for loop.
int halide_default_do_par_for(void *user_context, halide_task_t task, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
int(* halide_task_t)(void *user_context, int task_number, uint8_t *closure)
Define halide_do_par_for to replace the default thread pool implementation.
void halide_mutex_lock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int halide_default_semaphore_init(struct halide_semaphore_t *, int n)
void halide_mutex_unlock(struct halide_mutex *mutex)
struct halide_thread * halide_spawn_thread(void(*f)(void *), void *closure)
Spawn a thread.
int(* halide_do_loop_task_t)(void *, halide_loop_task_t, int, int, uint8_t *, void *)
The version of do_task called for loop tasks.
bool(* halide_semaphore_try_acquire_t)(struct halide_semaphore_t *, int)
int(* halide_loop_task_t)(void *user_context, int min, int extent, uint8_t *closure, void *task_parent)
A task representing a serial for loop evaluated over some range.
int halide_default_semaphore_release(struct halide_semaphore_t *, int n)
void halide_join_thread(struct halide_thread *)
Join a thread.
@ halide_error_code_success
There was no error.
void halide_cond_broadcast(struct halide_cond *cond)
int(* halide_do_task_t)(void *, halide_task_t, int, uint8_t *)
If you use the default do_par_for, you can still set a custom handler to perform each individual task...
int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
int(* halide_semaphore_init_t)(struct halide_semaphore_t *, int)
void halide_error(void *user_context, const char *)
Halide calls this function on runtime errors (for example bounds checking failures).
int(* halide_do_parallel_tasks_t)(void *, int, struct halide_parallel_task_t *, void *task_parent)
Provide an entire custom tasking runtime via function pointers.
WEAK halide_semaphore_release_t custom_semaphore_release
WEAK halide_semaphore_init_t custom_semaphore_init
WEAK halide_do_task_t custom_do_task
WEAK halide_do_par_for_t custom_do_par_for
ALWAYS_INLINE int clamp_num_threads(int threads)
WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent)
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks
WEAK void worker_thread_already_locked(work *owned_job)
WEAK halide_do_loop_task_t custom_do_loop_task
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
WEAK int halide_host_cpu_count()
int atoi(const char *)
unsigned __INT8_TYPE__ uint8_t
void halide_thread_yield()
#define ALWAYS_INLINE
void * memset(void *s, int val, size_t n)
#define halide_abort_if_false(user_context, cond)
#define WEAK
char * getenv(const char *)
ALWAYS_INLINE bool running() const
Cross platform condition variable.
Cross-platform mutex.
A parallel task to be passed to halide_do_parallel_tasks.
struct halide_semaphore_acquire_t * semaphores
halide_loop_task_t fn
struct halide_semaphore_t * semaphore
An opaque struct representing a semaphore.
WEAK void halide_set_custom_parallel_runtime(halide_do_par_for_t do_par_for, halide_do_task_t do_task, halide_do_loop_task_t do_loop_task, halide_do_parallel_tasks_t do_parallel_tasks, halide_semaphore_init_t semaphore_init, halide_semaphore_try_acquire_t semaphore_try_acquire, halide_semaphore_release_t semaphore_release)
WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n)
WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f)
#define dump_job_state()
WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n)
WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f)
WEAK int halide_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
#define log_message(stuff)
WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f)
#define print_job(job, indent, prefix)
WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int size, uint8_t *closure, void *task_parent)
WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
Enqueue some number of the tasks described above and wait for them to complete.
WEAK void halide_shutdown_thread_pool()
WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n)
WEAK int halide_default_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
WEAK int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_set_num_threads(int n)
Set the number of threads used by Halide's thread pool.
WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count)