1#define EXTENDED_DEBUG 0
5extern "C" int syscall(
int);
18#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
24#define log_message(stuff) do { } while (0)
45 Synchronization::atomic_load_relaxed(&
counter, &initial);
47 for (
int spin = 0; spin < 40; spin++) {
50 Synchronization::atomic_load_relaxed(&
counter, &current);
51 if (current != initial) {
64 Synchronization::atomic_load_relaxed(&
counter, &current);
65 if (current != initial) {
126 if (threads > MAX_THREADS) {
128 }
else if (threads < 1) {
136 char *threads_str =
getenv(
"HL_NUM_THREADS");
139 threads_str =
getenv(
"HL_NUMTHREADS");
201 const char *bytes = ((
const char *)&this->zero_marker);
202 const char *limit = ((
const char *)
this) +
sizeof(
work_queue_t);
203 while (bytes < limit && *bytes == 0) {
206 halide_abort_if_false(
nullptr, bytes == limit &&
"Logic error in thread pool work queue initialization.\n");
213 char *bytes = ((
char *)&this->zero_marker);
215 memset(bytes, 0, limit - bytes);
223WEAK void print_job(
work *job,
const char *indent,
const char *prefix =
nullptr) {
224 if (prefix ==
nullptr) {
238 while (job !=
nullptr) {
248#define print_job(job, indent, prefix) do { } while (0)
249#define dump_job_state() do { } while (0)
264 while (job != owned_job) {
295 int threads_available;
296 if (parent_job ==
nullptr) {
308 if (!enough_threads) {
312 if (!can_use_this_thread_stack) {
316 if (!can_add_worker) {
320 if (enough_threads && can_use_this_thread_stack && can_add_worker) {
381 while ((job->
task.
extent - total_iters) > iters &&
391 job->
task.
min + total_iters, iters,
393 total_iters += iters;
434 log_message(
"Saw thread pool saw error from task: " << (
int)result);
437 bool wake_owners =
false;
502 int workers_to_wake = -1;
506 bool stealable_jobs =
false;
508 bool job_has_acquires =
false;
509 bool job_may_block =
false;
510 for (
int i = 0; i < num_jobs; i++) {
511 if (jobs[i].task.min_threads == 0) {
512 stealable_jobs =
true;
514 job_may_block =
true;
517 if (jobs[i].task.num_semaphores != 0) {
518 job_has_acquires =
true;
521 if (jobs[i].task.serial) {
528 if (task_parent ==
nullptr) {
538 if (job_has_acquires || job_may_block) {
539 log_message(
"enqueue_work_already_locked adding one to min_threads.");
554 if (job_has_acquires || job_may_block) {
558 log_message(
"enqueue_work_already_locked job " << jobs[0].task.name <<
" with min_threads " << min_threads <<
" task_parent " << task_parent->
task.
name <<
" task_parent->task.min_threads " << task_parent->
task.
min_threads <<
" task_parent->threads_reserved " << task_parent->
threads_reserved);
561 "Logic error: thread over commit.\n");
562 if (job_has_acquires || job_may_block) {
568 for (
int i = num_jobs - 1; i >= 0; i--) {
578 bool nested_parallelism =
594 if (stealable_jobs) {
599 if (job_has_acquires || job_may_block) {
600 if (task_parent !=
nullptr) {
625WEAK __attribute__((destructor))
void halide_thread_pool_cleanup() {
636 int min,
int extent,
uint8_t *closure,
638 return f(
user_context, min, extent, closure, task_parent);
642 int min,
int size,
uint8_t *closure) {
676 work *jobs = (
work *)__builtin_alloca(
sizeof(
work) * num_tasks);
678 for (
int i = 0; i < num_tasks; i++) {
684 jobs[i].
task = *tasks++;
694 if (num_tasks == 0) {
701 for (
int i = 0; i < num_tasks; i++) {
715 halide_error(
nullptr,
"halide_set_num_threads: must be >= 0.");
765 Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->
value, &n);
771 int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->
value, n);
773 if (old_val == 0 && n != 0) {
791 Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->
value, &expected);
793 desired = expected - n;
794 }
while (desired >= 0 &&
795 !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->
value, &expected, &desired));
841 int min,
int size,
uint8_t *closure) {
846 int min,
int size,
uint8_t *closure,
void *task_parent) {
int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int(* halide_semaphore_release_t)(struct halide_semaphore_t *, int)
int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
bool halide_default_semaphore_try_acquire(struct halide_semaphore_t *, int n)
void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
int(* halide_do_par_for_t)(void *, halide_task_t, int, int, uint8_t *)
Set a custom method for performing a parallel for loop.
int halide_default_do_par_for(void *user_context, halide_task_t task, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
int(* halide_task_t)(void *user_context, int task_number, uint8_t *closure)
Define halide_do_par_for to replace the default thread pool implementation.
void halide_mutex_lock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int halide_default_semaphore_init(struct halide_semaphore_t *, int n)
void halide_mutex_unlock(struct halide_mutex *mutex)
struct halide_thread * halide_spawn_thread(void(*f)(void *), void *closure)
Spawn a thread.
int(* halide_do_loop_task_t)(void *, halide_loop_task_t, int, int, uint8_t *, void *)
The version of do_task called for loop tasks.
bool(* halide_semaphore_try_acquire_t)(struct halide_semaphore_t *, int)
int(* halide_loop_task_t)(void *user_context, int min, int extent, uint8_t *closure, void *task_parent)
A task representing a serial for loop evaluated over some range.
int halide_default_semaphore_release(struct halide_semaphore_t *, int n)
void halide_join_thread(struct halide_thread *)
Join a thread.
@ halide_error_code_success
There was no error.
void halide_cond_broadcast(struct halide_cond *cond)
int(* halide_do_task_t)(void *, halide_task_t, int, uint8_t *)
If you use the default do_par_for, you can still set a custom handler to perform each individual task...
int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
int(* halide_semaphore_init_t)(struct halide_semaphore_t *, int)
void halide_error(void *user_context, const char *)
Halide calls this function on runtime errors (for example bounds checking failures).
int(* halide_do_parallel_tasks_t)(void *, int, struct halide_parallel_task_t *, void *task_parent)
Provide an entire custom tasking runtime via function pointers.
WEAK halide_semaphore_release_t custom_semaphore_release
WEAK halide_semaphore_init_t custom_semaphore_init
WEAK int default_desired_num_threads()
WEAK halide_do_task_t custom_do_task
WEAK halide_do_par_for_t custom_do_par_for
ALWAYS_INLINE int clamp_num_threads(int threads)
WEAK void worker_thread(void *)
WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent)
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks
WEAK void worker_thread_already_locked(work *owned_job)
WEAK halide_do_loop_task_t custom_do_loop_task
WEAK work_queue_t work_queue
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
WEAK int halide_host_cpu_count()
__UINTPTR_TYPE__ uintptr_t
unsigned __INT8_TYPE__ uint8_t
void halide_thread_yield()
void * memset(void *s, int val, size_t n)
#define halide_abort_if_false(user_context, cond)
char * getenv(const char *)
void wait(halide_mutex *mutex)
int desired_threads_working
ALWAYS_INLINE bool running() const
halide_cond_with_spinning wake_b_team
halide_cond_with_spinning wake_owners
halide_thread * threads[MAX_THREADS]
ALWAYS_INLINE void reset()
halide_cond_with_spinning wake_a_team
ALWAYS_INLINE void assert_zeroed() const
ALWAYS_INLINE bool running() const
ALWAYS_INLINE bool make_runnable()
halide_parallel_task_t task
Cross platform condition variable.
A parallel task to be passed to halide_do_parallel_tasks.
struct halide_semaphore_acquire_t * semaphores
struct halide_semaphore_t * semaphore
An opaque struct representing a semaphore.
WEAK void halide_set_custom_parallel_runtime(halide_do_par_for_t do_par_for, halide_do_task_t do_task, halide_do_loop_task_t do_loop_task, halide_do_parallel_tasks_t do_parallel_tasks, halide_semaphore_init_t semaphore_init, halide_semaphore_try_acquire_t semaphore_try_acquire, halide_semaphore_release_t semaphore_release)
WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n)
WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f)
WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n)
WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
WEAK int halide_get_num_threads()
Get or set the number of threads used by Halide's thread pool.
WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f)
WEAK int halide_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
#define log_message(stuff)
WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f)
#define print_job(job, indent, prefix)
WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int size, uint8_t *closure, void *task_parent)
WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
Enqueue some number of the tasks described above and wait for them to complete.
WEAK void halide_shutdown_thread_pool()
WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n)
WEAK int halide_default_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
WEAK int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_set_num_threads(int n)
WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count)