task_iterator.c (Blender v2.93)
/*
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include <stdlib.h>

#include "MEM_guardedalloc.h"

#include "DNA_listBase.h"

#include "BLI_listbase.h"
#include "BLI_math.h"
#include "BLI_mempool.h"
#include "BLI_task.h"
#include "BLI_threads.h"

#include "atomic_ops.h"

/* Avoid using malloc for `userdata_chunk` in tasks when it is small enough. */
#define MALLOCA(_size) ((_size) <= 8192) ? alloca((_size)) : MEM_mallocN((_size), __func__)
#define MALLOCA_FREE(_mem, _size) \
  if (((_mem) != NULL) && ((_size) > 8192)) { \
    MEM_freeN((_mem)); \
  } \
  ((void)0)
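
/* Usage sketch (illustrative; `num_items` is a hypothetical variable): MALLOCA() and
 * MALLOCA_FREE() must be paired with the same size, since only allocations above the
 * 8192-byte threshold go through MEM_mallocN() and need an explicit free; smaller ones live
 * on the stack via alloca() and vanish when the function returns.
 *
 * \code{.c}
 * const size_t size = sizeof(float) * (size_t)num_items;
 * float *buffer = MALLOCA(size);
 * // ... use buffer ...
 * MALLOCA_FREE(buffer, size);
 * \endcode
 */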

BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
                                              const int tot_items,
                                              int num_tasks,
                                              int *r_chunk_size)
{
  int chunk_size = 0;

  if (!settings->use_threading) {
    /* Some users of this helper will still need a valid chunk size when processing is not
     * threaded. In that case we can use a bigger one than in the default threaded case. */
    chunk_size = 1024;
    num_tasks = 1;
  }
  else if (settings->min_iter_per_thread > 0) {
    /* Already set by the user, no need to do anything here. */
    chunk_size = settings->min_iter_per_thread;
  }
  else {
    /* Multiplier used in the heuristics below to define the "optimal" chunk size.
     * The idea is to increase the chunk size to compensate for the rather measurable threading
     * overhead caused by fetching tasks. With too many CPU threads we start to spend too much
     * time in that overhead.
     * The first values are: 1 if num_tasks < 16;
     *                  else 2 if num_tasks < 32;
     *                  else 3 if num_tasks < 48;
     *                  else 4 if num_tasks < 64;
     *                  etc.
     * Note: if we wanted to keep a 'power of two' multiplier, we would need something like:
     *   1 << max_ii(0, (int)(sizeof(int) * 8) - 1 - bitscan_reverse_i(num_tasks) - 3)
     */
    const int num_tasks_factor = max_ii(1, num_tasks >> 3);

    /* We could make that 'base' 32 number configurable in TaskParallelSettings too, or maybe
     * just always use this heuristic with TaskParallelSettings.min_iter_per_thread as its
     * basis? */
    chunk_size = 32 * num_tasks_factor;

    /* Basic heuristic to avoid threading on a low number of items.
     * We could make that limit configurable in the settings too. */
    if (tot_items > 0 && tot_items < max_ii(256, chunk_size * 2)) {
      chunk_size = tot_items;
    }
  }

  BLI_assert(chunk_size > 0);
  *r_chunk_size = chunk_size;
}
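
/* Worked example of the heuristic above (illustrative numbers): with num_tasks = 24,
 * num_tasks_factor = max_ii(1, 24 >> 3) = 3, so chunk_size = 32 * 3 = 96. If tot_items = 100,
 * then since 100 < max_ii(256, 96 * 2), the whole range is handled as a single chunk of 100. */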

typedef struct TaskParallelIteratorState {
  void *userdata;
  TaskParallelIteratorIterFunc iter_func;
  TaskParallelIteratorFunc func;

  /* *** Data used to 'acquire' chunks of items from the iterator. *** */
  /* Common data also passed to the generator callback. */
  TaskParallelIteratorStateShared iter_shared;
  /* Total number of items. If unknown, set it to a negative number. */
  int tot_items;
} TaskParallelIteratorState;

static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict state,
                                      void *userdata_chunk)
{
  TaskParallelTLS tls = {
      .userdata_chunk = userdata_chunk,
  };

  void **current_chunk_items;
  int *current_chunk_indices;
  int current_chunk_size;

  const size_t items_size = sizeof(*current_chunk_items) * (size_t)state->iter_shared.chunk_size;
  const size_t indices_size = sizeof(*current_chunk_indices) *
                              (size_t)state->iter_shared.chunk_size;

  current_chunk_items = MALLOCA(items_size);
  current_chunk_indices = MALLOCA(indices_size);
  current_chunk_size = 0;

  for (bool do_abort = false; !do_abort;) {
    if (state->iter_shared.spin_lock != NULL) {
      BLI_spin_lock(state->iter_shared.spin_lock);
    }

    /* Get current status. */
    int index = state->iter_shared.next_index;
    void *item = state->iter_shared.next_item;
    int i;

    /* 'Acquire' a chunk of items from the iterator function. */
    for (i = 0; i < state->iter_shared.chunk_size && !state->iter_shared.is_finished; i++) {
      current_chunk_indices[i] = index;
      current_chunk_items[i] = item;
      state->iter_func(state->userdata, &tls, &item, &index, &state->iter_shared.is_finished);
    }

    /* Update current status. */
    state->iter_shared.next_index = index;
    state->iter_shared.next_item = item;
    current_chunk_size = i;

    do_abort = state->iter_shared.is_finished;

    if (state->iter_shared.spin_lock != NULL) {
      BLI_spin_unlock(state->iter_shared.spin_lock);
    }

    /* Process the acquired chunk outside of the spin lock, so that `func` can run in parallel
     * while other tasks acquire their own chunks. */
    for (i = 0; i < current_chunk_size; ++i) {
      state->func(state->userdata, current_chunk_items[i], current_chunk_indices[i], &tls);
    }
  }

  MALLOCA_FREE(current_chunk_items, items_size);
  MALLOCA_FREE(current_chunk_indices, indices_size);
}

static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk)
{
  TaskParallelIteratorState *__restrict state = BLI_task_pool_user_data(pool);

  parallel_iterator_func_do(state, userdata_chunk);
}

static void task_parallel_iterator_no_threads(const TaskParallelSettings *settings,
                                              TaskParallelIteratorState *state)
{
  /* Prepare user's TLS data. */
  void *userdata_chunk = settings->userdata_chunk;
  const size_t userdata_chunk_size = settings->userdata_chunk_size;
  void *userdata_chunk_local = NULL;
  const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
  if (use_userdata_chunk) {
    userdata_chunk_local = MALLOCA(userdata_chunk_size);
    memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
  }

  /* Also mark it as non-threaded for the iterator callback. */
  state->iter_shared.spin_lock = NULL;

  parallel_iterator_func_do(state, userdata_chunk);

  if (use_userdata_chunk && settings->func_free != NULL) {
    /* `func_free` should only free data that was created during execution of `func`. */
    settings->func_free(state->userdata, userdata_chunk_local);
  }
}

static void task_parallel_iterator_do(const TaskParallelSettings *settings,
                                      TaskParallelIteratorState *state)
{
  const int num_threads = BLI_task_scheduler_num_threads();

  task_parallel_calc_chunk_size(
      settings, state->tot_items, num_threads, &state->iter_shared.chunk_size);

  if (!settings->use_threading) {
    task_parallel_iterator_no_threads(settings, state);
    return;
  }

  const int chunk_size = state->iter_shared.chunk_size;
  const int tot_items = state->tot_items;
  const size_t num_tasks = tot_items >= 0 ?
                               (size_t)min_ii(num_threads, state->tot_items / chunk_size) :
                               (size_t)num_threads;

  BLI_assert(num_tasks > 0);
  if (num_tasks == 1) {
    task_parallel_iterator_no_threads(settings, state);
    return;
  }

  SpinLock spin_lock;
  BLI_spin_init(&spin_lock);
  state->iter_shared.spin_lock = &spin_lock;

  void *userdata_chunk = settings->userdata_chunk;
  const size_t userdata_chunk_size = settings->userdata_chunk_size;
  void *userdata_chunk_local = NULL;
  void *userdata_chunk_array = NULL;
  const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);

  TaskPool *task_pool = BLI_task_pool_create(state, TASK_PRIORITY_HIGH);

  if (use_userdata_chunk) {
    userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
  }

  for (size_t i = 0; i < num_tasks; i++) {
    if (use_userdata_chunk) {
      userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
      memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
    }
    /* Use this pool's pre-allocated tasks. */
    BLI_task_pool_push(task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL);
  }

  BLI_task_pool_work_and_wait(task_pool);
  BLI_task_pool_free(task_pool);

  if (use_userdata_chunk && (settings->func_reduce != NULL || settings->func_free != NULL)) {
    for (size_t i = 0; i < num_tasks; i++) {
      userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
      if (settings->func_reduce != NULL) {
        settings->func_reduce(state->userdata, userdata_chunk, userdata_chunk_local);
      }
      if (settings->func_free != NULL) {
        settings->func_free(state->userdata, userdata_chunk_local);
      }
    }
    MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
  }

  BLI_spin_end(&spin_lock);
  state->iter_shared.spin_lock = NULL;
}
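
/* Sketch of the TLS/reduce contract driven by the loop above (illustrative; `SumChunk` and
 * `sum_reduce` are hypothetical, and the exact qualifiers of TaskParallelReduceFunc are assumed
 * from the call site, which passes (userdata, chunk_join, chunk)): each task receives its own
 * copy of `userdata_chunk`, and the per-task copies are folded back into the original once all
 * tasks have finished.
 *
 * \code{.c}
 * typedef struct SumChunk {
 *   int total;
 * } SumChunk;
 *
 * static void sum_reduce(const void *__restrict UNUSED(userdata),
 *                        void *__restrict chunk_join,
 *                        void *__restrict chunk)
 * {
 *   ((SumChunk *)chunk_join)->total += ((SumChunk *)chunk)->total;
 * }
 * \endcode
 */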

void BLI_task_parallel_iterator(void *userdata,
                                TaskParallelIteratorIterFunc iter_func,
                                void *init_item,
                                const int init_index,
                                const int tot_items,
                                TaskParallelIteratorFunc func,
                                const TaskParallelSettings *settings)
{
  TaskParallelIteratorState state = {0};

  state.tot_items = tot_items;
  state.iter_shared.next_index = init_index;
  state.iter_shared.next_item = init_item;
  state.iter_shared.is_finished = false;
  state.userdata = userdata;
  state.iter_func = iter_func;
  state.func = func;

  task_parallel_iterator_do(settings, &state);
}
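
/* Usage sketch (illustrative; `next_string`, `print_string` and the calling code are not part of
 * this file, and `BLI_parallel_range_settings_defaults()` is assumed from BLI_task.h): a
 * generator callback produces items from a NULL-terminated string array, while `func` may
 * process them from several threads.
 *
 * \code{.c}
 * static void next_string(void *__restrict userdata,
 *                         const TaskParallelTLS *__restrict UNUSED(tls),
 *                         void **r_next_item,
 *                         int *r_next_index,
 *                         bool *r_do_abort)
 * {
 *   char **strings = userdata;
 *   (*r_next_index)++;
 *   *r_next_item = strings[*r_next_index];
 *   if (*r_next_item == NULL) {
 *     *r_do_abort = true;
 *   }
 * }
 *
 * static void print_string(void *__restrict UNUSED(userdata),
 *                          void *item,
 *                          int index,
 *                          const TaskParallelTLS *__restrict UNUSED(tls))
 * {
 *   printf("%d: %s\n", index, (const char *)item);
 * }
 *
 * char *strings[] = {"one", "two", "three", NULL};
 * TaskParallelSettings settings;
 * BLI_parallel_range_settings_defaults(&settings);
 * BLI_task_parallel_iterator(strings, next_string, strings[0], 0, 3, print_string, &settings);
 * \endcode
 */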

/* Generator callback used by BLI_task_parallel_listbase(): hands out the next link and raises
 * the abort flag on the last one, so the terminating NULL is never passed to the user
 * callback. */
static void task_parallel_listbase_get(void *__restrict UNUSED(userdata),
                                       const TaskParallelTLS *__restrict UNUSED(tls),
                                       void **r_next_item,
                                       int *r_next_index,
                                       bool *r_do_abort)
{
  /* Get current status. */
  Link *link = *r_next_item;

  if (link->next == NULL) {
    *r_do_abort = true;
  }
  *r_next_item = link->next;
  (*r_next_index)++;
}

void BLI_task_parallel_listbase(ListBase *listbase,
                                void *userdata,
                                TaskParallelIteratorFunc func,
                                const TaskParallelSettings *settings)
{
  if (BLI_listbase_is_empty(listbase)) {
    return;
  }

  TaskParallelIteratorState state = {0};

  state.tot_items = BLI_listbase_count(listbase);
  state.iter_shared.next_index = 0;
  state.iter_shared.next_item = listbase->first;
  state.iter_shared.is_finished = false;
  state.userdata = userdata;
  state.iter_func = task_parallel_listbase_get;
  state.func = func;

  task_parallel_iterator_do(settings, &state);
}
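
/* Usage sketch (illustrative; `MyLink`, its `value` field and `my_list` are hypothetical): the
 * callback receives each link of the list together with its index.
 *
 * \code{.c}
 * typedef struct MyLink {
 *   struct MyLink *next, *prev;
 *   int value;
 * } MyLink;
 *
 * static void offset_value(void *__restrict UNUSED(userdata),
 *                          void *item,
 *                          int index,
 *                          const TaskParallelTLS *__restrict UNUSED(tls))
 * {
 *   MyLink *my_link = item;
 *   my_link->value += index;
 * }
 *
 * TaskParallelSettings settings;
 * BLI_parallel_range_settings_defaults(&settings);
 * BLI_task_parallel_listbase(&my_list, NULL, offset_value, &settings);
 * \endcode
 */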

#undef MALLOCA
#undef MALLOCA_FREE

typedef struct ParallelMempoolState {
  void *userdata;
  TaskParallelMempoolFunc func;
} ParallelMempoolState;

/* Task body used by BLI_task_parallel_mempool(): pulls items from its own thread-safe mempool
 * iterator until that iterator is exhausted. */
static void parallel_mempool_func(TaskPool *__restrict pool, void *taskdata)
{
  ParallelMempoolState *__restrict state = BLI_task_pool_user_data(pool);
  BLI_mempool_iter *iter = taskdata;
  MempoolIterData *item;

  while ((item = BLI_mempool_iterstep(iter)) != NULL) {
    state->func(state->userdata, item);
  }
}

void BLI_task_parallel_mempool(BLI_mempool *mempool,
                               void *userdata,
                               TaskParallelMempoolFunc func,
                               const bool use_threading)
{
  TaskPool *task_pool;
  ParallelMempoolState state;
  int i, num_threads, num_tasks;

  if (BLI_mempool_len(mempool) == 0) {
    return;
  }

  if (!use_threading) {
    BLI_mempool_iter iter;
    BLI_mempool_iternew(mempool, &iter);

    for (void *item = BLI_mempool_iterstep(&iter); item != NULL;
         item = BLI_mempool_iterstep(&iter)) {
      func(userdata, item);
    }
    return;
  }

  task_pool = BLI_task_pool_create(&state, TASK_PRIORITY_HIGH);
  num_threads = BLI_task_scheduler_num_threads();

  /* The idea here is to prevent creating a task for each of the loop iterations and instead have
   * tasks which are evenly distributed across CPU cores and pull the next item to be crunched
   * using the thread-aware BLI_mempool_iter. */
  num_tasks = num_threads + 2;

  state.userdata = userdata;
  state.func = func;

  BLI_mempool_iter *mempool_iterators = BLI_mempool_iter_threadsafe_create(mempool,
                                                                           (size_t)num_tasks);

  for (i = 0; i < num_tasks; i++) {
    /* Use this pool's pre-allocated tasks. */
    BLI_task_pool_push(task_pool, parallel_mempool_func, &mempool_iterators[i], false, NULL);
  }

  BLI_task_pool_work_and_wait(task_pool);
  BLI_task_pool_free(task_pool);

  BLI_mempool_iter_threadsafe_free(mempool_iterators);
}
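
/* Usage sketch (illustrative; `MyElem` and `elem_pool` are hypothetical): every pushed task
 * drains elements from the shared mempool through its own thread-safe iterator.
 *
 * \code{.c}
 * typedef struct MyElem {
 *   int counter;
 * } MyElem;
 *
 * static void bump_elem(void *userdata, MempoolIterData *item)
 * {
 *   MyElem *elem = (MyElem *)item;
 *   elem->counter += *(const int *)userdata;
 * }
 *
 * int delta = 1;
 * BLI_task_parallel_mempool(elem_pool, &delta, bump_elem, true);
 * \endcode
 */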