Libosmium  2.12.2
Fast and flexible C++ library for working with OpenStreetMap data
queue.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_THREAD_QUEUE_HPP
2 #define OSMIUM_THREAD_QUEUE_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2017 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <chrono>
37 #include <condition_variable>
38 #include <cstddef>
39 #include <mutex>
40 #include <queue>
41 #include <string>
42 #include <thread>
43 #include <utility> // IWYU pragma: keep
44 
45 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
46 # include <atomic>
47 # include <iostream>
48 #endif
49 
50 namespace osmium {
51 
52  namespace thread {
53 
54  static const std::chrono::milliseconds max_wait{10};
55 
59  template <typename T>
60  class Queue {
61 
64  const size_t m_max_size;
65 
67  const std::string m_name;
68 
69  mutable std::mutex m_mutex;
70 
71  std::queue<T> m_queue;
72 
74  std::condition_variable m_data_available;
75 
77  std::condition_variable m_space_available;
78 
79 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
80  size_t m_largest_size;
82 
84  std::atomic<int> m_push_counter;
85 
88  std::atomic<int> m_full_counter;
89 
94  std::atomic<int> m_pop_counter;
95 
98  std::atomic<int> m_empty_counter;
99 #endif
100 
101  public:
102 
110  explicit Queue(size_t max_size = 0, const std::string& name = "") :
111  m_max_size(max_size),
112  m_name(name),
113  m_mutex(),
114  m_queue(),
115  m_data_available(),
116  m_space_available()
117 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
118  ,
119  m_largest_size(0),
120  m_push_counter(0),
121  m_full_counter(0),
122  m_pop_counter(0),
123  m_empty_counter(0)
124 #endif
125  {
126  }
127 
128  ~Queue() {
129 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
130  std::cerr << "queue '" << m_name
131  << "' with max_size=" << m_max_size
132  << " had largest size " << m_largest_size
133  << " and was full " << m_full_counter
134  << " times in " << m_push_counter
135  << " push() calls and was empty " << m_empty_counter
136  << " times in " << m_pop_counter
137  << " pop() calls\n";
138 #endif
139  }
140 
145  void push(T value) {
146 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
147  ++m_push_counter;
148 #endif
149  if (m_max_size) {
150  while (size() >= m_max_size) {
151  std::unique_lock<std::mutex> lock{m_mutex};
152  m_space_available.wait_for(lock, max_wait, [this] {
153  return m_queue.size() < m_max_size;
154  });
155 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
156  ++m_full_counter;
157 #endif
158  }
159  }
160  std::lock_guard<std::mutex> lock{m_mutex};
161  m_queue.push(std::move(value));
162 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
163  if (m_largest_size < m_queue.size()) {
164  m_largest_size = m_queue.size();
165  }
166 #endif
167  m_data_available.notify_one();
168  }
169 
170  void wait_and_pop(T& value) {
171 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
172  ++m_pop_counter;
173 #endif
174  std::unique_lock<std::mutex> lock{m_mutex};
175 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
176  if (m_queue.empty()) {
177  ++m_empty_counter;
178  }
179 #endif
180  m_data_available.wait(lock, [this] {
181  return !m_queue.empty();
182  });
183  if (!m_queue.empty()) {
184  value = std::move(m_queue.front());
185  m_queue.pop();
186  lock.unlock();
187  if (m_max_size) {
188  m_space_available.notify_one();
189  }
190  }
191  }
192 
193  bool try_pop(T& value) {
194 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
195  ++m_pop_counter;
196 #endif
197  {
198  std::lock_guard<std::mutex> lock{m_mutex};
199  if (m_queue.empty()) {
200 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
201  ++m_empty_counter;
202 #endif
203  return false;
204  }
205  value = std::move(m_queue.front());
206  m_queue.pop();
207  }
208  if (m_max_size) {
209  m_space_available.notify_one();
210  }
211  return true;
212  }
213 
214  bool empty() const {
215  std::lock_guard<std::mutex> lock{m_mutex};
216  return m_queue.empty();
217  }
218 
219  size_t size() const {
220  std::lock_guard<std::mutex> lock{m_mutex};
221  return m_queue.size();
222  }
223 
224  }; // class Queue
225 
226  } // namespace thread
227 
228 } // namespace osmium
229 
230 #endif // OSMIUM_THREAD_QUEUE_HPP
size_t size() const
Definition: queue.hpp:219
Definition: queue.hpp:60
std::mutex m_mutex
Definition: queue.hpp:69
~Queue()
Definition: queue.hpp:128
static const std::chrono::milliseconds max_wait
Definition: queue.hpp:54
std::queue< T > m_queue
Definition: queue.hpp:71
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:67
const size_t m_max_size
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:63
std::condition_variable m_space_available
Used to signal producers when queue is not full.
Definition: queue.hpp:77
bool try_pop(T &value)
Definition: queue.hpp:193
void wait_and_pop(T &value)
Definition: queue.hpp:170
void push(T value)
Definition: queue.hpp:145
bool empty() const
Definition: queue.hpp:214
Queue(size_t max_size=0, const std::string &name="")
Definition: queue.hpp:110
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:74