xenium/kirsch_bounded_kfifo_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>     // assert
#include <cstdint>
#include <memory>      // std::unique_ptr
#include <stdexcept>
#include <type_traits> // std::conditional_t, std::remove_pointer_t

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 26495) // uninitialized member variable
#endif

namespace xenium {

/**
 * A bounded lock-free multi-producer/multi-consumer k-FIFO queue.
 *
 * The queue is organized in segments of `k` entries. Pushes insert into the current
 * tail segment and pops remove from the current head segment, so elements within a
 * segment may be dequeued in any order (k-relaxed FIFO ordering).
 *
 * Supported policies:
 *  * `xenium::policy::padding_bytes` - number of padding bytes added to each entry
 *    (defaults to `sizeof(raw_value_type)`).
 */
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  kirsch_bounded_kfifo_queue(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue(kirsch_bounded_kfifo_queue&&) = delete;

  kirsch_bounded_kfifo_queue& operator=(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue& operator=(kirsch_bounded_kfifo_queue&&) = delete;

  /**
   * Tries to push a new element to the queue.
   * Progress guarantees: lock-free.
   * @param value the value to push; must not be `nullptr`
   * @return `true` if the operation was successful, otherwise `false`
   */
  bool try_push(value_type value);

  /**
   * Tries to pop an element from the queue.
   * Progress guarantees: lock-free.
   * @param result reference that receives the popped value
   * @return `true` if an element was popped, otherwise `false`
   */
  bool try_pop(value_type& result);

private:
  // Tagged pointer used for the stored values; the mark (version counter) is
  // incremented on every update so that stale CAS operations on a slot fail.
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };

  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /**
   * Provides the effective size of a single queue entry (including padding).
   */
  static constexpr std::size_t entry_size = sizeof(entry);

private:
  // A queue index packed together with an ABA counter: the index occupies the
  // lower 16 bits, the counter (mark) the upper 48 bits.
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { val_ = val | (mark << bits); }

    uint64_t get() const noexcept { return val_ & val_mask; }
    uint64_t mark() const noexcept { return val_ >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->val_ == other.val_; }
    bool operator!=(const marked_idx& other) const noexcept { return this->val_ != other.val_; }

  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t val_ = 0;
  };

  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);

  std::uint64_t queue_size_;
  std::size_t k_;
  // all operations on head/tail are synchronized via the value operations and
  // can therefore use memory_order_relaxed.
  std::atomic<marked_idx> head_;
  std::atomic<marked_idx> tail_;
  std::unique_ptr<entry[]> queue_;
};

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  queue_size_(k * num_segments),
  k_(k),
  head_(),
  tail_(),
  queue_(new entry[k * num_segments]())
{}

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  // delete any values that are still in the queue
  for (std::uint64_t i = 0; i < queue_size_; ++i)
    traits::delete_value(queue_[i].value.load(std::memory_order_relaxed).get());
}

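// try_push: search the tail segment for a free slot and claim it with a tagged CAS.
// If the slot was claimed but the segment was invalidated concurrently, committed()
// rolls the insertion back and the operation retries. If no free slot exists, either
// the queue is full, the fully consumed head segment is advanced, or the tail is
// advanced to the next segment.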
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value can not be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);
    marked_idx head_old = head_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue;

    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (1) - this release-CAS synchronizes-with the acquire-loads (3, 4)
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // the head segment is fully consumed - increment head by k
          marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
          head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == head_.load(std::memory_order_relaxed)) {
          // queue is full
          return false;
        }
      }
      // increment tail by k
      marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
      tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}

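// try_pop: search the head segment for an occupied slot and empty it with a tagged
// CAS, returning the removed value. If head and tail refer to the same segment, the
// tail is advanced first so that it stays ahead of the head. If the head segment
// contains no values, the queue is either empty or the head is advanced.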
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = head_.load(std::memory_order_relaxed);
    marked_idx tail_old = tail_.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed))
      continue;

    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        // head and tail point to the same segment - move tail to the next segment
        marked_idx new_tail((tail_old.get() + k_) % queue_size_, tail_old.mark() + 1);
        tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }
      marked_value new_value(nullptr, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes-with the acquire-loads (3, 4)
      if (queue_[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
        return false; // the queue is empty

      // the head segment contains no values - increment head by k
      marked_idx new_head((head_old.get() + k_) % queue_size_, head_old.mark() + 1);
      head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}

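// find_index: scan the k entries of the segment starting at start_index, beginning
// at a random offset to reduce contention. Reports the first empty (Empty == true)
// or occupied (Empty == false) slot via value_index/old.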
template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(
  uint64_t start_index, uint64_t& value_index, marked_value& old)
{
  const uint64_t random_index = utils::random() % k_;
  for (size_t i = 0; i < k_; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    uint64_t index = (start_index + ((random_index + i) % k_)) % queue_size_;
    // (3) - this acquire-load synchronizes-with the release-CAS (1, 2)
    old = queue_[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

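// committed: called after a push has written a value. The insertion counts as
// committed if the slot has already been modified again, or if the insertion
// position still lies between the current head and tail. Otherwise the value is
// removed again (unless the head could be pinned by bumping its mark) and
// try_push retries.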
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(
  const marked_idx& tail_old, marked_value value, uint64_t index)
{
  if (queue_[index].value.load(std::memory_order_relaxed) != value)
    return true;

  marked_idx tail_current = tail_.load(std::memory_order_relaxed);
  marked_idx head_current = head_.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    return true;
  } else if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  } else {
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      return true;

    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed))
      return true;
  }
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(
  const marked_idx& head_old, const marked_idx& tail_old) const
{
  if (((tail_old.get() + k_) % queue_size_) == head_old.get() &&
      (head_old == head_.load(std::memory_order_relaxed)))
    return true;
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < k_; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    // (4) - this acquire-load synchronizes-with the release-CAS (1, 2)
    if (queue_[(start + i) % queue_size_].value.load(std::memory_order_acquire).get() != nullptr)
      return false;
  }
  return true;
}

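// in_valid_region / not_in_valid_region: determine, with wrap-around of the circular
// index space taken into account, how an insert position (tail_old) relates to the
// region between the current head and tail. committed() uses these checks to decide
// whether a freshly inserted value may stay in the queue.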
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(
  uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return head_current < tail_old && tail_old <= tail_current;
  return head_current < tail_old || tail_old <= tail_current;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(
  uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const
{
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return tail_old < tail_current || head_current < tail_old;
  return tail_old < tail_current && head_current < tail_old;
}
} // namespace xenium

#ifdef _MSC_VER
#pragma warning(pop)
#endif

#endif // XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
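
Usage sketch (not part of the header above): a minimal example of how this queue might be used, assuming that plain raw-pointer value types are accepted by the pointer-queue traits (the destructor's delete_value call suggests the queue owns pushed pointers until they are popped) and that popped pointers become the caller's responsibility. The struct msg and all names in main are illustrative, not part of the library.

#include <xenium/kirsch_bounded_kfifo_queue.hpp>

#include <cstdio>

struct msg {
  int id;
};

int main() {
  // k = 4 slots per segment, 10 segments -> capacity of 40 elements
  xenium::kirsch_bounded_kfifo_queue<msg*> queue(4, 10);

  msg* m = new msg{42};
  if (!queue.try_push(m)) {
    delete m; // push failed (queue full), the value was not consumed
  }

  msg* result = nullptr;
  if (queue.try_pop(result)) {
    std::printf("popped %d\n", result->id);
    delete result; // popped values are owned by the caller
  }
  return 0;
}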