xenium
vyukov_bounded_queue.hpp
1 //
2 // Copyright (c) 2018-2020 Manuel Pöter.
3 // Licensed under the MIT License. See LICENSE file in the project root for full license information.
4 //
5 
6 #ifndef XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
7 #define XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
8 
9 #include <xenium/utils.hpp>
10 #include <xenium/parameter.hpp>
11 
12 #include <atomic>
13 #include <cassert>
14 #include <cstdint>
15 #include <memory>
16 
17 #ifdef _MSC_VER
18 #pragma warning(push)
19 #pragma warning(disable: 4324) // structure was padded due to alignment specifier
20 #endif
21 
22 namespace xenium {
23 
24 namespace policy {
30  template <bool Value>
32 
33 }
58 template<class T, class... Policies>
60 public:
61  using value_type = T;
62 
63  static constexpr bool default_to_weak =
64  parameter::value_param_t<bool, policy::default_to_weak, false, Policies...>::value;;
65 
70  vyukov_bounded_queue(std::size_t size) :
71  cells(new cell[size]),
72  index_mask(size - 1)
73  {
74  assert(size >= 2 && utils::is_power_of_two(size));
75  for (std::size_t i = 0; i < size; ++i)
76  cells[i].sequence.store(i, std::memory_order_relaxed);
77  enqueue_pos.store(0, std::memory_order_relaxed);
78  dequeue_pos.store(0, std::memory_order_relaxed);
79  }
80 
83 
84  vyukov_bounded_queue& operator= (const vyukov_bounded_queue&) = delete;
85  vyukov_bounded_queue& operator= (vyukov_bounded_queue&&) = delete;
86 
99  template <class... Args>
100  bool try_push(Args&&... args) {
101  return do_try_push<default_to_weak>(std::forward<Args>(args)...);
102  }
103 
121  template <class... Args>
122  bool try_push_strong(Args&&... args) {
123  return do_try_push<false>(std::forward<Args>(args)...);
124  }
125 
143  template <class... Args>
144  bool try_push_weak(Args&&... args) {
145  return do_try_push<true>(std::forward<Args>(args)...);
146  }
147 
159  [[nodiscard]] bool try_pop(T& result) {
160  return do_try_pop<default_to_weak>(result);
161  }
162 
174  [[nodiscard]] bool try_pop_strong(T& result) {
175  return do_try_pop<false>(result);
176  }
177 
189  [[nodiscard]] bool try_pop_weak(T& result) {
190  return do_try_pop<true>(result);
191  }
192 
193 private:
194  template <bool Weak, class... Args>
195  bool do_try_push(Args&&... args) {
196  cell* c;
197  std::size_t pos = enqueue_pos.load(std::memory_order_relaxed);
198  for (;;) {
199  c = &cells[pos & index_mask];
200  // (3) - this acquire-load synchronizes-with the release-store (2)
201  std::size_t seq = c->sequence.load(std::memory_order_acquire);
202  if (seq == pos) {
203  if (enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
204  break;
205  } else {
206  if (Weak) {
207  if (seq < pos)
208  return false;
209  else
210  pos = enqueue_pos.load(std::memory_order_relaxed);
211  } else {
212  auto pos2 = enqueue_pos.load(std::memory_order_relaxed);
213  if (pos2 == pos && dequeue_pos.load(std::memory_order_relaxed) + index_mask + 1 == pos)
214  return false;
215  pos = pos2;
216  }
217  }
218  }
219  assign_value(c->value, std::forward<Args>(args)...);
220  // (4) - this release-store synchronizes-with the acquire-load (1)
221  c->sequence.store(pos + 1, std::memory_order_release);
222  return true;
223  }
224 
225  template <bool Weak>
226  bool do_try_pop(T& result) {
227  cell* c;
228  std::size_t pos = dequeue_pos.load(std::memory_order_relaxed);
229  for (;;) {
230  c = &cells[pos & index_mask];
231  // (1) - this acquire-load synchronizes-with the release-store (4)
232  std::size_t seq = c->sequence.load(std::memory_order_acquire);
233  auto new_pos = pos + 1;
234  if (seq == new_pos) {
235  if (dequeue_pos.compare_exchange_weak(pos, new_pos, std::memory_order_relaxed))
236  break;
237  } else {
238  if (Weak) {
239  if (seq < new_pos)
240  return false;
241  pos = dequeue_pos.load(std::memory_order_relaxed);
242  } else {
243  auto pos2 = dequeue_pos.load(std::memory_order_relaxed);
244  if (pos2 == pos && enqueue_pos.load(std::memory_order_relaxed) == pos)
245  return false;
246  pos = pos2;
247  }
248  }
249  }
250  result = std::move(c->value);
251  // (2) - this release-store synchronizes-with the acquire-load (3)
252  c->sequence.store(pos + index_mask + 1, std::memory_order_release);
253  return true;
254  }
255 
256  void assign_value(T& v, const T& source) { v = source; }
257  void assign_value(T& v, T&& source) { v = std::move(source); }
258  template <class... Args>
259  void assign_value(T& v, Args&&... args) { v = T{std::forward<Args>(args)...}; }
260 
261  // TODO - add optional padding via policy
262  struct cell {
263  std::atomic<std::size_t> sequence;
264  T value;
265  };
266 
267  std::unique_ptr<cell[]> cells;
268  const std::size_t index_mask;
269  alignas(64) std::atomic<size_t> enqueue_pos;
270  alignas(64) std::atomic<size_t> dequeue_pos;
271 };
272 }
273 
274 #ifdef _MSC_VER
275 #pragma warning(pop)
276 #endif
277 
278 #endif
xenium::vyukov_bounded_queue::try_pop_weak
bool try_pop_weak(T &result)
Tries to pop an element from the queue.
Definition: vyukov_bounded_queue.hpp:189
xenium::vyukov_bounded_queue
A bounded generic multi-producer/multi-consumer FIFO queue.
Definition: vyukov_bounded_queue.hpp:59
xenium::policy::default_to_weak
Policy to configure whether try_push/try_pop in vyukov_bounded_queue should default to try_push_weak/try_pop_weak.
Definition: vyukov_bounded_queue.hpp:31
xenium::vyukov_bounded_queue::try_push_weak
bool try_push_weak(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:144
xenium::vyukov_bounded_queue::try_push_strong
bool try_push_strong(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:122
xenium::vyukov_bounded_queue::vyukov_bounded_queue
vyukov_bounded_queue(std::size_t size)
Constructs a new instance with the specified maximum size.
Definition: vyukov_bounded_queue.hpp:70
xenium::vyukov_bounded_queue::try_pop
bool try_pop(T &result)
Tries to pop an element from the queue.
Definition: vyukov_bounded_queue.hpp:159
xenium::vyukov_bounded_queue::try_push
bool try_push(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:100
xenium::vyukov_bounded_queue::try_pop_strong
bool try_pop_strong(T &result)
Tries to pop an element from the queue as long as the queue is not empty.
Definition: vyukov_bounded_queue.hpp:174