Halide 21.0.0
Halide compiler and libraries
thread_pool_common.h
1#define EXTENDED_DEBUG 0
2
3#if EXTENDED_DEBUG
4// This code is currently set up for Linux debugging. Switch to using pthread_self on e.g. Mac OS X.
5extern "C" int syscall(int);
6
7namespace {
8int gettid() {
9#ifdef BITS_32
10 return syscall(224);
11#else
12 return syscall(186);
13#endif
14}
15} // namespace
16
17// clang-format off
18#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
19// clang-format on
20
21#else
22
23// clang-format off
24#define log_message(stuff) do { /*nothing*/ } while (0)
25// clang-format on
26
27#endif
28
29namespace Halide {
30namespace Runtime {
31namespace Internal {
32
33// A condition variable, augmented with a bit of spinning on an atomic counter
34// before going to sleep for real. This helps reduce overhead at the end of a
35// parallel for loop when idle worker threads are waiting for other threads to
36// finish so that the next parallel for loop can begin.
37struct halide_cond_with_spinning {
38 halide_cond cond;
39 uintptr_t counter;
40
41 void wait(halide_mutex *mutex) {
42 // First spin for a bit, checking the counter for another thread to bump
43 // it.
44 uintptr_t initial;
45 Synchronization::atomic_load_relaxed(&counter, &initial);
46 halide_mutex_unlock(mutex);
47 for (int spin = 0; spin < 40; spin++) {
48 halide_thread_yield();
49 uintptr_t current;
50 Synchronization::atomic_load_relaxed(&counter, &current);
51 if (current != initial) {
52 halide_mutex_lock(mutex);
53 return;
54 }
55 }
56
57 // Give up on spinning and relock the mutex preparing to sleep for real.
58 halide_mutex_lock(mutex);
59
60 // Check one final time with the lock held. This guarantees we won't
61 // miss an increment of the counter because it is only ever incremented
62 // with the lock held.
63 uintptr_t current;
64 Synchronization::atomic_load_relaxed(&counter, &current);
65 if (current != initial) {
66 return;
67 }
68
69 halide_cond_wait(&cond, mutex);
70 }
71
72 void broadcast() {
73 // Release any spinning waiters
74 Synchronization::atomic_fetch_add_acquire_release(&counter, (uintptr_t)1);
75
76 // Release any sleeping waiters
77 halide_cond_broadcast(&cond);
78 }
79
80 // Note that this cond var variant doesn't have signal(), because it always
81 // wakes all spinning waiters.
82};
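// Editor's illustrative sketch (not part of the original file): how a waiter and
// a waker typically pair up on this condition variable. The mutex, the condvar
// `cv`, and the `ready` predicate are hypothetical stand-ins; in this file the
// waiters are worker/owner threads and the predicate is "a runnable job exists"
// or "my job has finished".
//
//     // Waiter (the mutex is held around wait(), as wait() requires):
//     halide_mutex_lock(&mutex);
//     while (!ready) {
//         cv.wait(&mutex);   // spins up to 40 times on the counter, then sleeps on the halide_cond
//     }
//     halide_mutex_unlock(&mutex);
//
//     // Waker: must hold the mutex, because wait() only re-checks the counter
//     // with the lock held before committing to sleep.
//     halide_mutex_lock(&mutex);
//     ready = true;
//     cv.broadcast();        // bumps the counter (wakes spinners) and broadcasts the cond (wakes sleepers)
//     halide_mutex_unlock(&mutex);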
83
84struct work {
85 halide_parallel_task_t task;
86
87 // If we come into the task system via do_par_for we just have a
88 // halide_task_t, not a halide_loop_task_t.
89 halide_task_t task_fn;
90
91 work *next_job;
92 work *siblings;
93 int sibling_count;
94 work *parent_job;
95 int threads_reserved;
96
97 void *user_context;
98 int active_workers;
99 int exit_status;
100 int next_semaphore;
101 // Whether the owner of this job is currently sleeping on a condition variable waiting for it to finish.
102 bool owner_is_sleeping;
103
104 ALWAYS_INLINE bool make_runnable() {
105 for (; next_semaphore < task.num_semaphores; next_semaphore++) {
106 if (!halide_default_semaphore_try_acquire(task.semaphores[next_semaphore].semaphore,
107 task.semaphores[next_semaphore].count)) {
108 // Note that we don't release the semaphores already
109 // acquired. We never have two consumers contending
110 // over the same semaphore, so it's not helpful to do
111 // so.
112 return false;
113 }
114 }
115 // Future iterations of this task need to acquire the semaphores from scratch.
116 next_semaphore = 0;
117 return true;
118 }
119
120 ALWAYS_INLINE bool running() const {
121 return task.extent || active_workers;
122 }
123};
124
125ALWAYS_INLINE int clamp_num_threads(int threads) {
126 if (threads > MAX_THREADS) {
127 return MAX_THREADS;
128 } else if (threads < 1) {
129 return 1;
130 } else {
131 return threads;
132 }
133}
134
135WEAK int default_desired_num_threads() {
136 char *threads_str = getenv("HL_NUM_THREADS");
137 if (!threads_str) {
138 // Legacy name for HL_NUM_THREADS
139 threads_str = getenv("HL_NUMTHREADS");
140 }
141 return threads_str ?
142 atoi(threads_str) :
143 halide_host_cpu_count();
144}
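// Editor's note (not in the original source): the pool size can be chosen from the
// environment or overridden programmatically via the public API declared in
// HalideRuntime.h. For example:
//
//     // Shell: HL_NUM_THREADS=8 ./my_pipeline
//     // Or, before the first parallel loop runs:
//     int old = halide_set_num_threads(8);   // passing 0 restores the default above
//     int cur = halide_get_num_threads();    // 8, after clamping to [1, MAX_THREADS]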
145
146// The work queue and thread pool are weak, so one big work queue is shared by all Halide functions.
147struct work_queue_t {
148 // all fields are protected by this mutex.
149 halide_mutex mutex;
150
151 // The desired number of threads doing work (HL_NUM_THREADS).
152 int desired_threads_working;
153
154 // All fields after this must be zero in the initial state. See assert_zeroed
155 // Field serves both to mark the offset in struct and as layout padding.
156 int zero_marker;
157
158 // Singly linked list for job stack
159 work *jobs;
160
161 // The number of threads created
162 int threads_created;
163
164 // Workers sleep on one of two condition variables, to make it
165 // easier to wake up the right number if a small number of tasks
166 // are enqueued. There are A-team workers and B-team workers. The
167 // following variables track the current size and the desired size
168 // of the A team.
169 int a_team_size, target_a_team_size;
170
171 // The condition variables that workers and owners sleep on. We
172 // may want to wake them up independently. Any code that may
173 // invalidate any of the reasons a worker or owner may have slept
174 // must signal or broadcast the appropriate condition variable.
175 halide_cond_with_spinning wake_a_team, wake_b_team, wake_owners;
176
177 // The number of sleeping workers and owners. An over-estimate - a
178 // waking-up thread may not have decremented this yet.
179 int workers_sleeping, owners_sleeping;
180
181 // Keep track of threads so they can be joined at shutdown
182 halide_thread *threads[MAX_THREADS];
183
184 // Global flags indicating the threadpool should shut down, and
185 // whether the thread pool has been initialized.
186 bool shutdown, initialized;
187
188 // The number of threads that are currently committed to possibly block
189 // via outstanding jobs queued or being actively worked on. Used to limit
190 // the number of iterations of parallel for loops that are invoked so as
191 // to prevent deadlock due to oversubscription of threads.
192 int threads_reserved;
193
194 ALWAYS_INLINE bool running() const {
195 return !shutdown;
196 }
197
198 // Used to check initial state is correct.
199 void assert_zeroed() const {
200 // Assert that all fields except the mutex and desired threads count are zeroed.
201 const char *bytes = ((const char *)&this->zero_marker);
202 const char *limit = ((const char *)this) + sizeof(work_queue_t);
203 while (bytes < limit && *bytes == 0) {
204 bytes++;
205 }
206 halide_abort_if_false(nullptr, bytes == limit && "Logic error in thread pool work queue initialization.\n");
207 }
208
209 // Return the work queue to initial state. Must be called while locked
210 // and queue will remain locked.
211 void reset() {
212 // Ensure all fields except the mutex and desired threads count are zeroed.
213 char *bytes = ((char *)&this->zero_marker);
214 char *limit = ((char *)this) + sizeof(work_queue_t);
215 memset(bytes, 0, limit - bytes);
216 }
217};
218
219WEAK work_queue_t work_queue = {};
220
221#if EXTENDED_DEBUG
222
223WEAK void print_job(work *job, const char *indent, const char *prefix = nullptr) {
224 if (prefix == nullptr) {
225 prefix = indent;
226 }
227 const char *name = job->task.name ? job->task.name : "<no name>";
228 const char *parent_name = job->parent_job ? (job->parent_job->task.name ? job->parent_job->task.name : "<no name>") : "<no parent job>";
229 log_message(prefix << name << "[" << job << "] serial: " << job->task.serial << " active_workers: " << job->active_workers << " min: " << job->task.min << " extent: " << job->task.extent << " siblings: " << job->siblings << " sibling count: " << job->sibling_count << " min_threads: " << job->task.min_threads << " next_semaphore: " << job->next_semaphore << " threads_reserved: " << job->threads_reserved << " parent_job: " << parent_name << "[" << job->parent_job << "]");
230 for (int i = 0; i < job->task.num_semaphores; i++) {
231 log_message(indent << " semaphore " << (void *)job->task.semaphores[i].semaphore << " count " << job->task.semaphores[i].count << " val " << *(int *)job->task.semaphores[i].semaphore);
232 }
233}
234
235WEAK void dump_job_state() {
236 log_message("Dumping job state, jobs in queue:");
237 work *job = work_queue.jobs;
238 while (job != nullptr) {
239 print_job(job, " ");
240 job = job->next_job;
241 }
242 log_message("Done dumping job state.");
243}
244
245#else
246
247// clang-format off
248#define print_job(job, indent, prefix) do { /*nothing*/ } while (0)
249#define dump_job_state() do { /*nothing*/ } while (0)
250// clang-format on
251
252#endif
253
254WEAK void worker_thread(void *);
255
256WEAK void worker_thread_stall(work *owned_job) {
257 work_queue.owners_sleeping++;
258 owned_job->owner_is_sleeping = true;
259 work_queue.wake_owners.wait(&work_queue.mutex);
260 owned_job->owner_is_sleeping = false;
261 work_queue.owners_sleeping--;
262}
263
264WEAK void worker_thread_idle() {
265 work_queue.workers_sleeping++;
266 if (work_queue.a_team_size > work_queue.target_a_team_size) {
267 // Transition to B team
268 work_queue.a_team_size--;
269 work_queue.wake_b_team.wait(&work_queue.mutex);
270 work_queue.a_team_size++;
271 } else {
272 work_queue.wake_a_team.wait(&work_queue.mutex);
273 }
274 work_queue.workers_sleeping--;
275}
276
277WEAK void worker_thread_already_locked(work *owned_job) {
278 while (owned_job ? owned_job->running() : !work_queue.shutdown) {
279 work *job = work_queue.jobs;
280 work **prev_ptr = &work_queue.jobs;
281
282 if (owned_job) {
283 if (owned_job->exit_status != halide_error_code_success) {
284 if (owned_job->active_workers == 0) {
285 while (job != owned_job) {
286 prev_ptr = &job->next_job;
287 job = job->next_job;
288 }
289 *prev_ptr = job->next_job;
290 job->task.extent = 0;
291 continue; // So loop exit is always in the same place.
292 }
293 } else if (owned_job->parent_job && owned_job->parent_job->exit_status != halide_error_code_success) {
294 owned_job->exit_status = owned_job->parent_job->exit_status;
295 // The wakeup can likely only be done under certain conditions, but it only happens
296 // when an error has already occurred, and it seems more important to ensure reliable
297 // termination than to optimize this path.
298 work_queue.wake_owners.broadcast();
299 continue;
300 }
301 }
302
303 dump_job_state();
304
305 // Find a job to run, preferring things near the top of the stack.
306 while (job) {
307 print_job(job, "", "Considering job ");
308 // Only schedule tasks with enough free worker threads
309 // around to complete. They may get stolen later, but only
310 // by tasks which can themselves use them to complete
311 // work, so forward progress is made.
312 bool enough_threads;
313
314 work *parent_job = job->parent_job;
315
316 int threads_available;
317 if (parent_job == nullptr) {
318 // The + 1 is because work_queue.threads_created does not include the main thread.
319 threads_available = (work_queue.threads_created + 1) - work_queue.threads_reserved;
320 } else {
321 if (parent_job->active_workers == 0) {
322 threads_available = parent_job->task.min_threads - parent_job->threads_reserved;
323 } else {
324 threads_available = parent_job->active_workers * parent_job->task.min_threads - parent_job->threads_reserved;
325 }
326 }
327 enough_threads = threads_available >= job->task.min_threads;
328
329 if (!enough_threads) {
330 log_message("Not enough threads for job " << job->task.name << " available: " << threads_available << " min_threads: " << job->task.min_threads);
331 }
332 bool can_use_this_thread_stack = !owned_job || (job->siblings == owned_job->siblings) || job->task.min_threads == 0;
333 if (!can_use_this_thread_stack) {
334 log_message("Cannot run job " << job->task.name << " on this thread.");
335 }
336 bool can_add_worker = (!job->task.serial || (job->active_workers == 0));
337 if (!can_add_worker) {
338 log_message("Cannot add worker to job " << job->task.name);
339 }
340
341 if (enough_threads && can_use_this_thread_stack && can_add_worker) {
342 if (job->make_runnable()) {
343 break;
344 } else {
345 log_message("Cannot acquire semaphores for " << job->task.name);
346 }
347 }
348 prev_ptr = &(job->next_job);
349 job = job->next_job;
350 }
351
352 if (!job) {
353 // There is no runnable job. Go to sleep.
354 // The "stall" and "idle" function calls are not strictly necessary
355 // and could be inlined here, but having symbols for these situations
356 // is very informative when profiling.
357 if (owned_job) {
358 worker_thread_stall(owned_job);
359 } else {
360 worker_thread_idle();
361 }
362 continue;
363 }
364
365 log_message("Working on job " << job->task.name);
366
367 // Increment the active_worker count so that other threads
368 // are aware that this job is still in progress even
369 // though there are no outstanding tasks for it.
370 job->active_workers++;
371
372 if (job->parent_job == nullptr) {
373 work_queue.threads_reserved += job->task.min_threads;
374 log_message("Reserved " << job->task.min_threads << " on work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
375 } else {
376 job->parent_job->threads_reserved += job->task.min_threads;
377 log_message("Reserved " << job->task.min_threads << " on " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
378 }
379
380 int result = halide_error_code_success;
381
382 if (job->task.serial) {
383 // Remove it from the stack while we work on it
384 *prev_ptr = job->next_job;
385
386 // Release the lock and do the task.
387 halide_mutex_unlock(&work_queue.mutex);
388 int total_iters = 0;
389 int iters = 1;
390 while (result == halide_error_code_success) {
391 // Claim as many iterations as possible
392 while ((job->task.extent - total_iters) > iters &&
393 job->make_runnable()) {
394 iters++;
395 }
396 if (iters == 0) {
397 break;
398 }
399
400 // Do them
401 result = halide_do_loop_task(job->user_context, job->task.fn,
402 job->task.min + total_iters, iters,
403 job->task.closure, job);
404 total_iters += iters;
405 iters = 0;
406 }
407 halide_mutex_lock(&work_queue.mutex);
408
409 job->task.min += total_iters;
410 job->task.extent -= total_iters;
411
412 // Put it back on the job stack, if it hasn't failed.
413 if (result != halide_error_code_success) {
414 job->task.extent = 0; // Force job to be finished.
415 } else if (job->task.extent > 0) {
416 job->next_job = work_queue.jobs;
417 work_queue.jobs = job;
418 }
419 } else {
420 // Claim a task from it.
421 work myjob = *job;
422 job->task.min++;
423 job->task.extent--;
424
425 // If there were no more tasks pending for this job, remove it
426 // from the stack.
427 if (job->task.extent == 0) {
428 *prev_ptr = job->next_job;
429 }
430
431 // Release the lock and do the task.
432 halide_mutex_unlock(&work_queue.mutex);
433 if (myjob.task_fn) {
434 result = halide_do_task(myjob.user_context, myjob.task_fn,
435 myjob.task.min, myjob.task.closure);
436 } else {
437 result = halide_do_loop_task(myjob.user_context, myjob.task.fn,
438 myjob.task.min, 1,
439 myjob.task.closure, job);
440 }
441 halide_mutex_lock(&work_queue.mutex);
442 }
443
444 if (result != halide_error_code_success) {
445 log_message("Saw thread pool saw error from task: " << (int)result);
446 }
447
448 bool wake_owners = false;
449
450 // If this task failed, set the exit status on the job.
451 if (result != halide_error_code_success) {
452 job->exit_status = result;
453 // Mark all siblings as also failed.
454 for (int i = 0; i < job->sibling_count; i++) {
455 log_message("Marking " << job->sibling_count << " siblings ");
456 if (job->siblings[i].exit_status == halide_error_code_success) {
457 job->siblings[i].exit_status = result;
458 wake_owners |= (job->active_workers == 0 && job->siblings[i].owner_is_sleeping);
459 }
460 log_message("Done marking siblings.");
461 }
462 }
463
464 if (job->parent_job == nullptr) {
465 work_queue.threads_reserved -= job->task.min_threads;
466 log_message("Returned " << job->task.min_threads << " to work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
467 } else {
468 job->parent_job->threads_reserved -= job->task.min_threads;
469 log_message("Returned " << job->task.min_threads << " to " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
470 }
471
472 // We are no longer active on this job
473 job->active_workers--;
474
475 log_message("Done working on job " << job->task.name);
476
477 if (wake_owners ||
478 (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
479 // The job is done or some owned job failed via sibling linkage. Wake up the owner.
480 work_queue.wake_owners.broadcast();
481 }
482 }
483}
484
485WEAK void worker_thread(void *arg) {
486 halide_mutex_lock(&work_queue.mutex);
487 worker_thread_already_locked(nullptr);
488 halide_mutex_unlock(&work_queue.mutex);
489}
490
491WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent) {
492 if (!work_queue.initialized) {
493 work_queue.assert_zeroed();
494
495 // Compute the desired number of threads to use. Other code
496 // can also mess with this value, but only when the work queue
497 // is locked.
498 if (!work_queue.desired_threads_working) {
499 work_queue.desired_threads_working = default_desired_num_threads();
500 }
501 work_queue.desired_threads_working = clamp_num_threads(work_queue.desired_threads_working);
502 work_queue.initialized = true;
503 }
504
505 // Gather some information about the work.
506
507 // Some tasks require a minimum number of threads to make forward
508 // progress. Also assume the blocking tasks need to run concurrently.
509 int min_threads = 0;
510
511 // Count how many workers to wake. Start at -1 because this thread
512 // will contribute.
513 int workers_to_wake = -1;
514
515 // Could stalled owners of other tasks conceivably help with one
516 // of these jobs?
517 bool stealable_jobs = false;
518
519 bool job_has_acquires = false;
520 bool job_may_block = false;
521 for (int i = 0; i < num_jobs; i++) {
522 if (jobs[i].task.min_threads == 0) {
523 stealable_jobs = true;
524 } else {
525 job_may_block = true;
526 min_threads += jobs[i].task.min_threads;
527 }
528 if (jobs[i].task.num_semaphores != 0) {
529 job_has_acquires = true;
530 }
531
532 if (jobs[i].task.serial) {
533 workers_to_wake++;
534 } else {
535 workers_to_wake += jobs[i].task.extent;
536 }
537 }
538
539 if (task_parent == nullptr) {
540 // This is here because some top-level jobs may block, but are not accounted for
541 // in any enclosing min_threads count. In order to handle extern stages and such
542 // correctly, we likely need to make the total min_threads for an invocation of
543 // a pipeline a property of the entire thing. This approach works because we use
544 // the increased min_threads count to increase the size of the thread pool. It should
545 // even be safe against reservation races because this is happening under the work
546 // queue lock and that lock will be held into running the job. However that's many
547 // lines of code from here to there and it is not guaranteed this will be the first
548 // job run.
549 if (job_has_acquires || job_may_block) {
550 log_message("enqueue_work_already_locked adding one to min_threads.");
551 min_threads += 1;
552 }
553
554 // Spawn more threads if necessary.
555 while (work_queue.threads_created < MAX_THREADS &&
556 ((work_queue.threads_created < work_queue.desired_threads_working - 1) ||
557 (work_queue.threads_created + 1) - work_queue.threads_reserved < min_threads)) {
558 // We might need to make some new threads, if work_queue.desired_threads_working has
559 // increased, or if there aren't enough threads to complete this new task.
560 work_queue.a_team_size++;
561 work_queue.threads[work_queue.threads_created++] =
562 halide_spawn_thread(worker_thread, nullptr);
563 }
564 log_message("enqueue_work_already_locked top level job " << jobs[0].task.name << " with min_threads " << min_threads << " work_queue.threads_created " << work_queue.threads_created << " work_queue.threads_reserved " << work_queue.threads_reserved);
565 if (job_has_acquires || job_may_block) {
566 work_queue.threads_reserved++;
567 }
568 } else {
569 log_message("enqueue_work_already_locked job " << jobs[0].task.name << " with min_threads " << min_threads << " task_parent " << task_parent->task.name << " task_parent->task.min_threads " << task_parent->task.min_threads << " task_parent->threads_reserved " << task_parent->threads_reserved);
570 halide_abort_if_false(nullptr, (min_threads <= ((task_parent->task.min_threads * task_parent->active_workers) -
571 task_parent->threads_reserved)) &&
572 "Logic error: thread over commit.\n");
573 if (job_has_acquires || job_may_block) {
574 task_parent->threads_reserved++;
575 }
576 }
577
578 // Push the jobs onto the stack.
579 for (int i = num_jobs - 1; i >= 0; i--) {
580 // We could bubble it downwards based on some heuristics, but
581 // it's not strictly necessary to do so.
582 jobs[i].next_job = work_queue.jobs;
583 jobs[i].siblings = &jobs[0];
584 jobs[i].sibling_count = num_jobs;
585 jobs[i].threads_reserved = 0;
586 work_queue.jobs = jobs + i;
587 }
588
589 bool nested_parallelism =
590 work_queue.owners_sleeping ||
591 (work_queue.workers_sleeping < work_queue.threads_created);
592
593 // Wake up an appropriate number of threads
594 if (nested_parallelism || workers_to_wake > work_queue.workers_sleeping) {
595 // If there's nested parallelism going on, we just wake up
596 // everyone. TODO: make this more precise.
597 work_queue.target_a_team_size = work_queue.threads_created;
598 } else {
599 work_queue.target_a_team_size = workers_to_wake;
600 }
601
602 work_queue.wake_a_team.broadcast();
603 if (work_queue.target_a_team_size > work_queue.a_team_size) {
604 work_queue.wake_b_team.broadcast();
605 if (stealable_jobs) {
606 work_queue.wake_owners.broadcast();
607 }
608 }
609
610 if (job_has_acquires || job_may_block) {
611 if (task_parent != nullptr) {
612 task_parent->threads_reserved--;
613 } else {
614 work_queue.threads_reserved--;
615 }
616 }
617}
618
619WEAK halide_do_task_t custom_do_task = halide_default_do_task;
620WEAK halide_do_loop_task_t custom_do_loop_task = halide_default_do_loop_task;
621WEAK halide_do_par_for_t custom_do_par_for = halide_default_do_par_for;
622WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks = halide_default_do_parallel_tasks;
623WEAK halide_semaphore_init_t custom_semaphore_init = halide_default_semaphore_init;
624WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire = halide_default_semaphore_try_acquire;
625WEAK halide_semaphore_release_t custom_semaphore_release = halide_default_semaphore_release;
626
627} // namespace Internal
628} // namespace Runtime
629} // namespace Halide
630
631using namespace Halide::Runtime::Internal;
632
633extern "C" {
634
635namespace {
636WEAK __attribute__((destructor)) void halide_thread_pool_cleanup() {
637 halide_shutdown_thread_pool();
638}
639} // namespace
640
641WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx,
642 uint8_t *closure) {
643 return f(user_context, idx, closure);
644}
645
646WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f,
647 int min, int extent, uint8_t *closure,
648 void *task_parent) {
649 return f(user_context, min, extent, closure, task_parent);
650}
651
652WEAK int halide_default_do_par_for(void *user_context, halide_task_t f,
653 int min, int size, uint8_t *closure) {
654 if (size <= 0) {
655 return halide_error_code_success;
656 }
657
658 work job;
659 job.task.fn = nullptr;
660 job.task.min = min;
661 job.task.extent = size;
662 job.task.serial = false;
663 job.task.semaphores = nullptr;
664 job.task.num_semaphores = 0;
665 job.task.closure = closure;
666 job.task.min_threads = 0;
667 job.task.name = nullptr;
668 job.task_fn = f;
669 job.user_context = user_context;
670 job.exit_status = halide_error_code_success;
671 job.active_workers = 0;
672 job.next_semaphore = 0;
673 job.owner_is_sleeping = false;
674 job.siblings = &job; // guarantees no other job points to the same siblings.
675 job.sibling_count = 0;
676 job.parent_job = nullptr;
677 halide_mutex_lock(&work_queue.mutex);
678 enqueue_work_already_locked(1, &job, nullptr);
679 worker_thread_already_locked(&job);
680 halide_mutex_unlock(&work_queue.mutex);
681 return job.exit_status;
682}
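// Editor's illustrative sketch (not in the original source): a direct caller of
// halide_do_par_for supplies a halide_task_t that handles one iteration per call.
// The names my_task and my_closure are hypothetical.
//
//     int my_task(void *user_context, int i, uint8_t *closure) {
//         // ... do the work for iteration i ...
//         return halide_error_code_success;
//     }
//     ...
//     // Runs i = 0..15 across the thread pool (the calling thread assists), and
//     // returns a non-zero error code if any iteration failed.
//     int err = halide_do_par_for(user_context, my_task, 0, 16, (uint8_t *)&my_closure);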
683
684WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks,
685 struct halide_parallel_task_t *tasks,
686 void *task_parent) {
687 work *jobs = (work *)__builtin_alloca(sizeof(work) * num_tasks);
688
689 for (int i = 0; i < num_tasks; i++) {
690 if (tasks->extent <= 0) {
691 // Skip extent zero jobs
692 num_tasks--;
693 continue;
694 }
695 jobs[i].task = *tasks++;
696 jobs[i].task_fn = nullptr;
697 jobs[i].user_context = user_context;
698 jobs[i].exit_status = halide_error_code_success;
699 jobs[i].active_workers = 0;
700 jobs[i].next_semaphore = 0;
701 jobs[i].owner_is_sleeping = false;
702 jobs[i].parent_job = (work *)task_parent;
703 }
704
705 if (num_tasks == 0) {
706 return halide_error_code_success;
707 }
708
709 halide_mutex_lock(&work_queue.mutex);
710 enqueue_work_already_locked(num_tasks, jobs, (work *)task_parent);
711 int exit_status = halide_error_code_success;
712 for (int i = 0; i < num_tasks; i++) {
713 // It doesn't matter what order we join the tasks in, because
714 // we'll happily assist with siblings too.
715 worker_thread_already_locked(jobs + i);
716 if (jobs[i].exit_status != halide_error_code_success) {
717 exit_status = jobs[i].exit_status;
718 }
719 }
720 halide_mutex_unlock(&work_queue.mutex);
721 return exit_status;
722}
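// Editor's illustrative sketch (not in the original source): a producer/consumer
// pair expressed as parallel tasks, where each consumer iteration must first
// acquire a semaphore count released by the producer. The fn/closure names and
// num_iters are hypothetical; the field order in the comment follows
// halide_parallel_task_t in HalideRuntime.h.
//
//     halide_semaphore_t sem;
//     halide_semaphore_init(&sem, 0);
//     halide_semaphore_acquire_t need_one = {&sem, 1};
//
//     halide_parallel_task_t tasks[2];
//     // fn, closure, name, semaphores, num_semaphores, min, extent, min_threads, serial
//     tasks[0] = {producer_fn, closure, "producer", nullptr, 0, 0, num_iters, 1, false};
//     tasks[1] = {consumer_fn, closure, "consumer", &need_one, 1, 0, num_iters, 1, false};
//
//     // producer_fn calls halide_semaphore_release(&sem, 1) as each item becomes ready;
//     // the scheduler re-checks the consumer's acquire each time it considers the job.
//     int err = halide_do_parallel_tasks(user_context, 2, tasks, nullptr);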
723
724WEAK int halide_set_num_threads(int n) {
725 if (n < 0) {
726 halide_error(nullptr, "halide_set_num_threads: must be >= 0.");
727 }
728 // Don't make this an atomic swap - we don't want to be changing
729 // the desired number of threads while another thread is in the
730 // middle of a sequence of non-atomic operations.
731 halide_mutex_lock(&work_queue.mutex);
732 if (n == 0) {
733 n = default_desired_num_threads();
734 }
735 int old = work_queue.desired_threads_working;
736 work_queue.desired_threads_working = clamp_num_threads(n);
737 halide_mutex_unlock(&work_queue.mutex);
738 return old;
739}
740
741WEAK int halide_get_num_threads() {
742 halide_mutex_lock(&work_queue.mutex);
743 int n = work_queue.desired_threads_working;
744 halide_mutex_unlock(&work_queue.mutex);
745 return n;
746}
747
748WEAK void halide_shutdown_thread_pool() {
749 if (work_queue.initialized) {
750 // Wake everyone up and tell them the party's over and it's time
751 // to go home
752 halide_mutex_lock(&work_queue.mutex);
753
754 work_queue.shutdown = true;
755 work_queue.wake_owners.broadcast();
756 work_queue.wake_a_team.broadcast();
757 work_queue.wake_b_team.broadcast();
758 halide_mutex_unlock(&work_queue.mutex);
759
760 // Wait until they leave
761 for (int i = 0; i < work_queue.threads_created; i++) {
762 halide_join_thread(work_queue.threads[i]);
763 }
764
765 // Tidy up
766 work_queue.reset();
767 }
768}
769
769
770struct halide_semaphore_impl_t {
771 int value;
772};
773
774WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n) {
775 halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
776 Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->value, &n);
777 return n;
778}
779
780WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
781 halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
782 int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->value, n);
783 // TODO(abadams|zvookin): Is this correct if an acquire can be for say count of 2 and the releases are 1 each?
784 if (old_val == 0 && n != 0) { // Don't wake if nothing released.
785 // We may have just made a job runnable
786 halide_mutex_lock(&work_queue.mutex);
787 work_queue.wake_a_team.broadcast();
788 work_queue.wake_owners.broadcast();
789 halide_mutex_unlock(&work_queue.mutex);
790 }
791 return old_val + n;
792}
793
794WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n) {
795 if (n == 0) {
796 return true;
797 }
798 halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
799 // Decrement and get new value
800 int expected;
801 int desired;
802 Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->value, &expected);
803 do {
804 desired = expected - n;
805 } while (desired >= 0 &&
806 !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->value, &expected, &desired));
807 return desired >= 0;
808}
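// Editor's illustrative sketch (not in the original source): the semaphore is just
// an atomic counter, and try_acquire never blocks; the thread pool simply leaves a
// task on the queue and retries later when an acquire fails.
//
//     halide_semaphore_t sem;
//     halide_semaphore_init(&sem, 0);                     // count = 0
//     bool ok = halide_semaphore_try_acquire(&sem, 1);    // false: would make the count negative
//     halide_semaphore_release(&sem, 2);                  // count = 2, and the work queue is woken
//     ok = halide_semaphore_try_acquire(&sem, 1);         // true: count is now 1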
809
810WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f) {
811 halide_do_task_t result = custom_do_task;
812 custom_do_task = f;
813 return result;
814}
815
816WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f) {
817 halide_do_loop_task_t result = custom_do_loop_task;
818 custom_do_loop_task = f;
819 return result;
820}
821
822WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f) {
823 halide_do_par_for_t result = custom_do_par_for;
824 custom_do_par_for = f;
825 return result;
826}
827
828WEAK void halide_set_custom_parallel_runtime(
829 halide_do_par_for_t do_par_for,
830 halide_do_task_t do_task,
831 halide_do_loop_task_t do_loop_task,
832 halide_do_parallel_tasks_t do_parallel_tasks,
833 halide_semaphore_init_t semaphore_init,
834 halide_semaphore_try_acquire_t semaphore_try_acquire,
835 halide_semaphore_release_t semaphore_release) {
836
837 custom_do_par_for = do_par_for;
838 custom_do_task = do_task;
839 custom_do_loop_task = do_loop_task;
840 custom_do_parallel_tasks = do_parallel_tasks;
841 custom_semaphore_init = semaphore_init;
842 custom_semaphore_try_acquire = semaphore_try_acquire;
843 custom_semaphore_release = semaphore_release;
844}
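// Editor's illustrative sketch (not in the original source): overriding a single
// hook, e.g. forcing serial execution of parallel loops while debugging. The name
// serial_do_par_for is hypothetical; halide_set_custom_do_par_for is part of the
// public runtime API.
//
//     int serial_do_par_for(void *user_context, halide_task_t f,
//                           int min, int size, uint8_t *closure) {
//         for (int i = min; i < min + size; i++) {
//             int err = halide_do_task(user_context, f, i, closure);
//             if (err) {
//                 return err;
//             }
//         }
//         return halide_error_code_success;
//     }
//     ...
//     halide_do_par_for_t previous = halide_set_custom_do_par_for(serial_do_par_for);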
845
846WEAK int halide_do_task(void *user_context, halide_task_t f, int idx,
847 uint8_t *closure) {
848 return (*custom_do_task)(user_context, f, idx, closure);
849}
850
851WEAK int halide_do_par_for(void *user_context, halide_task_t f,
852 int min, int size, uint8_t *closure) {
853 return (*custom_do_par_for)(user_context, f, min, size, closure);
854}
855
856WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f,
857 int min, int size, uint8_t *closure, void *task_parent) {
858 return custom_do_loop_task(user_context, f, min, size, closure, task_parent);
859}
860
861WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks,
862 struct halide_parallel_task_t *tasks,
863 void *task_parent) {
864 return custom_do_parallel_tasks(user_context, num_tasks, tasks, task_parent);
865}
866
867WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count) {
868 return custom_semaphore_init(sema, count);
869}
870
871WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count) {
872 return custom_semaphore_release(sema, count);
873}
874
875WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count) {
876 return custom_semaphore_try_acquire(sema, count);
877}
878}