Libosmium  2.7.1
Fast and flexible C++ library for working with OpenStreetMap data
writer.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2016 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cassert>
37 #include <future>
38 #include <memory>
39 #include <stdexcept>
40 #include <string>
41 #include <thread>
42 #include <utility>
43 
45 #include <osmium/io/detail/output_format.hpp>
46 #include <osmium/io/detail/queue_util.hpp>
47 #include <osmium/io/detail/read_write.hpp>
48 #include <osmium/io/detail/write_thread.hpp>
49 #include <osmium/io/error.hpp>
50 #include <osmium/io/file.hpp>
51 #include <osmium/io/header.hpp>
53 #include <osmium/memory/buffer.hpp>
54 #include <osmium/thread/util.hpp>
55 
56 namespace osmium {
57 
58  namespace io {
59 
83  class Writer {
84 
85  static constexpr size_t default_buffer_size = 10 * 1024 * 1024;
86 
88 
89  detail::future_string_queue_type m_output_queue;
90 
91  std::unique_ptr<osmium::io::detail::OutputFormat> m_output;
92 
94 
95  size_t m_buffer_size;
96 
97  std::future<bool> m_write_future;
98 
100 
101  enum class status {
102  okay = 0, // normal writing
103  error = 1, // some error occurred while writing
104  closed = 2 // close() called successfully
105  } m_status;
106 
107  // This function will run in a separate thread.
108  static void write_thread(detail::future_string_queue_type& output_queue,
109  std::unique_ptr<osmium::io::Compressor>&& compressor,
110  std::promise<bool>&& write_promise) {
111  detail::WriteThread write_thread{output_queue,
112  std::move(compressor),
113  std::move(write_promise)};
114  write_thread();
115  }
116 
118  if (buffer && buffer.committed() > 0) {
119  m_output->write_buffer(std::move(buffer));
120  }
121  }
122 
123  void do_flush() {
124  osmium::thread::check_for_exception(m_write_future);
125  if (m_buffer && m_buffer.committed() > 0) {
128  using std::swap;
129  swap(m_buffer, buffer);
130 
131  m_output->write_buffer(std::move(buffer));
132  }
133  }
134 
135  template <typename TFunction, typename... TArgs>
136  void ensure_cleanup(TFunction func, TArgs&&... args) {
137  if (m_status != status::okay) {
138  throw io_error("Can not write to writer when in status 'closed' or 'error'");
139  }
140 
141  try {
142  func(std::forward<TArgs>(args)...);
143  } catch (...) {
145  detail::add_to_queue(m_output_queue, std::current_exception());
146  detail::add_end_of_data_to_queue(m_output_queue);
147  throw;
148  }
149  }
150 
151  struct options_type {
153  overwrite allow_overwrite = overwrite::no;
154  fsync sync = fsync::no;
155  };
156 
157  static void set_option(options_type& options, const osmium::io::Header& header) {
158  options.header = header;
159  }
160 
161  static void set_option(options_type& options, overwrite value) {
162  options.allow_overwrite = value;
163  }
164 
165  static void set_option(options_type& options, fsync value) {
166  options.sync = value;
167  }
168 
169  public:
170 
194  template <typename... TArgs>
195  explicit Writer(const osmium::io::File& file, TArgs&&... args) :
196  m_file(file.check()),
197  m_output_queue(20, "raw_output"), // XXX
198  m_output(osmium::io::detail::OutputFormatFactory::instance().create_output(m_file, m_output_queue)),
199  m_buffer(),
200  m_buffer_size(default_buffer_size),
201  m_write_future(),
202  m_thread(),
203  m_status(status::okay) {
204  assert(!m_file.buffer()); // XXX can't handle pseudo-files
205 
206  options_type options;
207  (void)std::initializer_list<int>{
208  (set_option(options, args), 0)...
209  };
210 
211  std::unique_ptr<osmium::io::Compressor> compressor =
213  osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
214  options.sync);
215 
216  std::promise<bool> write_promise;
217  m_write_future = write_promise.get_future();
218  m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise)};
219 
220  ensure_cleanup([&](){
221  m_output->write_header(options.header);
222  });
223  }
224 
225  template <typename... TArgs>
226  explicit Writer(const std::string& filename, TArgs&&... args) :
227  Writer(osmium::io::File(filename), std::forward<TArgs>(args)...) {
228  }
229 
230  template <typename... TArgs>
231  explicit Writer(const char* filename, TArgs&&... args) :
232  Writer(osmium::io::File(filename), std::forward<TArgs>(args)...) {
233  }
234 
235  Writer(const Writer&) = delete;
236  Writer& operator=(const Writer&) = delete;
237 
238  Writer(Writer&&) = default;
239  Writer& operator=(Writer&&) = default;
240 
241  ~Writer() noexcept {
242  try {
243  close();
244  } catch (...) {
245  // Ignore any exceptions because destructor must not throw.
246  }
247  }
248 
252  size_t buffer_size() const noexcept {
253  return m_buffer_size;
254  }
255 
260  void set_buffer_size(size_t size) noexcept {
261  m_buffer_size = size;
262  }
263 
271  void flush() {
272  ensure_cleanup([&](){
273  do_flush();
274  });
275  }
276 
286  ensure_cleanup([&](){
287  do_flush();
288  do_write(std::move(buffer));
289  });
290  }
291 
299  void operator()(const osmium::memory::Item& item) {
300  ensure_cleanup([&](){
301  if (!m_buffer) {
304  }
305  try {
306  m_buffer.push_back(item);
307  } catch (osmium::buffer_is_full&) {
308  do_flush();
309  m_buffer.push_back(item);
310  }
311  });
312  }
313 
323  void close() {
324  if (m_status == status::okay) {
325  ensure_cleanup([&](){
326  do_write(std::move(m_buffer));
327  m_output->write_end();
329  detail::add_end_of_data_to_queue(m_output_queue);
330  });
331  }
332 
333  if (m_write_future.valid()) {
334  m_write_future.get();
335  }
336  }
337 
338  }; // class Writer
339 
340  } // namespace io
341 
342 } // namespace osmium
343 
344 #endif // OSMIUM_IO_WRITER_HPP
fsync sync
Definition: writer.hpp:154
~Writer() noexcept
Definition: writer.hpp:241
void ensure_cleanup(TFunction func, TArgs &&...args)
Definition: writer.hpp:136
Definition: writer.hpp:151
static CompressionFactory & instance()
Definition: compression.hpp:151
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:117
void do_flush()
Definition: writer.hpp:123
Definition: reader_iterator.hpp:39
void swap(Buffer &lhs, Buffer &rhs)
Definition: buffer.hpp:731
osmium::thread::thread_handler m_thread
Definition: writer.hpp:99
osmium::io::Header header
Definition: writer.hpp:152
osmium::memory::Buffer m_buffer
Definition: writer.hpp:93
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:91
enum osmium::io::Writer::status m_status
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:260
Definition: file.hpp:74
Definition: item.hpp:97
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< bool > &&write_promise)
Definition: writer.hpp:108
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:285
Namespace for everything in the Osmium library.
Definition: assembler.hpp:66
status
Definition: writer.hpp:101
Definition: attr.hpp:298
size_t buffer_size() const noexcept
Definition: writer.hpp:252
Writer(const std::string &filename, TArgs &&...args)
Definition: writer.hpp:226
fsync
Definition: writer_options.hpp:51
static constexpr size_t default_buffer_size
Definition: writer.hpp:85
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:165
void push_back(const osmium::memory::Item &item)
Definition: buffer.hpp:488
Definition: error.hpp:44
size_t m_buffer_size
Definition: writer.hpp:95
void close()
Definition: writer.hpp:323
void flush()
Definition: writer.hpp:271
osmium::io::File m_file
Definition: writer.hpp:87
std::unique_ptr< osmium::io::Compressor > create_compressor(osmium::io::file_compression compression, TArgs &&...args)
Definition: compression.hpp:171
size_t committed() const noexcept
Definition: buffer.hpp:241
Definition: buffer.hpp:97
Definition: buffer.hpp:58
const char * buffer() const noexcept
Definition: file.hpp:158
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:161
Definition: writer.hpp:83
std::future< bool > m_write_future
Definition: writer.hpp:97
void check_for_exception(std::future< T > &future)
Definition: util.hpp:53
Writer(const osmium::io::File &file, TArgs &&...args)
Definition: writer.hpp:195
Definition: header.hpp:49
overwrite allow_overwrite
Definition: writer.hpp:153
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:157
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:89
file_compression compression() const noexcept
Definition: file.hpp:291
File & filename(const std::string &filename)
Definition: file.hpp:309
Writer(const char *filename, TArgs &&...args)
Definition: writer.hpp:231
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:299
Definition: util.hpp:83
overwrite
Definition: writer_options.hpp:43
Writer & operator=(const Writer &)=delete