Halide 14.0.0
synchronization_common.h
#include "HalideRuntime.h"
#include "printer.h"
#include "scoped_spin_lock.h"

/* This provides an implementation of pthreads-like mutex and
 * condition variables with fast default case performance. The code
 * is based on the "parking lot" design and specifically Amanieu
 * d'Antras' Rust implementation:
 * https://github.com/Amanieu/parking_lot
 * and the one in WTF:
 * https://webkit.org/blog/6161/locking-in-webkit/
 *
 * Neither of the above implementations was used directly, largely to
 * keep dependencies manageable. This implementation lacks a few features
 * relative to those two. Specifically, timeouts are not supported,
 * nor are optional fairness or deadlock detection.
 * This implementation should provide a fairly standalone "one file"
 * fast synchronization layer on top of readily available system primitives.
 *
 * TODO: Implement pthread_once equivalent.
 * TODO: Add read/write lock and move SharedExclusiveSpinLock from tracing.cpp
 * to this mechanism.
 * TODO: Add timeouts and optional fairness if needed.
 * TODO: Relying on condition variables has issues for old versions of Windows
 * and likely has portability issues to some very bare bones embedded OSes.
 * Doing an implementation using only semaphores or event counters should
 * be doable.
 */
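
// The public entry points defined at the bottom of this file (halide_mutex_lock,
// halide_cond_wait, etc.) are declared in HalideRuntime.h. The sketch below is
// illustrative only (not part of the runtime) and shows the intended
// pthread-style usage pattern from client code; the names my_state and
// state_ready are hypothetical.
#if 0
struct my_state {
    halide_mutex mutex = {{0}};
    halide_cond cond = {{0}};
    bool state_ready = false;
};

void wait_for_ready(my_state *s) {
    halide_mutex_lock(&s->mutex);
    // Condition waits can wake spuriously, so always re-check the predicate.
    while (!s->state_ready) {
        halide_cond_wait(&s->cond, &s->mutex);
    }
    halide_mutex_unlock(&s->mutex);
}

void publish_ready(my_state *s) {
    halide_mutex_lock(&s->mutex);
    s->state_ready = true;
    halide_mutex_unlock(&s->mutex);
    halide_cond_broadcast(&s->cond);
}
#endif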

// Copied from tsan_interface.h
#ifndef TSAN_ANNOTATIONS
#define TSAN_ANNOTATIONS 0
#endif

#if TSAN_ANNOTATIONS
extern "C" {
const unsigned __tsan_mutex_linker_init = 1 << 0;
void __tsan_mutex_pre_lock(void *addr, unsigned flags);
void __tsan_mutex_post_lock(void *addr, unsigned flags, int recursion);
int __tsan_mutex_pre_unlock(void *addr, unsigned flags);
void __tsan_mutex_post_unlock(void *addr, unsigned flags);
void __tsan_mutex_pre_signal(void *addr, unsigned flags);
void __tsan_mutex_post_signal(void *addr, unsigned flags);
}
#endif

namespace Halide {
namespace Runtime {
namespace Internal {

namespace Synchronization {

namespace {

#if TSAN_ANNOTATIONS
ALWAYS_INLINE void if_tsan_pre_lock(void *mutex) {
    __tsan_mutex_pre_lock(mutex, __tsan_mutex_linker_init);
}
// TODO(zalman|dvyukov): Is 1 the right value for a non-recursive lock? pretty sure value is ignored.
ALWAYS_INLINE void if_tsan_post_lock(void *mutex) {
    __tsan_mutex_post_lock(mutex, __tsan_mutex_linker_init, 1);
}
// TODO(zalman|dvyukov): Is it safe to ignore return value here if locks are not recursive?
ALWAYS_INLINE void if_tsan_pre_unlock(void *mutex) {
    (void)__tsan_mutex_pre_unlock(mutex, __tsan_mutex_linker_init);
}
ALWAYS_INLINE void if_tsan_post_unlock(void *mutex) {
    __tsan_mutex_post_unlock(mutex, __tsan_mutex_linker_init);
}
ALWAYS_INLINE void if_tsan_pre_signal(void *cond) {
    __tsan_mutex_pre_signal(cond, 0);
}
ALWAYS_INLINE void if_tsan_post_signal(void *cond) {
    __tsan_mutex_post_signal(cond, 0);
}
#else
ALWAYS_INLINE void if_tsan_pre_lock(void *) {
}
ALWAYS_INLINE void if_tsan_post_lock(void *) {
}
ALWAYS_INLINE void if_tsan_pre_unlock(void *) {
}
ALWAYS_INLINE void if_tsan_post_unlock(void *) {
}
ALWAYS_INLINE void if_tsan_pre_signal(void *) {
}
ALWAYS_INLINE void if_tsan_post_signal(void *) {
}
#endif

#ifdef BITS_32
ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
    return __sync_and_and_fetch(addr, val);
}

template<typename T>
ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
    return __sync_fetch_and_add(addr, val);
}

template<typename T>
ALWAYS_INLINE bool cas_strong_sequentially_consistent_helper(T *addr, T *expected, T *desired) {
    T oldval = *expected;
    T gotval = __sync_val_compare_and_swap(addr, oldval, *desired);
    *expected = gotval;
    return oldval == gotval;
}

ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

template<typename T>
ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
    return __sync_fetch_and_and(addr, val);
}

template<typename T>
ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
    *val = *addr;
}

template<typename T>
ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
    __sync_synchronize();
    *val = *addr;
}

ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
    return __sync_or_and_fetch(addr, val);
}

ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
    *addr = *val;
}

template<typename T>
ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
    *addr = *val;
    __sync_synchronize();
}

ALWAYS_INLINE void atomic_thread_fence_acquire() {
    __sync_synchronize();
}

#else

ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
    return __atomic_and_fetch(addr, val, __ATOMIC_RELEASE);
}

template<typename T>
ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
    return __atomic_fetch_add(addr, val, __ATOMIC_ACQ_REL);
}

ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
}

ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
    return __atomic_fetch_and(addr, val, __ATOMIC_RELEASE);
}

template<typename T>
ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
    __atomic_load(addr, val, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
    __atomic_load(addr, val, __ATOMIC_ACQUIRE);
}

ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
    return __atomic_or_fetch(addr, val, __ATOMIC_RELAXED);
}

ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
    __atomic_store(addr, val, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
    __atomic_store(addr, val, __ATOMIC_RELEASE);
}

ALWAYS_INLINE void atomic_thread_fence_acquire() {
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
}

#endif
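
// Note: all of the CAS wrappers above follow the GCC __atomic_compare_exchange
// convention: on failure, *expected is rewritten with the value actually
// observed, so callers can retry without an explicit reload. An illustrative
// (non-runtime) sketch of the retry loop shape used throughout this file;
// example_set_bit is a hypothetical name:
#if 0
ALWAYS_INLINE void example_set_bit(uintptr_t *word, uintptr_t bit) {
    uintptr_t expected;
    atomic_load_relaxed(word, &expected);
    while (true) {
        uintptr_t desired = expected | bit;
        // On failure, expected now holds the current value of *word and the
        // loop retries with that fresher snapshot.
        if (atomic_cas_weak_relaxed_relaxed(word, &expected, &desired)) {
            return;
        }
    }
}
#endif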

} // namespace

class spin_control {
    // Everyone says this should be 40. Have not measured it.
    int spin_count = 40;

public:
    ALWAYS_INLINE bool should_spin() {
        if (spin_count > 0) {
            spin_count--;
        }
        return spin_count > 0;
    }

    ALWAYS_INLINE void reset() {
        spin_count = 40;
    }
};

// The low order two bits of the lock word are used for locking state.
static constexpr uint8_t lock_bit = 0x01;
static constexpr uint8_t queue_lock_bit = 0x02;
static constexpr uint8_t parked_bit = 0x02;

struct word_lock_queue_data {
    thread_parker parker;  // TODO: member or pointer?

    // This design is from the Rust parking lot implementation by Amanieu d'Antras.
    // Comment from original:
    //
    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.

    word_lock_queue_data *next = nullptr;
    word_lock_queue_data *prev = nullptr;
    word_lock_queue_data *tail = nullptr;
};

class word_lock {
    uintptr_t state = 0;

    void lock_full();
    void unlock_full();

public:
    ALWAYS_INLINE void lock() {
        if_tsan_pre_lock(this);

        uintptr_t expected = 0;
        uintptr_t desired = lock_bit;
        // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            lock_full();
        }

        if_tsan_post_lock(this);
    }

    ALWAYS_INLINE void unlock() {
        if_tsan_pre_unlock(this);

        uintptr_t val = atomic_fetch_and_release(&state, ~(uintptr_t)lock_bit);
        // If another thread is currently queueing, that thread will ensure
        // it acquires the lock or wakes a waiting thread.
        bool no_thread_queuing = (val & queue_lock_bit) == 0;
        // Only need to do a wakeup if there are threads waiting.
        bool some_queued = (val & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0;
        if (no_thread_queuing && some_queued) {
            unlock_full();
        }

        if_tsan_post_unlock(this);
    }
};
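
// For reference: word_lock packs everything into a single uintptr_t. Bit 0 is
// lock_bit (the mutex is held), bit 1 is queue_lock_bit (the queue is being
// edited), and the remaining bits hold a pointer to the head
// word_lock_queue_data, which works because the stack-allocated nodes are at
// least pointer-aligned. A non-runtime sketch of how the fields are unpacked
// (it mirrors the masking done in lock_full/unlock_full below):
#if 0
ALWAYS_INLINE void example_decode_word_lock_state(uintptr_t state) {
    bool held = (state & lock_bit) != 0;
    bool queue_locked = (state & queue_lock_bit) != 0;
    word_lock_queue_data *head =
        (word_lock_queue_data *)(state & ~(uintptr_t)(queue_lock_bit | lock_bit));
    (void)held;
    (void)queue_locked;
    (void)head;
}
#endif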

WEAK void word_lock::lock_full() {
    spin_control spinner;
    uintptr_t expected;
    atomic_load_relaxed(&state, &expected);

    while (true) {
        if (!(expected & lock_bit)) {
            uintptr_t desired = expected | lock_bit;

            if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
                return;
            }
            continue;
        }

        if (((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0) && spinner.should_spin()) {
            halide_thread_yield();
            atomic_load_relaxed(&state, &expected);
            continue;
        }

        word_lock_queue_data node;

        node.parker.prepare_park();
        // TODO set up prelinkage parking state

        word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
        if (head == nullptr) {
            node.tail = &node;
            // constructor set node.prev = nullptr;
        } else {
            // Mark the tail as nullptr. The unlock routine will walk the list and wakeup
            // the thread at the end.
            // constructor set node.tail = nullptr;
            // constructor set node.prev = nullptr;
            node.next = head;
        }

        uintptr_t desired = ((uintptr_t)&node) | (expected & (queue_lock_bit | lock_bit));
        if (atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
            node.parker.park();
            spinner.reset();
            atomic_load_relaxed(&state, &expected);
        }
    }
}

WEAK void word_lock::unlock_full() {
    uintptr_t expected;
    atomic_load_relaxed(&state, &expected);

    while (true) {
        // If another thread is currently queueing, that thread will ensure
        // it acquires the lock or wakes a waiting thread.
        bool thread_queuing = (expected & queue_lock_bit);
        // Only need to do a wakeup if there are threads waiting.
        bool none_queued = (expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0;
        if (thread_queuing || none_queued) {
            return;
        }

        uintptr_t desired = expected | queue_lock_bit;
        if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            break;
        }
    }

    while (true) {
        word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
        word_lock_queue_data *current = head;
        word_lock_queue_data *tail = current->tail;
        while (tail == nullptr) {
            word_lock_queue_data *next = current->next;
            halide_abort_if_false(nullptr, next != nullptr);
            next->prev = current;
            current = next;
            tail = current->tail;
        }
        head->tail = tail;

        // If the lock is now locked, unlock the queue and have the thread
        // that currently holds the lock do the wakeup
        if (expected & lock_bit) {
            uintptr_t desired = expected & ~(uintptr_t)queue_lock_bit;
            if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
                return;
            }
            atomic_thread_fence_acquire();
            continue;
        }

        word_lock_queue_data *new_tail = tail->prev;
        if (new_tail == nullptr) {
            bool continue_outer = false;
            while (!continue_outer) {
                uintptr_t desired = expected & lock_bit;
                if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
                    break;
                }
                if ((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0) {
                    continue;
                } else {
                    atomic_thread_fence_acquire();
                    continue_outer = true;
                }
            }

            if (continue_outer) {
                continue;
            }
        } else {
            head->tail = new_tail;
            atomic_and_fetch_release(&state, ~(uintptr_t)queue_lock_bit);
        }

        // TODO: The reason there are three calls here is other things can happen between them.
        // Also it is not clear if unpark_start has to return the mutex/flag used by unpark
        // and unpark_finish due to memory lifetime reasons.
        tail->parker.unpark_start();
        tail->parker.unpark();
        tail->parker.unpark_finish();
        break;
    }
}

struct queue_data {
    thread_parker parker;  // TODO: member or pointer?

    uintptr_t sleep_address = 0;
    queue_data *next = nullptr;
    uintptr_t unpark_info = 0;
};

// Must be a power of two.
constexpr int LOAD_FACTOR = 4;

struct hash_bucket {
    word_lock mutex;

    queue_data *head = nullptr;  // Is this queue_data or thread_data?
    queue_data *tail = nullptr;  // Is this queue_data or thread_data?
};

constexpr int HASH_TABLE_SIZE = MAX_THREADS * LOAD_FACTOR;
struct hash_table {
    hash_bucket buckets[HASH_TABLE_SIZE];
};
WEAK hash_table table;

constexpr int HASH_TABLE_BITS = 10;
static_assert((1 << HASH_TABLE_BITS) >= MAX_THREADS * LOAD_FACTOR);

#if 0
WEAK void dump_hash() {
    int i = 0;
    for (auto &bucket : table.buckets) {
        queue_data *head = bucket.head;
        while (head != nullptr) {
            print(nullptr) << "Bucket index " << i << " addr " << (void *)head->sleep_address << "\n";
            head = head->next;
        }
        i++;
    }
}
#endif

static ALWAYS_INLINE uintptr_t addr_hash(uintptr_t addr) {
    // Fibonacci hashing. The golden ratio is 1.9E3779B97F4A7C15F39...
    // in hexadecimal.
    if (sizeof(uintptr_t) >= 8) {
        return (addr * (uintptr_t)0x9E3779B97F4A7C15) >> (64 - HASH_TABLE_BITS);
    } else {
        return (addr * (uintptr_t)0x9E3779B9) >> (32 - HASH_TABLE_BITS);
    }
}
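
// The multipliers above are roughly 2^64 and 2^32 divided by the golden ratio,
// so nearby addresses map to well-spread bucket indices. Illustrative,
// non-runtime sketch: the shift keeps only the top HASH_TABLE_BITS bits of the
// product, so the result always lies in [0, 1 << HASH_TABLE_BITS); the name
// example_addr_hash_range is hypothetical.
#if 0
ALWAYS_INLINE void example_addr_hash_range(uintptr_t some_addr) {
    uintptr_t h = addr_hash(some_addr);
    halide_debug_assert(nullptr, h < (uintptr_t)(1 << HASH_TABLE_BITS));
    (void)h;
}
#endif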

WEAK hash_bucket &lock_bucket(uintptr_t addr) {
    uintptr_t hash = addr_hash(addr);

    halide_debug_assert(nullptr, hash < HASH_TABLE_SIZE);

    // TODO: if resizing is implemented, loop, etc.
    hash_bucket &bucket = table.buckets[hash];

    bucket.mutex.lock();

    return bucket;
}

struct bucket_pair {
    hash_bucket &from;
    hash_bucket &to;

    ALWAYS_INLINE bucket_pair(hash_bucket &from, hash_bucket &to)
        : from(from), to(to) {
    }
};

WEAK bucket_pair lock_bucket_pair(uintptr_t addr_from, uintptr_t addr_to) {
    // TODO: if resizing is implemented, loop, etc.
    uintptr_t hash_from = addr_hash(addr_from);
    uintptr_t hash_to = addr_hash(addr_to);

    halide_debug_assert(nullptr, hash_from < HASH_TABLE_SIZE);
    halide_debug_assert(nullptr, hash_to < HASH_TABLE_SIZE);

    // Lock the bucket with the smaller hash first in order
    // to prevent deadlock.
    if (hash_from == hash_to) {
        hash_bucket &first = table.buckets[hash_from];
        first.mutex.lock();
        return bucket_pair(first, first);
    } else if (hash_from < hash_to) {
        hash_bucket &first = table.buckets[hash_from];
        hash_bucket &second = table.buckets[hash_to];
        first.mutex.lock();
        second.mutex.lock();
        return bucket_pair(first, second);
    } else {
        hash_bucket &first = table.buckets[hash_to];
        hash_bucket &second = table.buckets[hash_from];
        first.mutex.lock();
        second.mutex.lock();
        return bucket_pair(second, first);
    }
}

WEAK void unlock_bucket_pair(bucket_pair &buckets) {
    // In the lock routine, the buckets are locked smaller hash index first.
    // Here we reverse this ordering by comparing the pointers. This works
    // since the pointers are obtained by indexing an array with the hash
    // values.
    if (&buckets.from == &buckets.to) {
        buckets.from.mutex.unlock();
    } else if (&buckets.from > &buckets.to) {
        buckets.from.mutex.unlock();
        buckets.to.mutex.unlock();
    } else {
        buckets.to.mutex.unlock();
        buckets.from.mutex.unlock();
    }
}

struct validate_action {
    bool unpark_one = false;
    uintptr_t invalid_unpark_info = 0;
};

struct parking_control {
    uintptr_t park(uintptr_t addr);
    uintptr_t unpark_one(uintptr_t addr);
    int unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info);

protected:
    virtual bool validate(validate_action &action) {
        return true;
    }
    virtual void before_sleep() {
        // nothing
    }
    virtual uintptr_t unpark(int unparked, bool more_waiters) {
        return 0;
    }
    virtual void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) {
        // nothing
    }
};
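
// The concrete controls below (mutex_parking_control, signal_parking_control,
// broadcast_parking_control, wait_parking_control) all follow the same shape:
// carry some extra state, decide in validate() whether the caller should
// really park, and patch that state back up in unpark()/requeue_callback()
// while the bucket lock is held. A minimal, non-runtime sketch of a subclass;
// example_parking_control and flag are hypothetical names:
#if 0
struct example_parking_control final : public parking_control {
    uintptr_t *const flag;

    ALWAYS_INLINE example_parking_control(uintptr_t *flag)
        : flag(flag) {
    }

protected:
    bool validate(validate_action &action) final {
        // Only park while the flag is still zero; otherwise park() returns
        // immediately with action.invalid_unpark_info.
        uintptr_t val;
        atomic_load_relaxed(flag, &val);
        return val == 0;
    }

    uintptr_t unpark(int unparked, bool more_waiters) final {
        // Value handed to the woken thread via park()'s return value.
        return 1;
    }
};
#endif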

// TODO: Do we need a park_result thing here?
WEAK uintptr_t parking_control::park(uintptr_t addr) {
    queue_data queue_data;

    hash_bucket &bucket = lock_bucket(addr);

    validate_action action;
    if (!validate(action)) {
        bucket.mutex.unlock();
        return action.invalid_unpark_info;
    }

    queue_data.next = nullptr;
    queue_data.sleep_address = addr;
    queue_data.parker.prepare_park();
    if (bucket.head != nullptr) {
        bucket.tail->next = &queue_data;
    } else {
        bucket.head = &queue_data;
    }
    bucket.tail = &queue_data;
    bucket.mutex.unlock();

    before_sleep();

    queue_data.parker.park();

    return queue_data.unpark_info;

    // TODO: handling timeout.
}

WEAK uintptr_t parking_control::unpark_one(uintptr_t addr) {
    hash_bucket &bucket = lock_bucket(addr);

    queue_data **data_location = &bucket.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);
        if (cur_addr == addr) {
            queue_data *next = data->next;
            *data_location = next;

            bool more_waiters = false;

            if (bucket.tail == data) {
                bucket.tail = prev;
            } else {
                queue_data *data2 = next;
                while (data2 != nullptr && !more_waiters) {
                    uintptr_t cur_addr2;
                    atomic_load_relaxed(&data2->sleep_address, &cur_addr2);
                    more_waiters = (cur_addr2 == addr);
                    data2 = data2->next;
                }
            }

            data->unpark_info = unpark(1, more_waiters);

            data->parker.unpark_start();
            bucket.mutex.unlock();
            data->parker.unpark();
            data->parker.unpark_finish();

            // TODO: Figure out ret type.
            return more_waiters ? 1 : 0;
        } else {
            data_location = &data->next;
            prev = data;
            data = data->next;
        }
    }

    unpark(0, false);

    bucket.mutex.unlock();

    // TODO: decide if this is the right return value.
    return 0;
}

WEAK int parking_control::unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info) {
    bucket_pair buckets = lock_bucket_pair(addr_from, addr_to);

    validate_action action;
    if (!validate(action)) {
        unlock_bucket_pair(buckets);
        return 0;
    }

    queue_data **data_location = &buckets.from.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    queue_data *requeue = nullptr;
    queue_data *requeue_tail = nullptr;
    queue_data *wakeup = nullptr;

    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);

        queue_data *next = data->next;
        if (cur_addr == addr_from) {
            *data_location = next;

            if (buckets.from.tail == data) {
                buckets.from.tail = prev;
            }

            if (action.unpark_one && wakeup == nullptr) {
                wakeup = data;
            } else {
                if (requeue == nullptr) {
                    requeue = data;
                } else {
                    requeue_tail->next = data;
                }

                requeue_tail = data;
                atomic_store_relaxed(&data->sleep_address, &addr_to);
            }
            data = next;
            // TODO: prev ptr?
        } else {
            data_location = &data->next;
            prev = data;
            data = next;
        }
    }

    if (requeue != nullptr) {
        requeue_tail->next = nullptr;
        if (buckets.to.head == nullptr) {
            buckets.to.head = requeue;
        } else {
            buckets.to.tail->next = requeue;
        }
        buckets.to.tail = requeue_tail;
    }

    requeue_callback(action, wakeup != nullptr, requeue != nullptr);

    if (wakeup != nullptr) {
        wakeup->unpark_info = unpark_info;
        wakeup->parker.unpark_start();
        unlock_bucket_pair(buckets);
        wakeup->parker.unpark();
        wakeup->parker.unpark_finish();
    } else {
        unlock_bucket_pair(buckets);
    }

    return wakeup != nullptr && action.unpark_one;
}

struct mutex_parking_control final : public parking_control {
    uintptr_t *const lock_state;

    ALWAYS_INLINE mutex_parking_control(uintptr_t *lock_state)
        : lock_state(lock_state) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t result;
        atomic_load_relaxed(lock_state, &result);
        return result == (lock_bit | parked_bit);
    }

    uintptr_t unpark(int unparked, bool more_waiters) final {
        // TODO: consider handling fairness.
        uintptr_t return_state = more_waiters ? parked_bit : 0;
        atomic_store_release(lock_state, &return_state);
        return 0;
    }
};

class fast_mutex {
    uintptr_t state = 0;

    ALWAYS_INLINE void lock_full() {
        // Everyone says this should be 40. Have not measured it.
        spin_control spinner;
        uintptr_t expected;
        atomic_load_relaxed(&state, &expected);

        while (true) {
            if (!(expected & lock_bit)) {
                uintptr_t desired = expected | lock_bit;
                if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
                    return;
                }
                continue;
            }

            // Spin with spin count. Note that this occurs even if
            // threads are parked. We're prioritizing throughput over
            // fairness by letting sleeping threads lie.
            if (spinner.should_spin()) {
                halide_thread_yield();
                atomic_load_relaxed(&state, &expected);
                continue;
            }

            // Mark mutex as having parked threads if not already done.
            if ((expected & parked_bit) == 0) {
                uintptr_t desired = expected | parked_bit;
                if (!atomic_cas_weak_relaxed_relaxed(&state, &expected, &desired)) {
                    continue;
                }
            }

            // TODO: consider handling fairness, timeout
            mutex_parking_control control(&state);
            uintptr_t result = control.park((uintptr_t)this);
            if (result == (uintptr_t)this) {
                return;
            }

            spinner.reset();
            atomic_load_relaxed(&state, &expected);
        }
    }

    ALWAYS_INLINE void unlock_full() {
        uintptr_t expected = lock_bit;
        uintptr_t desired = 0;
        // Try for a fast release of the lock. Redundant with code in lock, but done
        // to make unlock_full a standalone unlock that can be called directly.
        if (atomic_cas_strong_release_relaxed(&state, &expected, &desired)) {
            return;
        }

        mutex_parking_control control(&state);
        control.unpark_one((uintptr_t)this);
    }

public:
    ALWAYS_INLINE void lock() {
        uintptr_t expected = 0;
        uintptr_t desired = lock_bit;
        // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            lock_full();
        }
    }

    ALWAYS_INLINE void unlock() {
        uintptr_t expected = lock_bit;
        uintptr_t desired = 0;
        // Try for a fast release of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
            unlock_full();
        }
    }

    ALWAYS_INLINE bool make_parked_if_locked() {
        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        while (true) {
            if (!(val & lock_bit)) {
                return false;
            }

            uintptr_t desired = val | parked_bit;
            if (atomic_cas_weak_relaxed_relaxed(&state, &val, &desired)) {
                return true;
            }
        }
    }

    ALWAYS_INLINE void make_parked() {
        atomic_or_fetch_relaxed(&state, parked_bit);
    }
};
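
// fast_mutex keeps its entire state in one uintptr_t: lock_bit means the mutex
// is held and parked_bit means at least one thread may be parked on it, so the
// uncontended lock/unlock paths are each a single weak CAS. Illustrative,
// non-runtime sketch of direct usage inside the runtime (the public C API at
// the bottom of this file does the equivalent through halide_mutex); the name
// example_locked_increment is hypothetical:
#if 0
WEAK void example_locked_increment(fast_mutex *m, int *counter) {
    m->lock();
    (*counter)++;
    m->unlock();
}
#endif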

struct signal_parking_control final : public parking_control {
    uintptr_t *const cond_state;
    fast_mutex *const mutex;

    ALWAYS_INLINE signal_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
    }

protected:
    uintptr_t unpark(int unparked, bool more_waiters) final {
        if (!more_waiters) {
            uintptr_t val = 0;
            atomic_store_relaxed(cond_state, &val);
        }

#if 0  // TODO: figure out why this was here.
        return (uintptr_t)mutex;
#else
        return 0;
#endif
    }
};

struct broadcast_parking_control final : public parking_control {
    uintptr_t *const cond_state;
    fast_mutex *const mutex;

    ALWAYS_INLINE broadcast_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t val;
        atomic_load_relaxed(cond_state, &val);
        // By the time this broadcast locked everything and was processed, the cond
        // has progressed to a new mutex, do nothing since any waiting threads have
        // to be waiting on what is effectively a different condition.
        if (val != (uintptr_t)mutex) {
            return false;
        }
        // Clear the cond's connection to the mutex as all waiting threads are going to requeue onto the mutex.
        val = 0;
        atomic_store_relaxed(cond_state, &val);
        action.unpark_one = !mutex->make_parked_if_locked();
        return true;
    }

    void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) final {
        if (action.unpark_one && some_requeued) {
            mutex->make_parked();
        }
    }
};

struct wait_parking_control final : public parking_control {
    uintptr_t *const cond_state;
    fast_mutex *const mutex;

    ALWAYS_INLINE wait_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t val;
        atomic_load_relaxed(cond_state, &val);

        if (val == 0) {
            val = (uintptr_t)mutex;
            atomic_store_relaxed(cond_state, &val);
        } else if (val != (uintptr_t)mutex) {
            // TODO: signal error.
            action.invalid_unpark_info = (uintptr_t)mutex;
            return false;
        }

        return true;
    }

    void before_sleep() final {
        mutex->unlock();
    }

    uintptr_t unpark(int unparked, bool more_waiters) final {
        if (!more_waiters) {
            uintptr_t val = 0;
            atomic_store_relaxed(cond_state, &val);
        }
        return 0;
    }
};

class fast_cond {
    uintptr_t state = 0;

public:
    ALWAYS_INLINE void signal() {
        if_tsan_pre_signal(this);

        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        if (val == 0) {
            if_tsan_post_signal(this);
            return;
        }
        signal_parking_control control(&state, (fast_mutex *)val);
        control.unpark_one((uintptr_t)this);
        if_tsan_post_signal(this);
    }

    ALWAYS_INLINE void broadcast() {
        if_tsan_pre_signal(this);
        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        if (val == 0) {
            if_tsan_post_signal(this);
            return;
        }
        broadcast_parking_control control(&state, (fast_mutex *)val);
        control.unpark_requeue((uintptr_t)this, val, 0);
        if_tsan_post_signal(this);
    }

    ALWAYS_INLINE void wait(fast_mutex *mutex) {
        wait_parking_control control(&state, mutex);
        uintptr_t result = control.park((uintptr_t)this);
        if (result != (uintptr_t)mutex) {
            mutex->lock();
        } else {
            if_tsan_pre_lock(mutex);

            // TODO: this is debug only.
            uintptr_t val;
            atomic_load_relaxed((uintptr_t *)mutex, &val);
            halide_abort_if_false(nullptr, val & 0x1);

            if_tsan_post_lock(mutex);
        }
    }
};
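
// fast_cond stores the address of the fast_mutex it is currently paired with
// in its state word while there are waiters; broadcast() then requeues the
// waiters onto that mutex instead of waking them all at once, avoiding a
// thundering herd. Illustrative, non-runtime sketch of the usual predicate
// loop; the example_* names are hypothetical:
#if 0
WEAK void example_wait_until_nonzero(fast_mutex *m, fast_cond *c, int *value) {
    m->lock();
    // wait() releases the mutex while parked and reacquires it before
    // returning, so the predicate must be re-checked in a loop.
    while (*value == 0) {
        c->wait(m);
    }
    m->unlock();
}

WEAK void example_set_and_notify(fast_mutex *m, fast_cond *c, int *value) {
    m->lock();
    *value = 1;
    m->unlock();
    c->signal();
}
#endif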

} // namespace Synchronization

} // namespace Internal
} // namespace Runtime
} // namespace Halide

extern "C" {

WEAK void halide_mutex_lock(halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_mutex->lock();
}

WEAK void halide_mutex_unlock(halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_mutex->unlock();
}

WEAK void halide_cond_broadcast(struct halide_cond *cond) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    fast_cond->broadcast();
}

WEAK void halide_cond_signal(struct halide_cond *cond) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    fast_cond->signal();
}

WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_cond->wait(fast_mutex);
}

// Actual definition of the mutex array.
struct halide_mutex_array {
    struct halide_mutex *array;
};

WEAK halide_mutex_array *halide_mutex_array_create(int sz) {
    // TODO: If sz is huge, we should probably hash it down to something smaller
    // in the accessors below. Check for deadlocks before doing so.
    halide_mutex_array *array = (halide_mutex_array *)halide_malloc(
        nullptr, sizeof(halide_mutex_array));
    if (array == nullptr) {
        // Will result in a failed assertion and a call to halide_error.
        return nullptr;
    }
    array->array = (halide_mutex *)halide_malloc(
        nullptr, sz * sizeof(halide_mutex));
    if (array->array == nullptr) {
        halide_free(nullptr, array);
        // Will result in a failed assertion and a call to halide_error.
        return nullptr;
    }
    memset(array->array, 0, sz * sizeof(halide_mutex));
    return array;
}

WEAK void halide_mutex_array_destroy(void *user_context, void *array) {
    struct halide_mutex_array *arr_ptr = (struct halide_mutex_array *)array;
    halide_free(user_context, arr_ptr->array);
    halide_free(user_context, arr_ptr);
}

WEAK int halide_mutex_array_lock(struct halide_mutex_array *array, int entry) {
    halide_mutex_lock(&array->array[entry]);
    return 0;
}

WEAK int halide_mutex_array_unlock(struct halide_mutex_array *array, int entry) {
    halide_mutex_unlock(&array->array[entry]);
    return 0;
}
}
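
// The halide_mutex_array entry points above are used by generated code that
// needs per-element locking (a freshly created array is zero-initialized,
// which is the unlocked state for fast_mutex). Illustrative, non-runtime
// sketch of the call sequence; the function name and the entry index 3 are
// arbitrary:
#if 0
void example_mutex_array_usage() {
    halide_mutex_array *locks = halide_mutex_array_create(16);
    if (locks == nullptr) {
        return;  // Allocation failed.
    }
    halide_mutex_array_lock(locks, 3);
    // ... critical section guarding element 3 ...
    halide_mutex_array_unlock(locks, 3);
    halide_mutex_array_destroy(nullptr, locks);
}
#endif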