Halide 12.0.1
synchronization_common.h
#include "HalideRuntime.h"
#include "printer.h"
#include "scoped_spin_lock.h"

/* This provides an implementation of pthreads-like mutex and
 * condition variables with fast default case performance. The code
 * is based on the "parking lot" design and specifically Amanieu
 * d'Antras' Rust implementation:
 * https://github.com/Amanieu/parking_lot
 * and the one in WTF:
 * https://webkit.org/blog/6161/locking-in-webkit/
 *
 * Neither of the above implementations was used directly, largely for
 * dependency-management reasons. This implementation lacks a few features
 * relative to those two. Specifically, timeouts are not supported, nor
 * are optional fairness or deadlock detection.
 * This implementation should provide a fairly standalone "one file"
 * fast synchronization layer on top of readily available system primitives.
 *
 * TODO: Implement a pthread_once equivalent.
 * TODO: Add a read/write lock and move SharedExclusiveSpinLock from tracing.cpp
 * to this mechanism.
 * TODO: Add timeouts and optional fairness if needed.
 * TODO: Relying on condition variables has issues for old versions of Windows
 * and likely has portability issues to some very bare bones embedded OSes.
 * Doing an implementation using only semaphores or event counters should
 * be doable.
 */

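// For orientation, a minimal usage sketch of the public API that this file
// ultimately implements. The halide_mutex type and the halide_mutex_lock /
// halide_mutex_unlock entry points come from HalideRuntime.h; the counter and
// function names below are illustrative only. A zero-initialized halide_mutex
// is treated as unlocked (see fast_mutex below), so no init/destroy calls are
// needed:
//
//     static halide_mutex counter_lock = {{0}};
//     static int counter = 0;
//
//     int next_id() {
//         halide_mutex_lock(&counter_lock);
//         int id = counter++;
//         halide_mutex_unlock(&counter_lock);
//         return id;
//     }
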
// Copied from tsan_interface.h
#ifndef TSAN_ANNOTATIONS
#define TSAN_ANNOTATIONS 0
#endif

#if TSAN_ANNOTATIONS
extern "C" {
const unsigned __tsan_mutex_linker_init = 1 << 0;
void __tsan_mutex_pre_lock(void *addr, unsigned flags);
void __tsan_mutex_post_lock(void *addr, unsigned flags, int recursion);
int __tsan_mutex_pre_unlock(void *addr, unsigned flags);
void __tsan_mutex_post_unlock(void *addr, unsigned flags);
void __tsan_mutex_pre_signal(void *addr, unsigned flags);
void __tsan_mutex_post_signal(void *addr, unsigned flags);
}
#endif

namespace Halide {
namespace Runtime {
namespace Internal {

namespace Synchronization {

namespace {

#if TSAN_ANNOTATIONS
ALWAYS_INLINE void if_tsan_pre_lock(void *mutex) {
    __tsan_mutex_pre_lock(mutex, __tsan_mutex_linker_init);
}
// TODO(zalman|dvyukov): Is 1 the right value for a non-recursive lock? pretty sure value is ignored.
ALWAYS_INLINE void if_tsan_post_lock(void *mutex) {
    __tsan_mutex_post_lock(mutex, __tsan_mutex_linker_init, 1);
}
// TODO(zalman|dvyukov): Is it safe to ignore return value here if locks are not recursive?
ALWAYS_INLINE void if_tsan_pre_unlock(void *mutex) {
    (void)__tsan_mutex_pre_unlock(mutex, __tsan_mutex_linker_init);
}
ALWAYS_INLINE void if_tsan_post_unlock(void *mutex) {
    __tsan_mutex_post_unlock(mutex, __tsan_mutex_linker_init);
}
ALWAYS_INLINE void if_tsan_pre_signal(void *cond) {
    __tsan_mutex_pre_signal(cond, 0);
}
ALWAYS_INLINE void if_tsan_post_signal(void *cond) {
    __tsan_mutex_post_signal(cond, 0);
}
#else
ALWAYS_INLINE void if_tsan_pre_lock(void *) {
}
ALWAYS_INLINE void if_tsan_post_lock(void *) {
}
ALWAYS_INLINE void if_tsan_pre_unlock(void *) {
}
ALWAYS_INLINE void if_tsan_post_unlock(void *) {
}
ALWAYS_INLINE void if_tsan_pre_signal(void *) {
}
ALWAYS_INLINE void if_tsan_post_signal(void *) {
}
#endif

#ifdef BITS_32
ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
    return __sync_and_and_fetch(addr, val);
}

template<typename T>
ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
    return __sync_fetch_and_add(addr, val);
}

template<typename T>
ALWAYS_INLINE bool cas_strong_sequentially_consistent_helper(T *addr, T *expected, T *desired) {
    T oldval = *expected;
    T gotval = __sync_val_compare_and_swap(addr, oldval, *desired);
    *expected = gotval;
    return oldval == gotval;
}

ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

template<typename T>
ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
    return __sync_fetch_and_and(addr, val);
}

template<typename T>
ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
    *val = *addr;
}

template<typename T>
ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
    __sync_synchronize();
    *val = *addr;
}

ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
    return __sync_or_and_fetch(addr, val);
}

ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
    *addr = *val;
}

template<typename T>
ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
    *addr = *val;
    __sync_synchronize();
}

ALWAYS_INLINE void atomic_thread_fence_acquire() {
    __sync_synchronize();
}

#else

ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
    return __atomic_and_fetch(addr, val, __ATOMIC_RELEASE);
}

template<typename T>
ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
    return __atomic_fetch_add(addr, val, __ATOMIC_ACQ_REL);
}

ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
}

ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
    return __atomic_fetch_and(addr, val, __ATOMIC_RELEASE);
}

template<typename T>
ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
    __atomic_load(addr, val, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
    __atomic_load(addr, val, __ATOMIC_ACQUIRE);
}

ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
    return __atomic_or_fetch(addr, val, __ATOMIC_RELAXED);
}

ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
    __atomic_store(addr, val, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
    __atomic_store(addr, val, __ATOMIC_RELEASE);
}

ALWAYS_INLINE void atomic_thread_fence_acquire() {
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
}

#endif

}  // namespace

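// The helpers above follow a <operation>_<success order>_<failure order> naming
// scheme: for example, atomic_cas_weak_acquire_relaxed is a weak compare-and-swap
// with acquire ordering on success and relaxed ordering on failure, and it may
// fail spuriously, so callers retry in a loop, as the lock implementations below
// do. (On 32-bit builds they all fall back to the strong __sync primitive.)
// A minimal sketch of that idiom; the variable names are illustrative only:
//
//     uintptr_t expected;
//     atomic_load_relaxed(&state, &expected);
//     while (true) {
//         uintptr_t desired = expected | 1;
//         if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
//             break;  // CAS succeeded: state was `expected` and is now `desired`.
//         }
//         // CAS failed: `expected` has been refreshed with the current value of state.
//     }
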
class spin_control {
    int spin_count = 40;

public:
    // Everyone says this should be 40. Have not measured it.
    ALWAYS_INLINE bool should_spin() {
        if (spin_count > 0) {
            spin_count--;
        }
        return spin_count > 0;
    }

    ALWAYS_INLINE void reset() {
        spin_count = 40;
    }
};

// The low-order two bits of the lock word are used for locking state.
static constexpr uint8_t lock_bit = 0x01;
static constexpr uint8_t queue_lock_bit = 0x02;
static constexpr uint8_t parked_bit = 0x02;

struct word_lock_queue_data {
    thread_parker parker;  // TODO: member or pointer?

    // This design is from the Rust parking lot implementation by Amanieu d'Antras.
    // Comment from original:
    //
    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.

    word_lock_queue_data *next = nullptr;
    word_lock_queue_data *prev = nullptr;
    word_lock_queue_data *tail = nullptr;

    // Inlined, empty dtor needed to avoid confusing MachO builds
    ALWAYS_INLINE ~word_lock_queue_data() {}
};

class word_lock {
    uintptr_t state = 0;

    void lock_full();
    void unlock_full();

public:
    ALWAYS_INLINE void lock() {
        if_tsan_pre_lock(this);

        uintptr_t expected = 0;
        uintptr_t desired = lock_bit;
        // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            lock_full();
        }

        if_tsan_post_lock(this);
    }

    ALWAYS_INLINE void unlock() {
        if_tsan_pre_unlock(this);

        uintptr_t val = atomic_fetch_and_release(&state, ~(uintptr_t)lock_bit);
        // If another thread is currently queueing, that thread will ensure
        // it acquires the lock or wakes a waiting thread.
        bool no_thread_queuing = (val & queue_lock_bit) == 0;
        // Only need to do a wakeup if there are threads waiting.
        bool some_queued = (val & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0;
        if (no_thread_queuing && some_queued) {
            unlock_full();
        }

        if_tsan_post_unlock(this);
    }
};
WEAK void word_lock::lock_full() {
    spin_control spinner;
    uintptr_t expected;
    atomic_load_relaxed(&state, &expected);

    while (true) {
        if (!(expected & lock_bit)) {
            uintptr_t desired = expected | lock_bit;

            if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
                return;
            }
            continue;
        }

        if (((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0) && spinner.should_spin()) {
            halide_thread_yield();
            atomic_load_relaxed(&state, &expected);
            continue;
        }

        word_lock_queue_data node;

        node.parker.prepare_park();
        // TODO: set up prelinkage parking state

        word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
        if (head == nullptr) {
            node.tail = &node;
            // constructor set node.prev = nullptr;
        } else {
            // Mark the tail as nullptr. The unlock routine will walk the list and wake up
            // the thread at the end.
            // constructor set node.tail = nullptr;
            // constructor set node.prev = nullptr;
            node.next = head;
        }

        uintptr_t desired = ((uintptr_t)&node) | (expected & (queue_lock_bit | lock_bit));
        if (atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
            node.parker.park();
            spinner.reset();
            atomic_load_relaxed(&state, &expected);
        }
    }
}

WEAK void word_lock::unlock_full() {
    uintptr_t expected;
    atomic_load_relaxed(&state, &expected);

    while (true) {
        // If another thread is currently queueing, that thread will ensure
        // it acquires the lock or wakes a waiting thread.
        bool thread_queuing = (expected & queue_lock_bit);
        // Only need to do a wakeup if there are threads waiting.
        bool none_queued = (expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0;
        if (thread_queuing || none_queued) {
            return;
        }

        uintptr_t desired = expected | queue_lock_bit;
        if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            break;
        }
    }

    while (true) {
        word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
        word_lock_queue_data *current = head;
        word_lock_queue_data *tail = current->tail;
        int times_through = 0;
        while (tail == nullptr) {
            word_lock_queue_data *next = current->next;
            halide_assert(nullptr, next != nullptr);
            next->prev = current;
            current = next;
            tail = current->tail;
            times_through++;
        }
        head->tail = tail;

        // If the lock is now locked, unlock the queue and have the thread
        // that currently holds the lock do the wakeup
        if (expected & lock_bit) {
            uintptr_t desired = expected & ~(uintptr_t)queue_lock_bit;
            if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
                return;
            }
            atomic_thread_fence_acquire();
            continue;
        }

        word_lock_queue_data *new_tail = tail->prev;
        if (new_tail == nullptr) {
            bool continue_outer = false;
            while (!continue_outer) {
                uintptr_t desired = expected & lock_bit;
                if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
                    break;
                }
                if ((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0) {
                    continue;
                } else {
                    atomic_thread_fence_acquire();
                    continue_outer = true;
                }
            }

            if (continue_outer) {
                continue;
            }
        } else {
            head->tail = new_tail;
            atomic_and_fetch_release(&state, ~(uintptr_t)queue_lock_bit);
        }

        // TODO: The reason there are three calls here is other things can happen between them.
        // Also it is not clear if unpark_start has to return the mutex/flag used by unpark
        // and unpark_finish due to memory lifetime reasons.
        tail->parker.unpark_start();
        tail->parker.unpark();
        tail->parker.unpark_finish();
        break;
    }
}

struct queue_data {
    thread_parker parker;  // TODO: member or pointer?

    uintptr_t sleep_address = 0;
    queue_data *next = nullptr;
    uintptr_t unpark_info = 0;

    // Inlined, empty dtor needed to avoid confusing MachO builds
    ALWAYS_INLINE ~queue_data() {}
};

// Must be a power of two.
#define LOAD_FACTOR 4

struct hash_bucket {
    word_lock mutex;

    queue_data *head;  // Is this queue_data or thread_data?
    queue_data *tail;  // Is this queue_data or thread_data?
};

// The use of a #define here and table_storage is because if
// a class with a constructor is used, clang generates a COMDAT
// which cannot be lowered for Mac OS due to MachO. A better
// solution is desired of course.
#define HASH_TABLE_BITS 10
struct hash_table {
    hash_bucket buckets[MAX_THREADS * LOAD_FACTOR];
};
WEAK char table_storage[sizeof(hash_table)];
#define table (*(hash_table *)table_storage)

ALWAYS_INLINE void check_hash(uintptr_t hash) {
    halide_assert(nullptr, hash < sizeof(table.buckets) / sizeof(table.buckets[0]));
}

#if 0
WEAK void dump_hash() {
    int i = 0;
    for (auto &bucket : table.buckets) {
        queue_data *head = bucket.head;
        while (head != nullptr) {
            print(nullptr) << "Bucket index " << i << " addr " << (void *)head->sleep_address << "\n";
            head = head->next;
        }
        i++;
    }
}
#endif

static ALWAYS_INLINE uintptr_t addr_hash(uintptr_t addr, uint32_t bits) {
    // Fibonacci hashing. The golden ratio is 1.9E3779B97F4A7C15F39...
    // in hexadecimal.
    if (sizeof(uintptr_t) >= 8) {
        return (addr * (uintptr_t)0x9E3779B97F4A7C15) >> (64 - bits);
    } else {
        return (addr * (uintptr_t)0x9E3779B9) >> (32 - bits);
    }
}

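// For example, on a 64-bit build a sleep address is mapped to a bucket by
// multiplying by the 64-bit Fibonacci constant and keeping the top `bits` bits
// of the product, so the result is always smaller than the bucket count checked
// by check_hash(). A small sketch (the address value here is made up):
//
//     uintptr_t addr = 0x7ffee2c0a010;
//     uintptr_t bucket_index = addr_hash(addr, HASH_TABLE_BITS);  // < (1 << HASH_TABLE_BITS)
//     hash_bucket &bucket = table.buckets[bucket_index];
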
WEAK hash_bucket &lock_bucket(uintptr_t addr) {
    uintptr_t hash = addr_hash(addr, HASH_TABLE_BITS);

    check_hash(hash);

    // TODO: if resizing is implemented, loop, etc.
    hash_bucket &bucket = table.buckets[hash];

    bucket.mutex.lock();

    return bucket;
}

struct bucket_pair {
    hash_bucket &from;
    hash_bucket &to;

    ALWAYS_INLINE bucket_pair(hash_bucket &from, hash_bucket &to)
        : from(from), to(to) {
    }
};

WEAK bucket_pair lock_bucket_pair(uintptr_t addr_from, uintptr_t addr_to) {
    // TODO: if resizing is implemented, loop, etc.
    uintptr_t hash_from = addr_hash(addr_from, HASH_TABLE_BITS);
    uintptr_t hash_to = addr_hash(addr_to, HASH_TABLE_BITS);

    check_hash(hash_from);
    check_hash(hash_to);

    // Lock the bucket with the smaller hash first in order
    // to prevent deadlock.
    if (hash_from == hash_to) {
        hash_bucket &first = table.buckets[hash_from];
        first.mutex.lock();
        return bucket_pair(first, first);
    } else if (hash_from < hash_to) {
        hash_bucket &first = table.buckets[hash_from];
        hash_bucket &second = table.buckets[hash_to];
        first.mutex.lock();
        second.mutex.lock();
        return bucket_pair(first, second);
    } else {
        hash_bucket &first = table.buckets[hash_to];
        hash_bucket &second = table.buckets[hash_from];
        first.mutex.lock();
        second.mutex.lock();
        return bucket_pair(second, first);
    }
}

WEAK void unlock_bucket_pair(bucket_pair &buckets) {
    // In the lock routine, the buckets are locked smaller hash index first.
    // Here we reverse this ordering by comparing the pointers. This works
    // since the pointers are obtained by indexing an array with the hash
    // values.
    if (&buckets.from == &buckets.to) {
        buckets.from.mutex.unlock();
    } else if (&buckets.from > &buckets.to) {
        buckets.from.mutex.unlock();
        buckets.to.mutex.unlock();
    } else {
        buckets.to.mutex.unlock();
        buckets.from.mutex.unlock();
    }
}

struct validate_action {
    bool unpark_one = false;
    uintptr_t invalid_unpark_info = 0;
};

WEAK bool parking_control_validate(void *control, validate_action &action) {
    return true;
}
WEAK void parking_control_before_sleep(void *control) {
}
WEAK uintptr_t parking_control_unpark(void *control, int unparked, bool more_waiters) {
    return 0;
}
WEAK void parking_control_requeue_callback(void *control, const validate_action &action, bool one_to_wake, bool some_requeued) {
}

struct parking_control {
    bool (*validate)(void *control, validate_action &action);
    void (*before_sleep)(void *control);
    uintptr_t (*unpark)(void *control, int unparked, bool more_waiters);
    void (*requeue_callback)(void *control, const validate_action &action, bool one_to_wake, bool some_requeued);

    ALWAYS_INLINE parking_control()
        : validate(parking_control_validate), before_sleep(parking_control_before_sleep),
          unpark(parking_control_unpark), requeue_callback(parking_control_requeue_callback) {
    }
};

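// The concrete controls below (mutex_parking_control, signal_parking_control,
// broadcast_parking_control, wait_parking_control) all follow the same pattern:
// derive from parking_control, add whatever state the callbacks need, and
// overwrite the default function pointers in the constructor. A minimal sketch
// of a hypothetical control (the names here are illustrative, not part of this
// file):
//
//     WEAK bool example_control_validate(void *control, validate_action &action);
//
//     struct example_parking_control : parking_control {
//         uintptr_t *watched_state;
//
//         ALWAYS_INLINE example_parking_control(uintptr_t *watched_state)
//             : watched_state(watched_state) {
//             validate = example_control_validate;
//         }
//     };
//
// A thread then parks on an address with `park(addr, control)` and is woken by
// another thread calling `unpark_one(addr, control)` on the same address.
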
// TODO: Do we need a park_result thing here?
WEAK uintptr_t park(uintptr_t addr, parking_control &control) {
    queue_data queue_data;

    hash_bucket &bucket = lock_bucket(addr);

    validate_action action;
    if (!control.validate(&control, action)) {
        bucket.mutex.unlock();
        return action.invalid_unpark_info;
    }

    queue_data.next = nullptr;
    queue_data.sleep_address = addr;
    queue_data.parker.prepare_park();
    if (bucket.head != nullptr) {
        bucket.tail->next = &queue_data;
    } else {
        bucket.head = &queue_data;
    }
    bucket.tail = &queue_data;
    bucket.mutex.unlock();

    control.before_sleep(&control);

    queue_data.parker.park();

    return queue_data.unpark_info;

    // TODO: handling timeout.
}

WEAK uintptr_t unpark_one(uintptr_t addr, parking_control &control) {
    hash_bucket &bucket = lock_bucket(addr);

    queue_data **data_location = &bucket.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);
        if (cur_addr == addr) {
            queue_data *next = data->next;
            *data_location = next;

            bool more_waiters = false;

            if (bucket.tail == data) {
                bucket.tail = prev;
            } else {
                queue_data *data2 = next;
                while (data2 != nullptr && !more_waiters) {
                    uintptr_t cur_addr2;
                    atomic_load_relaxed(&data2->sleep_address, &cur_addr2);
                    more_waiters = (cur_addr2 == addr);
                    data2 = data2->next;
                }
            }

            data->unpark_info = control.unpark(&control, 1, more_waiters);

            data->parker.unpark_start();
            bucket.mutex.unlock();
            data->parker.unpark();
            data->parker.unpark_finish();

            // TODO: Figure out ret type.
            return more_waiters ? 1 : 0;
        } else {
            data_location = &data->next;
            prev = data;
            data = data->next;
        }
    }

    control.unpark(&control, 0, false);

    bucket.mutex.unlock();

    // TODO: decide if this is the right return value.
    return 0;
}

WEAK uintptr_t unpark_all(uintptr_t addr, uintptr_t unpark_info) {
    hash_bucket &bucket = lock_bucket(addr);

    queue_data **data_location = &bucket.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    size_t waiters = 0;
    queue_data *temp_list_storage[16];
    queue_data **temp_list = &temp_list_storage[0];
    size_t max_waiters = sizeof(temp_list_storage) / sizeof(temp_list_storage[0]);

    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);

        queue_data *next = data->next;
        if (cur_addr == addr) {
            *data_location = next;

            if (bucket.tail == data) {
                bucket.tail = prev;
            }

            if (waiters == max_waiters) {
                queue_data **temp = temp_list;
                temp_list = (queue_data **)malloc(sizeof(queue_data *) * max_waiters * 2);
                for (size_t i = 0; i < max_waiters; i++) {
                    temp_list[i] = temp[i];
                }
                max_waiters *= 2;
                if (temp != &temp_list_storage[0]) {
                    free(temp);
                }
            }

            data->unpark_info = unpark_info;

            temp_list[waiters++] = data;
            data->parker.unpark_start();

            data = next;
        } else {
            // Skip entries sleeping on a different address; do not unlink
            // them from the bucket's list (matches unpark_one/unpark_requeue).
            data_location = &data->next;
            prev = data;
            data = next;
        }
    }

    bucket.mutex.unlock();

    for (size_t i = 0; i < waiters; i++) {
        temp_list[i]->parker.unpark();
    }

    // TODO: decide if this really needs to be two loops.
    for (size_t i = 0; i < waiters; i++) {
        temp_list[i]->parker.unpark_finish();
    }

    if (temp_list != &temp_list_storage[0]) {
        free(temp_list);
    }

    return waiters;
}

WEAK int unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, parking_control &control, uintptr_t unpark_info) {
    bucket_pair buckets = lock_bucket_pair(addr_from, addr_to);

    validate_action action;
    if (!control.validate(&control, action)) {
        unlock_bucket_pair(buckets);
        return 0;
    }

    queue_data **data_location = &buckets.from.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    queue_data *requeue = nullptr;
    queue_data *requeue_tail = nullptr;
    queue_data *wakeup = nullptr;

    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);

        queue_data *next = data->next;
        if (cur_addr == addr_from) {
            *data_location = next;

            if (buckets.from.tail == data) {
                buckets.from.tail = prev;
            }

            if (action.unpark_one && wakeup == nullptr) {
                wakeup = data;
            } else {
                if (requeue == nullptr) {
                    requeue = data;
                } else {
                    requeue_tail->next = data;
                }

                requeue_tail = data;
                atomic_store_relaxed(&data->sleep_address, &addr_to);
            }
            data = next;
            // TODO: prev ptr?
        } else {
            data_location = &data->next;
            prev = data;
            data = next;
        }
    }

    if (requeue != nullptr) {
        requeue_tail->next = nullptr;
        if (buckets.to.head == nullptr) {
            buckets.to.head = requeue;
        } else {
            buckets.to.tail->next = requeue;
        }
        buckets.to.tail = requeue_tail;
    }

    control.requeue_callback(&control, action, wakeup != nullptr, requeue != nullptr);

    if (wakeup != nullptr) {
        wakeup->unpark_info = unpark_info;
        wakeup->parker.unpark_start();
        unlock_bucket_pair(buckets);
        wakeup->parker.unpark();
        wakeup->parker.unpark_finish();
    } else {
        unlock_bucket_pair(buckets);
    }

    return wakeup != nullptr && action.unpark_one;
}

WEAK bool mutex_parking_control_validate(void *control, validate_action &action);
WEAK uintptr_t mutex_parking_control_unpark(void *control, int unparked, bool more_waiters);
struct mutex_parking_control : parking_control {
    uintptr_t *lock_state;

    ALWAYS_INLINE mutex_parking_control(uintptr_t *lock_state)
        : lock_state(lock_state) {
        validate = mutex_parking_control_validate;
        unpark = mutex_parking_control_unpark;
    }
};

// Only used in parking -- lock_full.
WEAK bool mutex_parking_control_validate(void *control, validate_action &action) {
    mutex_parking_control *mutex_control = (mutex_parking_control *)control;

    uintptr_t result;
    atomic_load_relaxed(mutex_control->lock_state, &result);
    return result == (lock_bit | parked_bit);
}

// Only used in unparking -- unlock_full.
WEAK uintptr_t mutex_parking_control_unpark(void *control, int unparked, bool more_waiters) {
    mutex_parking_control *mutex_control = (mutex_parking_control *)control;

    // TODO: consider handling fairness.
    uintptr_t return_state = more_waiters ? parked_bit : 0;
    atomic_store_release(mutex_control->lock_state, &return_state);

    return 0;
}

class fast_mutex {
    uintptr_t state;

    ALWAYS_INLINE void lock_full() {
        // Everyone says this should be 40. Have not measured it.
        spin_control spinner;
        uintptr_t expected;
        atomic_load_relaxed(&state, &expected);

        while (true) {
            if (!(expected & lock_bit)) {
                uintptr_t desired = expected | lock_bit;
                if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
                    return;
                }
                continue;
            }

            // Spin with spin count. Note that this occurs even if
            // threads are parked. We're prioritizing throughput over
            // fairness by letting sleeping threads lie.
            if (spinner.should_spin()) {
                halide_thread_yield();
                atomic_load_relaxed(&state, &expected);
                continue;
            }

            // Mark the mutex as having parked threads if not already done.
            if ((expected & parked_bit) == 0) {
                uintptr_t desired = expected | parked_bit;
                if (!atomic_cas_weak_relaxed_relaxed(&state, &expected, &desired)) {
                    continue;
                }
            }

            // TODO: consider handling fairness, timeout
            mutex_parking_control control(&state);
            uintptr_t result = park((uintptr_t)this, control);
            if (result == (uintptr_t)this) {
                return;
            }

            spinner.reset();
            atomic_load_relaxed(&state, &expected);
        }
    }

    ALWAYS_INLINE void unlock_full() {
        uintptr_t expected = lock_bit;
        uintptr_t desired = 0;
        // Try for a fast release of the lock. Redundant with code in lock, but done
        // to make unlock_full a standalone unlock that can be called directly.
        if (atomic_cas_strong_release_relaxed(&state, &expected, &desired)) {
            return;
        }

        mutex_parking_control control(&state);
        unpark_one((uintptr_t)this, control);
    }

public:
    ALWAYS_INLINE void lock() {
        uintptr_t expected = 0;
        uintptr_t desired = lock_bit;
        // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            lock_full();
        }
    }

    ALWAYS_INLINE void unlock() {
        uintptr_t expected = lock_bit;
        uintptr_t desired = 0;
        // Try for a fast release of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
            unlock_full();
        }
    }

    ALWAYS_INLINE bool make_parked_if_locked() {
        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        while (true) {
            if (!(val & lock_bit)) {
                return false;
            }

            uintptr_t desired = val | parked_bit;
            if (atomic_cas_weak_relaxed_relaxed(&state, &val, &desired)) {
                return true;
            }
        }
    }

    ALWAYS_INLINE void make_parked() {
        atomic_or_fetch_relaxed(&state, parked_bit);
    }
};
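
// The interesting values of `state` above are 0 (unlocked), lock_bit (locked
// with no recorded waiters), and lock_bit | parked_bit (locked, with threads
// possibly parked on this mutex's address). The storage itself is the
// user-visible halide_mutex handle: the extern "C" entry points at the end of
// this file cast that handle to this class, roughly as sketched here (the real
// definitions appear below), which is why a zero-filled halide_mutex starts
// out unlocked:
//
//     WEAK void halide_mutex_lock(halide_mutex *mutex) {
//         using Halide::Runtime::Internal::Synchronization::fast_mutex;
//         ((fast_mutex *)mutex)->lock();
//     }
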
WEAK uintptr_t signal_parking_control_unpark(void *control, int unparked, bool more_waiters);
struct signal_parking_control : parking_control {
    uintptr_t *cond_state;
    fast_mutex *mutex;

    ALWAYS_INLINE signal_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
        unpark = signal_parking_control_unpark;
    }
};

WEAK uintptr_t signal_parking_control_unpark(void *control, int unparked, bool more_waiters) {
    signal_parking_control *signal_control = (signal_parking_control *)control;

    if (!more_waiters) {
        uintptr_t val = 0;
        atomic_store_relaxed(signal_control->cond_state, &val);
    }

#if 0  // TODO: figure out why this was here.
    return (uintptr_t)signal_control->mutex;
#else
    return 0;
#endif
}

WEAK bool broadcast_parking_control_validate(void *control, validate_action &action);
WEAK void broadcast_parking_control_requeue_callback(void *control, const validate_action &action,
                                                     bool one_to_wake, bool some_requeued);
struct broadcast_parking_control : parking_control {
    uintptr_t *cond_state;
    fast_mutex *mutex;

    ALWAYS_INLINE broadcast_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
        validate = broadcast_parking_control_validate;
        requeue_callback = broadcast_parking_control_requeue_callback;
    }
};

995 
997  broadcast_parking_control *broadcast_control = (broadcast_parking_control *)control;
998 
999  uintptr_t val;
1000  atomic_load_relaxed(broadcast_control->cond_state, &val);
1001  // By the time this broadcast locked everything and was processed, the cond
1002  // has progressed to a new mutex, do nothing since any waiting threads have
1003  // to be waiting on what is effectively a different condition.
1004  if (val != (uintptr_t)broadcast_control->mutex) {
1005  return false;
1006  }
1007  // Clear the cond's connection to the mutex as all waiting threads are going to reque onto the mutex.
1008  val = 0;
1009  atomic_store_relaxed(broadcast_control->cond_state, &val);
1010 
1011  action.unpark_one = !broadcast_control->mutex->make_parked_if_locked();
1012 
1013  return true;
1014 }
1015 
1016 WEAK void broadcast_parking_control_requeue_callback(void *control, const validate_action &action, bool one_to_wake, bool some_requeued) {
1017  broadcast_parking_control *broadcast_control = (broadcast_parking_control *)control;
1018 
1019  if (action.unpark_one && some_requeued) {
1020  broadcast_control->mutex->make_parked();
1021  }
1022 }
WEAK bool wait_parking_control_validate(void *control, validate_action &action);
WEAK void wait_parking_control_before_sleep(void *control);
WEAK uintptr_t wait_parking_control_unpark(void *control, int unparked, bool more_waiters);
struct wait_parking_control : parking_control {
    uintptr_t *cond_state;
    fast_mutex *mutex;

    ALWAYS_INLINE wait_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
        validate = wait_parking_control_validate;
        before_sleep = wait_parking_control_before_sleep;
        unpark = wait_parking_control_unpark;
    }
};

WEAK bool wait_parking_control_validate(void *control, validate_action &action) {
    wait_parking_control *wait_control = (wait_parking_control *)control;

    uintptr_t val;
    atomic_load_relaxed(wait_control->cond_state, &val);

    if (val == 0) {
        val = (uintptr_t)wait_control->mutex;
        atomic_store_relaxed(wait_control->cond_state, &val);
    } else if (val != (uintptr_t)wait_control->mutex) {
        // TODO: signal error.
        action.invalid_unpark_info = (uintptr_t)wait_control->mutex;
        return false;
    }

    return true;
}

WEAK void wait_parking_control_before_sleep(void *control) {
    wait_parking_control *wait_control = (wait_parking_control *)control;

    wait_control->mutex->unlock();
}

WEAK uintptr_t wait_parking_control_unpark(void *control, int unparked, bool more_waiters) {
    wait_parking_control *wait_control = (wait_parking_control *)control;

    if (!more_waiters) {
        uintptr_t val = 0;
        atomic_store_relaxed(wait_control->cond_state, &val);
    }
    return 0;
}

class fast_cond {
    uintptr_t state = 0;

public:
    ALWAYS_INLINE void signal() {
        if_tsan_pre_signal(this);

        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        if (val == 0) {
            if_tsan_post_signal(this);
            return;
        }
        signal_parking_control control(&state, (fast_mutex *)val);
        unpark_one((uintptr_t)this, control);
        if_tsan_post_signal(this);
    }

    ALWAYS_INLINE void broadcast() {
        if_tsan_pre_signal(this);
        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        if (val == 0) {
            if_tsan_post_signal(this);
            return;
        }
        broadcast_parking_control control(&state, (fast_mutex *)val);
        unpark_requeue((uintptr_t)this, val, control, 0);
        if_tsan_post_signal(this);
    }

    ALWAYS_INLINE void wait(fast_mutex *mutex) {
        wait_parking_control control(&state, mutex);
        uintptr_t result = park((uintptr_t)this, control);
        if (result != (uintptr_t)mutex) {
            mutex->lock();
        } else {
            if_tsan_pre_lock(mutex);

            // TODO: this is debug only.
            uintptr_t val;
            atomic_load_relaxed((uintptr_t *)mutex, &val);
            halide_assert(nullptr, val & 0x1);

            if_tsan_post_lock(mutex);
        }
    }
};

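// fast_cond::wait releases the mutex while the thread is parked (see
// wait_parking_control_before_sleep above) and reacquires it before returning,
// so the public API follows the usual condition-variable discipline of checking
// the predicate in a loop under the mutex. A minimal sketch using the extern "C"
// entry points defined below (the queue/predicate names are illustrative only):
//
//     static halide_mutex lock = {{0}};
//     static halide_cond not_empty = {{0}};
//     static int queued = 0;
//
//     void wait_for_work() {
//         halide_mutex_lock(&lock);
//         while (queued == 0) {
//             halide_cond_wait(&not_empty, &lock);  // unlocks while waiting, relocks on return
//         }
//         queued--;
//         halide_mutex_unlock(&lock);
//     }
//
//     void post_work() {
//         halide_mutex_lock(&lock);
//         queued++;
//         halide_cond_signal(&not_empty);
//         halide_mutex_unlock(&lock);
//     }
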
}  // namespace Synchronization

}  // namespace Internal
}  // namespace Runtime
}  // namespace Halide

extern "C" {

WEAK void halide_mutex_lock(halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_mutex->lock();
}

WEAK void halide_mutex_unlock(halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_mutex->unlock();
}

WEAK void halide_cond_broadcast(struct halide_cond *cond) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    fast_cond->broadcast();
}

WEAK void halide_cond_signal(struct halide_cond *cond) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    fast_cond->signal();
}

WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_cond->wait(fast_mutex);
}

// Actual definition of the mutex array.
struct halide_mutex_array {
    struct halide_mutex *array;
};

WEAK halide_mutex_array *halide_mutex_array_create(int sz) {
    // TODO: If sz is huge, we should probably hash it down to something smaller
    // in the accessors below. Check for deadlocks before doing so.
    halide_mutex_array *array = (halide_mutex_array *)halide_malloc(
        nullptr, sizeof(halide_mutex_array));
    if (array == nullptr) {
        // Will result in a failed assertion and a call to halide_error.
        return nullptr;
    }
    array->array = (halide_mutex *)halide_malloc(
        nullptr, sz * sizeof(halide_mutex));
    if (array->array == nullptr) {
        halide_free(nullptr, array);
        // Will result in a failed assertion and a call to halide_error.
        return nullptr;
    }
    memset(array->array, 0, sz * sizeof(halide_mutex));
    return array;
}

WEAK void halide_mutex_array_destroy(void *user_context, void *array) {
    struct halide_mutex_array *arr_ptr = (struct halide_mutex_array *)array;
    halide_free(user_context, arr_ptr->array);
    halide_free(user_context, arr_ptr);
}

WEAK int halide_mutex_array_lock(struct halide_mutex_array *array, int entry) {
    halide_mutex_lock(&array->array[entry]);
    return 0;
}

WEAK int halide_mutex_array_unlock(struct halide_mutex_array *array, int entry) {
    halide_mutex_unlock(&array->array[entry]);
    return 0;
}
}
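
// A short usage sketch of the mutex-array API above (the size, entry index, and
// error handling are illustrative only). halide_mutex_array_create returns `sz`
// zero-initialized, and therefore unlocked, mutexes addressed by index:
//
//     halide_mutex_array *locks = halide_mutex_array_create(16);
//     if (locks != nullptr) {
//         halide_mutex_array_lock(locks, 3);    // lock entry 3
//         // ... critical section guarded by entry 3 ...
//         halide_mutex_array_unlock(locks, 3);
//         halide_mutex_array_destroy(nullptr, locks);
//     }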