1#define EXTENDED_DEBUG 0
5extern "C" int syscall(
int);
18#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
24#define log_message(stuff) do { } while (0)
75 if (threads > MAX_THREADS) {
77 }
else if (threads < 1) {
85 char *threads_str =
getenv(
"HL_NUM_THREADS");
88 threads_str =
getenv(
"HL_NUMTHREADS");
150 const char *bytes = ((
const char *)&this->zero_marker);
151 const char *limit = ((
const char *)
this) +
sizeof(
work_queue_t);
152 while (bytes < limit && *bytes == 0) {
155 halide_abort_if_false(
nullptr, bytes == limit &&
"Logic error in thread pool work queue initialization.\n");
162 char *bytes = ((
char *)&this->zero_marker);
164 memset(bytes, 0, limit - bytes);
172WEAK void print_job(
work *job,
const char *indent,
const char *prefix =
nullptr) {
173 if (prefix ==
nullptr) {
187 while (job !=
nullptr) {
197#define print_job(job, indent, prefix) do { } while (0)
198#define dump_job_state() do { } while (0)
207 const int max_spin_count = 40;
216 while (job != owned_job) {
247 int threads_available;
248 if (parent_job ==
nullptr) {
260 if (!enough_threads) {
264 if (!can_use_this_thread_stack) {
268 if (!can_add_worker) {
272 if (enough_threads && can_use_this_thread_stack && can_add_worker) {
286 if (spin_count++ < max_spin_count) {
305 }
else if (spin_count++ < max_spin_count) {
347 while ((job->
task.
extent - total_iters) > iters &&
357 job->
task.
min + total_iters, iters,
359 total_iters += iters;
400 log_message(
"Saw thread pool saw error from task: " << (
int)result);
403 bool wake_owners =
false;
468 int workers_to_wake = -1;
472 bool stealable_jobs =
false;
474 bool job_has_acquires =
false;
475 bool job_may_block =
false;
476 for (
int i = 0; i < num_jobs; i++) {
477 if (jobs[i].task.min_threads == 0) {
478 stealable_jobs =
true;
480 job_may_block =
true;
483 if (jobs[i].task.num_semaphores != 0) {
484 job_has_acquires =
true;
487 if (jobs[i].task.serial) {
494 if (task_parent ==
nullptr) {
504 if (job_has_acquires || job_may_block) {
505 log_message(
"enqueue_work_already_locked adding one to min_threads.");
520 if (job_has_acquires || job_may_block) {
524 log_message(
"enqueue_work_already_locked job " << jobs[0].task.name <<
" with min_threads " << min_threads <<
" task_parent " << task_parent->
task.
name <<
" task_parent->task.min_threads " << task_parent->
task.
min_threads <<
" task_parent->threads_reserved " << task_parent->
threads_reserved);
527 "Logic error: thread over commit.\n");
528 if (job_has_acquires || job_may_block) {
534 for (
int i = num_jobs - 1; i >= 0; i--) {
544 bool nested_parallelism =
560 if (stealable_jobs) {
565 if (job_has_acquires || job_may_block) {
566 if (task_parent !=
nullptr) {
591WEAK __attribute__((destructor))
void halide_thread_pool_cleanup() {
598 return f(user_context, idx, closure);
602 int min,
int extent,
uint8_t *closure,
604 return f(user_context, min, extent, closure, task_parent);
608 int min,
int size,
uint8_t *closure) {
642 work *jobs = (
work *)__builtin_alloca(
sizeof(
work) * num_tasks);
644 for (
int i = 0; i < num_tasks; i++) {
650 jobs[i].
task = *tasks++;
660 if (num_tasks == 0) {
667 for (
int i = 0; i < num_tasks; i++) {
681 halide_error(
nullptr,
"halide_set_num_threads: must be >= 0.");
724 Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->
value, &n);
730 int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->
value, n);
732 if (old_val == 0 && n != 0) {
750 Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->
value, &expected);
752 desired = expected - n;
753 }
while (desired >= 0 &&
754 !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->
value, &expected, &desired));
800 int min,
int size,
uint8_t *closure) {
805 int min,
int size,
uint8_t *closure,
void *task_parent) {
int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int(* halide_semaphore_release_t)(struct halide_semaphore_t *, int)
int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
bool halide_default_semaphore_try_acquire(struct halide_semaphore_t *, int n)
void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
int(* halide_do_par_for_t)(void *, halide_task_t, int, int, uint8_t *)
Set a custom method for performing a parallel for loop.
int halide_default_do_par_for(void *user_context, halide_task_t task, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
int(* halide_task_t)(void *user_context, int task_number, uint8_t *closure)
Define halide_do_par_for to replace the default thread pool implementation.
void halide_mutex_lock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int halide_default_semaphore_init(struct halide_semaphore_t *, int n)
void halide_mutex_unlock(struct halide_mutex *mutex)
struct halide_thread * halide_spawn_thread(void(*f)(void *), void *closure)
Spawn a thread.
int(* halide_do_loop_task_t)(void *, halide_loop_task_t, int, int, uint8_t *, void *)
The version of do_task called for loop tasks.
bool(* halide_semaphore_try_acquire_t)(struct halide_semaphore_t *, int)
int(* halide_loop_task_t)(void *user_context, int min, int extent, uint8_t *closure, void *task_parent)
A task representing a serial for loop evaluated over some range.
int halide_default_semaphore_release(struct halide_semaphore_t *, int n)
void halide_join_thread(struct halide_thread *)
Join a thread.
@ halide_error_code_success
There was no error.
void halide_cond_broadcast(struct halide_cond *cond)
int(* halide_do_task_t)(void *, halide_task_t, int, uint8_t *)
If you use the default do_par_for, you can still set a custom handler to perform each individual task...
int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
int(* halide_semaphore_init_t)(struct halide_semaphore_t *, int)
void halide_error(void *user_context, const char *)
Halide calls this function on runtime errors (for example bounds checking failures).
int(* halide_do_parallel_tasks_t)(void *, int, struct halide_parallel_task_t *, void *task_parent)
Provide an entire custom tasking runtime via function pointers.
WEAK halide_semaphore_release_t custom_semaphore_release
WEAK halide_semaphore_init_t custom_semaphore_init
WEAK int default_desired_num_threads()
WEAK halide_do_task_t custom_do_task
WEAK halide_do_par_for_t custom_do_par_for
ALWAYS_INLINE int clamp_num_threads(int threads)
WEAK void worker_thread(void *)
WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent)
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks
WEAK void worker_thread_already_locked(work *owned_job)
WEAK halide_do_loop_task_t custom_do_loop_task
WEAK work_queue_t work_queue
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
WEAK int halide_host_cpu_count()
unsigned __INT8_TYPE__ uint8_t
void halide_thread_yield()
void * memset(void *s, int val, size_t n)
#define halide_abort_if_false(user_context, cond)
char * getenv(const char *)
int desired_threads_working
ALWAYS_INLINE bool running() const
halide_thread * threads[MAX_THREADS]
ALWAYS_INLINE void reset()
ALWAYS_INLINE void assert_zeroed() const
ALWAYS_INLINE bool running() const
ALWAYS_INLINE bool make_runnable()
halide_parallel_task_t task
Cross platform condition variable.
A parallel task to be passed to halide_do_parallel_tasks.
struct halide_semaphore_acquire_t * semaphores
struct halide_semaphore_t * semaphore
An opaque struct representing a semaphore.
WEAK void halide_set_custom_parallel_runtime(halide_do_par_for_t do_par_for, halide_do_task_t do_task, halide_do_loop_task_t do_loop_task, halide_do_parallel_tasks_t do_parallel_tasks, halide_semaphore_init_t semaphore_init, halide_semaphore_try_acquire_t semaphore_try_acquire, halide_semaphore_release_t semaphore_release)
WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n)
WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f)
WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n)
WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f)
WEAK int halide_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
#define log_message(stuff)
WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f)
#define print_job(job, indent, prefix)
WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int size, uint8_t *closure, void *task_parent)
WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
Enqueue some number of the tasks described above and wait for them to complete.
WEAK void halide_shutdown_thread_pool()
WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n)
WEAK int halide_default_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
WEAK int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_set_num_threads(int n)
Set the number of threads used by Halide's thread pool.
WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count)