Libosmium  2.17.3
Fast and flexible C++ library for working with OpenStreetMap data
writer.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2022 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/output_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_write.hpp>
40 #include <osmium/io/detail/write_thread.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
45 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 #include <osmium/version.hpp>
50 
51 #include <cassert>
52 #include <cstddef>
53 #include <exception>
54 #include <functional>
55 #include <future>
56 #include <initializer_list>
57 #include <memory>
58 #include <string>
59 #include <utility>
60 
61 namespace osmium {
62 
// Forward declaration of osmium::memory::Item, the type that
// Writer::operator()(const osmium::memory::Item&) takes by const reference.
63  namespace memory {
64  class Item;
65  } //namespace memory
66 
67  namespace io {
68 
69  namespace detail {
70 
// Returns the size used when constructing the Writer's output queue:
// the result of osmium::config::get_max_queue_size() for the queue
// name "OUTPUT", with 20 passed as the default value.
71  inline std::size_t get_output_queue_size() noexcept {
72  return osmium::config::get_max_queue_size("OUTPUT", 20);
73  }
74 
75  } // namespace detail
76 
100  class Writer {
101 
// Compile-time constant with the value 10 * 1024 * 1024.
// NOTE(review): the declaration that uses this constant is not visible
// in this listing; presumably it is the initial value of m_buffer_size
// (i.e. 10 MiB if that size is in bytes) - confirm against upstream.
102  enum {
103  default_buffer_size = 10UL * 1024UL * 1024UL
104  };
105 
107 
// NOTE(review): the embedded listing numbers around these members skip
// 106, 114, 116, 120 and 131, so some declarations (the symbol index of
// this page lists m_file, m_header, m_buffer_size, m_thread and m_status)
// are not shown in this rendering.

// Queue created with the size reported by detail::get_output_queue_size()
// and the name "raw_output". The constructor passes it both to the output
// format and to the write thread.
108  detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(), "raw_output"};
109 
// Output format object; stays nullptr until the constructor assigns the
// result of OutputFormatFactory::create_output().
110  std::unique_ptr<osmium::io::detail::OutputFormat> m_output{nullptr};
111 
// Buffer to which operator()(const osmium::memory::Item&) appends items
// and which do_flush() hands over to the output format.
112  osmium::memory::Buffer m_buffer{};
113 
115 
117 
// Future belonging to the promise that is moved into the write thread;
// close() returns the value obtained from it.
118  std::future<std::size_t> m_write_future{};
119 
121 
122  // Checking the m_write_future is much more expensive than checking
123  // one atomic bool, so we set this bool in the write_thread when
124  // the writer should check the future...
125  std::atomic_bool m_notification{false};
126 
// Life-cycle state of the writer; ensure_cleanup() refuses to run
// anything unless the state is okay.
// NOTE(review): the line closing this enum (listing line 131) is missing
// from this rendering.
127  enum class status {
128  okay = 0, // normal writing
129  error = 1, // some error occurred while writing
130  closed = 2 // close() called successfully
132 
133  // Has the header already been written to the file?
134  bool m_header_written = false;
135 
136  // This function will run in a separate thread.
// It builds a detail::WriteThread from the output queue, the compressor,
// the promise and the notification flag, and then invokes that object.
// The Writer constructor hands this function to
// osmium::thread::thread_handler together with its arguments.
137  static void write_thread(detail::future_string_queue_type& output_queue,
138  std::unique_ptr<osmium::io::Compressor>&& compressor,
139  std::promise<std::size_t>&& write_promise,
140  std::atomic_bool* notification) {
141  detail::WriteThread write_thread{output_queue,
142  std::move(compressor),
143  std::move(write_promise),
144  notification};
145  write_thread();
146  }
147 
// Passes m_header to the output format and records that the header has
// been written. Called from do_write() and do_flush() while
// m_header_written is still false.
148  void write_header() {
// Only fill in a "generator" entry if the header does not have one yet.
149  if (m_header.get("generator").empty()) {
150  m_header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING);
151  }
152 
153  m_output->write_header(m_header);
154 
155  m_header_written = true;
156  }
157 
// Hands the given buffer to the output format. The header is written
// first if that has not happened yet. A buffer that converts to false or
// has no committed data is not passed on.
158  void do_write(osmium::memory::Buffer&& buffer) {
159  if (!m_header_written) {
160  write_header();
161  }
162  if (buffer && buffer.committed() > 0) {
163  m_output->write_buffer(std::move(buffer));
164  }
165  }
166 
// Writes out whatever has been collected in m_buffer. The header is
// written first if that has not happened yet.
167  void do_flush() {
168  if (!m_header_written) {
169  write_header();
170  }
171  if (m_notification) {
// NOTE(review): listing line 172 is missing from this rendering, so this
// branch looks empty here. The symbol index of this page references
// osmium::thread::check_for_exception(std::future<T>&); presumably the
// write future is checked at this point - confirm against upstream.
173  }
174  if (m_buffer && m_buffer.committed() > 0) {
// Swap in a fresh, non-growing buffer constructed with m_buffer_size so
// that m_buffer stays usable, then pass the filled one to the output
// format.
175  osmium::memory::Buffer buffer{m_buffer_size,
176  osmium::memory::Buffer::auto_grow::no};
177  using std::swap;
178  swap(m_buffer, buffer);
179 
180  m_output->write_buffer(std::move(buffer));
181  }
182  }
183 
// Runs func(args...) on behalf of the public writing functions.
//
// Throws io_error without calling func if m_status is not status::okay.
// If func throws, the current exception is passed to
// detail::add_to_queue(), detail::add_end_of_data_to_queue() is called
// on the output queue, and the exception is rethrown to the caller.
184  template <typename TFunction, typename... TArgs>
185  void ensure_cleanup(TFunction func, TArgs&&... args) {
186  if (m_status != status::okay) {
187  throw io_error("Can not write to writer when in status 'closed' or 'error'");
188  }
189 
190  try {
191  func(std::forward<TArgs>(args)...);
192  } catch (...) {
// NOTE(review): listing line 193 is missing from this rendering. The
// status check above implies the status is switched to 'error'
// somewhere; presumably that happens here - confirm against upstream.
194  detail::add_to_queue(m_output_queue, std::current_exception());
195  detail::add_end_of_data_to_queue(m_output_queue);
196  throw;
197  }
198  }
199 
// Holds the optional settings collected from the extra constructor
// arguments by the set_option() overloads.
// NOTE(review): listing lines 201-204 (the member declarations) are
// missing from this rendering. The set_option() overloads assign to
// members named pool, header, allow_overwrite and sync, and the symbol
// index of this page lists them with the types osmium::thread::Pool*,
// osmium::io::Header, overwrite and fsync. Any default member
// initializers cannot be seen here - confirm against upstream.
200  struct options_type {
205  };
206 
// The set_option() overloads store one optional constructor argument in
// an options_type; overload resolution on the argument type selects the
// member that is set.

// Thread pool: stores the address of the given pool (the pool is not
// copied).
207  static void set_option(options_type& options, osmium::thread::Pool& pool) {
208  options.pool = &pool;
209  }
210 
// Header: stores a copy of the given header.
211  static void set_option(options_type& options, const osmium::io::Header& header) {
212  options.header = header;
213  }
214 
// Overwrite setting: later passed to detail::open_for_writing() by the
// constructor.
215  static void set_option(options_type& options, overwrite value) {
216  options.allow_overwrite = value;
217  }
218 
// fsync setting: later passed on by the constructor when the compressor
// is set up.
219  static void set_option(options_type& options, fsync value) {
220  options.sync = value;
221  }
222 
// Finishes writing if the writer is still in status::okay; otherwise
// does nothing. Under ensure_cleanup() it writes what is left in
// m_buffer, calls write_end() on the output format and calls
// detail::add_end_of_data_to_queue() on the output queue.
223  void do_close() {
224  if (m_status == status::okay) {
225  ensure_cleanup([&]() {
226  do_write(std::move(m_buffer));
227  m_output->write_end();
// NOTE(review): listing line 228 is missing from this rendering.
// Presumably the status is set to 'closed' here, since nothing else in
// this listing does so - confirm against upstream.
229  detail::add_end_of_data_to_queue(m_output_queue);
230  });
231  }
232  }
233 
234  public:
235 
// Constructs a Writer for the given file description.
//
// file - checked with File::check(); the result initializes m_file.
// args - optional extra arguments, each handled by the set_option()
//        overload matching its type (thread pool, header, overwrite,
//        fsync).
//
// The constructor creates the output format, sets up the compressor and
// starts the write thread.
266  template <typename... TArgs>
267  explicit Writer(const osmium::io::File& file, TArgs&&... args) :
268  m_file(file.check()) {
269  assert(!m_file.buffer()); // XXX can't handle pseudo-files
270 
271  options_type options;
// Pack expansion inside a braced list: one set_option() call per extra
// argument, evaluated left to right. The initializer_list<int> only
// serves as a vehicle for the expansion; its value is discarded.
272  (void)std::initializer_list<int>{(set_option(options, args), 0)...};
273 
274  if (!options.pool) {
// NOTE(review): listing line 275 is missing from this rendering, so this
// branch looks empty here. options.pool is dereferenced below, and the
// symbol index of this page references Pool::default_instance();
// presumably the default pool is selected here - confirm against
// upstream.
276  }
277 
278  m_header = options.header;
279 
280  m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue);
281 
282  std::unique_ptr<osmium::io::Compressor> compressor =
// NOTE(review): listing line 283 (the start of the initializer
// expression) is missing from this rendering. The symbol index of this
// page references CompressionFactory::instance() and create_compressor();
// presumably they are called here - confirm against upstream.
284  osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
285  options.sync);
286 
// The future is kept in the Writer while the promise is moved into the
// write thread, together with the compressor, a reference to the output
// queue and the address of the notification flag.
287  std::promise<std::size_t> write_promise;
288  m_write_future = write_promise.get_future();
289  m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise), &m_notification};
290  }
291 
// Convenience constructor taking a file name as std::string: wraps it in
// an osmium::io::File and delegates to the File constructor, forwarding
// the remaining arguments unchanged.
292  template <typename... TArgs>
293  explicit Writer(const std::string& filename, TArgs&&... args) :
294  Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
295  }
296 
// Convenience constructor taking a file name as C string: wraps it in an
// osmium::io::File and delegates to the File constructor, forwarding the
// remaining arguments unchanged.
297  template <typename... TArgs>
298  explicit Writer(const char* filename, TArgs&&... args) :
299  Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
300  }
301 
// A Writer can be neither copied nor moved. The constructor hands the
// address of m_notification and a reference to m_output_queue to the
// write thread.
302  Writer(const Writer&) = delete;
303  Writer& operator=(const Writer&) = delete;
304 
305  Writer(Writer&&) = delete;
306  Writer& operator=(Writer&&) = delete;
307 
// Calls do_close() and discards any exception it throws. Unlike close(),
// the destructor does not call get() on m_write_future, so the write
// thread's result is not retrieved here.
308  ~Writer() noexcept {
309  try {
310  do_close();
311  } catch (...) {
312  // Ignore any exceptions because destructor must not throw.
313  }
314  }
315 
// Returns the size currently used when the Writer constructs its
// internal buffers (the value of m_buffer_size).
319  size_t buffer_size() const noexcept {
320  return m_buffer_size;
321  }
322 
// Sets the size used for internal buffers constructed from now on. An
// already existing m_buffer is not resized by this call; the new size is
// picked up the next time do_flush() or operator()(const Item&)
// constructs a buffer.
327  void set_buffer_size(size_t size) noexcept {
328  m_buffer_size = size;
329  }
330 
// Replaces the stored header with a copy of the given one. m_header is
// passed to the output format by write_header(), which runs only while
// m_header_written is false, i.e. on the first write or flush.
337  void set_header(const osmium::io::Header& header) {
338  m_header = header;
339  }
340 
// Runs do_flush() under ensure_cleanup(): throws io_error if the writer
// is not in status::okay, and rethrows any exception raised while
// flushing.
348  void flush() {
349  ensure_cleanup([&]() {
350  do_flush();
351  });
352  }
353 
// Writes a complete buffer. The internal buffer is flushed first, so
// items added earlier through operator()(const Item&) reach the output
// format before the contents of the given buffer. Runs under
// ensure_cleanup(), so it throws io_error if the writer is not in
// status::okay.
362  void operator()(osmium::memory::Buffer&& buffer) {
363  ensure_cleanup([&]() {
364  do_flush();
365  do_write(std::move(buffer));
366  });
367  }
368 
// Adds a single item to the internal buffer. Runs under
// ensure_cleanup(), so it throws io_error if the writer is not in
// status::okay.
376  void operator()(const osmium::memory::Item& item) {
377  ensure_cleanup([&]() {
// Construct the internal buffer on first use (or whenever m_buffer
// converts to false).
378  if (!m_buffer) {
379  m_buffer = osmium::memory::Buffer{m_buffer_size,
380  osmium::memory::Buffer::auto_grow::no};
381  }
382  try {
383  m_buffer.push_back(item);
384  } catch (const osmium::buffer_is_full&) {
// The buffer does not grow automatically: flush it and retry once. If
// the second push_back() throws as well, the exception propagates
// through ensure_cleanup().
385  do_flush();
386  m_buffer.push_back(item);
387  }
388  });
389  }
390 
// Finishes writing via do_close() and returns the value delivered
// through m_write_future, i.e. the std::size_t the write thread stored
// in its promise. Returns 0 if the future is not valid.
//
// std::future::get() rethrows an exception stored in the shared state
// and leaves the future invalid, so a repeated call returns 0.
402  std::size_t close() {
403  do_close();
404 
405  if (m_write_future.valid()) {
406  return m_write_future.get();
407  }
408 
409  return 0;
410  }
411 
412  }; // class Writer
413 
414  } // namespace io
415 
416 } // namespace osmium
417 
418 #endif // OSMIUM_IO_WRITER_HPP
std::unique_ptr< osmium::io::Compressor > create_compressor(const osmium::io::file_compression compression, TArgs &&... args) const
Definition: compression.hpp:211
static CompressionFactory & instance()
Definition: compression.hpp:191
Definition: file.hpp:72
File & filename(const std::string &filename)
Definition: file.hpp:312
const char * buffer() const noexcept
Definition: file.hpp:143
file_compression compression() const noexcept
Definition: file.hpp:294
Definition: header.hpp:68
Definition: writer.hpp:100
std::size_t close()
Definition: writer.hpp:402
@ default_buffer_size
Definition: writer.hpp:103
void write_header()
Definition: writer.hpp:148
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:219
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:211
static void set_option(options_type &options, osmium::thread::Pool &pool)
Definition: writer.hpp:207
size_t m_buffer_size
Definition: writer.hpp:116
Writer(Writer &&)=delete
Writer & operator=(Writer &&)=delete
void do_flush()
Definition: writer.hpp:167
Writer & operator=(const Writer &)=delete
Writer(const osmium::io::File &file, TArgs &&... args)
Definition: writer.hpp:267
Writer(const std::string &filename, TArgs &&... args)
Definition: writer.hpp:293
size_t buffer_size() const noexcept
Definition: writer.hpp:319
status
Definition: writer.hpp:127
osmium::thread::thread_handler m_thread
Definition: writer.hpp:120
void flush()
Definition: writer.hpp:348
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:362
bool m_header_written
Definition: writer.hpp:134
osmium::io::Header m_header
Definition: writer.hpp:114
osmium::memory::Buffer m_buffer
Definition: writer.hpp:112
std::atomic_bool m_notification
Definition: writer.hpp:125
~Writer() noexcept
Definition: writer.hpp:308
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:110
void set_header(const osmium::io::Header &header)
Definition: writer.hpp:337
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:215
std::future< std::size_t > m_write_future
Definition: writer.hpp:118
void do_close()
Definition: writer.hpp:223
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:158
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:108
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:327
Writer(const Writer &)=delete
osmium::io::File m_file
Definition: writer.hpp:106
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:376
void ensure_cleanup(TFunction func, TArgs &&... args)
Definition: writer.hpp:185
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< std::size_t > &&write_promise, std::atomic_bool *notification)
Definition: writer.hpp:137
Writer(const char *filename, TArgs &&... args)
Definition: writer.hpp:298
enum osmium::io::Writer::status m_status
Definition: item.hpp:105
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:85
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
fsync
Definition: writer_options.hpp:51
overwrite
Definition: writer_options.hpp:43
void check_for_exception(std::future< T > &future)
Definition: util.hpp:55
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555
Definition: writer.hpp:200
overwrite allow_overwrite
Definition: writer.hpp:202
osmium::thread::Pool * pool
Definition: writer.hpp:204
fsync sync
Definition: writer.hpp:203
osmium::io::Header header
Definition: writer.hpp:201
Definition: error.hpp:46
#define LIBOSMIUM_VERSION_STRING
Definition: version.hpp:40