Libosmium  2.14.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2018 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/input_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_thread.hpp>
40 #include <osmium/io/detail/read_write.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 
50 #include <cerrno>
51 #include <cstdlib>
52 #include <fcntl.h>
53 #include <future>
54 #include <memory>
55 #include <string>
56 #include <system_error>
57 #include <thread>
58 #include <utility>
59 
60 #ifndef _WIN32
61 # include <sys/wait.h>
62 #endif
63 
64 #ifndef _MSC_VER
65 # include <unistd.h>
66 #endif
67 
68 namespace osmium {
69 
70  namespace io {
71 
72  namespace detail {
73 
74  inline std::size_t get_input_queue_size() noexcept {
75  const std::size_t n = osmium::config::get_max_queue_size("INPUT", 20);
76  return n > 2 ? n : 2;
77  }
78 
79  inline std::size_t get_osmdata_queue_size() noexcept {
80  const std::size_t n = osmium::config::get_max_queue_size("OSMDATA", 20);
81  return n > 2 ? n : 2;
82  }
83 
84  } // namespace detail
85 
92  class Reader {
93 
95 
// Thread pool handed to the parser thread. Set through set_option(); if it
// is still null after option processing the constructor uses
// thread::Pool::default_instance().
96  osmium::thread::Pool* m_pool = nullptr;
97 
// Function obtained from the ParserFactory for the input file; called in
// parser_thread() to create the parser.
98  detail::ParserFactory::create_parser_type m_creator;
99 
// State of this reader. header() and read() refuse to work in some states,
// eof() reports true for 'eof' and 'closed'.
100  enum class status {
101  okay = 0, // normal reading
102  error = 1, // some error occurred while reading
103  closed = 2, // close() called
104  eof = 3 // end of file was reached without error
105  } m_status = status::okay;
106 
// Pid of the subprocess started by execute(), or 0 if there is none.
// close() waits for this process.
107  int m_childpid = 0;
108 
// Queue named "raw_input"; shared by the read thread manager and the parser.
109  detail::future_string_queue_type m_input_queue;
110 
// Decompressor created by the CompressionFactory for the file's compression.
111  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
112 
// Constructed from the decompressor and the input queue; stopped and closed
// in close().
113  osmium::io::detail::ReadThreadManager m_read_thread_manager;
114 
// Queue named "parser_results" and the wrapper through which read() pops
// buffers from it and close() drains it.
115  detail::future_buffer_queue_type m_osmdata_queue;
116  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
117 
// Future belonging to the promise given to the parser thread, and the header
// stored by header() once that future has been fetched.
118  std::future<osmium::io::Header> m_header_future{};
119  osmium::io::Header m_header{};
120 
122 
// Value of m_decompressor->file_size() taken in the constructor.
123  std::size_t m_file_size = 0;
124 
127 
128  void set_option(osmium::thread::Pool& pool) noexcept {
129  m_pool = &pool;
130  }
131 
133  m_read_which_entities = value;
134  }
135 
136  void set_option(osmium::io::read_meta value) noexcept {
137  m_read_metadata = value;
138  }
139 
140  // This function will run in a separate thread.
142  const detail::ParserFactory::create_parser_type& creator,
143  detail::future_string_queue_type& input_queue,
144  detail::future_buffer_queue_type& osmdata_queue,
145  std::promise<osmium::io::Header>&& header_promise,
146  osmium::osm_entity_bits::type read_which_entities,
147  osmium::io::read_meta read_metadata) {
148  std::promise<osmium::io::Header> promise{std::move(header_promise)};
149  osmium::io::detail::parser_arguments args = {
150  pool,
151  input_queue,
152  osmdata_queue,
153  promise,
154  read_which_entities,
155  read_metadata
156  };
157  creator(args)->parse();
158  }
159 
160 #ifndef _WIN32
161 
/**
 * Fork a subprocess running `command -g filename` and connect its
 * stdout to a pipe.
 *
 * @param command Program to run; looked up by execlp().
 * @param filename Last argument given to the program.
 * @param childpid Receives the pid of the subprocess.
 * @returns File descriptor of the read end of the pipe. The caller
 *          owns it and is also responsible for waiting for the child.
 * @throws std::system_error if pipe() or fork() fail.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (::pipe(pipefd) < 0) {
        throw std::system_error{errno, std::system_category(), "opening pipe failed"};
    }
    const pid_t pid = ::fork();
    if (pid < 0) {
        // Keep the reason for the failure, close() might overwrite errno.
        const int fork_errno = errno;
        // Without this both ends of the pipe would leak.
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error{fork_errno, std::system_category(), "fork failed"};
    }
    if (pid == 0) { // child
        // close all file descriptors except one end of the pipe
        for (int fd = 0; fd < 32; ++fd) {
            if (fd != pipefd[1]) {
                ::close(fd);
            }
        }
        // Use _exit() instead of exit() in the child: exit() would run
        // the atexit handlers and static destructors inherited from the
        // parent process.
        if (::dup2(pipefd[1], 1) < 0) { // put write end of pipe as stdout
            ::_exit(1);
        }

        // open() returns the lowest free descriptor; after the loop
        // above these are normally 0 and 2.
        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);
        // execlp() only returns if it failed. Never continue into the
        // code meant for the parent.
        ::_exit(1);
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
207 #endif
208 
217  static int open_input_file_or_url(const std::string& filename, int* childpid) {
218  const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
219  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
220 #ifndef _WIN32
221  return execute("curl", filename, childpid);
222 #else
223  throw io_error{"Reading OSM files from the network currently not supported on Windows."};
224 #endif
225  }
226  return osmium::io::detail::open_for_reading(filename);
227  }
228 
229  public:
230 
// Create a Reader for the given file and start reading it.
//
// @param file Description of the input. The result of file.check() is
//        stored in m_file.
// @param args Options. Each one is passed to one of the set_option()
//        overloads (thread pool, entity bits, read_meta).
//
// The data comes from the buffer of the file object if it has one, else
// from the file or URL opened by open_input_file_or_url(). At the end of
// the constructor the parser thread is started.
253  template <typename... TArgs>
254  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
255  m_file(file.check()),
256  m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
257  m_input_queue(detail::get_input_queue_size(), "raw_input"),
// Decompress from memory if the file has a buffer, else from a descriptor.
258  m_decompressor(m_file.buffer() ?
259  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
260  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
261  m_read_thread_manager(*m_decompressor, m_input_queue),
262  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
263  m_osmdata_queue_wrapper(m_osmdata_queue),
264  m_file_size(m_decompressor->file_size()) {
265 
// Call set_option() once for every argument in the pack. The
// initializer list only exists to allow the pack expansion.
266  (void)std::initializer_list<int>{
267  (set_option(args), 0)...
268  };
269 
// No pool given as option: use the default pool.
270  if (!m_pool) {
271  m_pool = &thread::Pool::default_instance();
272  }
273 
// Keep the future here, the promise moves into the parser thread.
274  std::promise<osmium::io::Header> header_promise;
275  m_header_future = header_promise.get_future();
276  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), std::ref(m_creator), std::ref(m_input_queue), std::ref(m_osmdata_queue), std::move(header_promise), m_read_which_entities, m_read_metadata};
277  }
278 
// Convenience constructor: builds an osmium::io::File from the file name
// and delegates to the constructor taking a File. The options in args are
// forwarded unchanged.
279  template <typename... TArgs>
280  explicit Reader(const std::string& filename, TArgs&&... args) :
281  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
282  }
283 
// Convenience constructor: builds an osmium::io::File from the C string
// and delegates to the constructor taking a File. The options in args are
// forwarded unchanged.
284  template <typename... TArgs>
285  explicit Reader(const char* filename, TArgs&&... args) :
286  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
287  }
288 
// A Reader can not be copied.
289  Reader(const Reader&) = delete;
290  Reader& operator=(const Reader&) = delete;
291 
// A Reader can not be moved either.
292  Reader(Reader&&) = delete;
293  Reader& operator=(Reader&&) = delete;
294 
295  ~Reader() noexcept {
296  try {
297  close();
298  } catch (...) {
299  // Ignore any exceptions because destructor must not throw.
300  }
301  }
302 
311  void close() {
312  m_status = status::closed;
313 
314  m_read_thread_manager.stop();
315 
316  m_osmdata_queue_wrapper.drain();
317 
318  try {
319  m_read_thread_manager.close();
320  } catch (...) {
321  // Ignore any exceptions.
322  }
323 
324 #ifndef _WIN32
325  if (m_childpid) {
326  int status;
327  const pid_t pid = ::waitpid(m_childpid, &status, 0);
328 #pragma GCC diagnostic push
329 #pragma GCC diagnostic ignored "-Wold-style-cast"
330  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
331  throw std::system_error{errno, std::system_category(), "subprocess returned error"};
332  }
333 #pragma GCC diagnostic pop
334  m_childpid = 0;
335  }
336 #endif
337  }
338 
346  if (m_status == status::error) {
347  throw io_error{"Can not get header from reader when in status 'error'"};
348  }
349 
350  try {
351  if (m_header_future.valid()) {
352  m_header = m_header_future.get();
353  }
354  } catch (...) {
355  close();
356  m_status = status::error;
357  throw;
358  }
359 
360  return m_header;
361  }
362 
372  osmium::memory::Buffer buffer;
373 
374  if (m_status != status::okay) {
375  throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
376  }
377 
378  if (m_read_which_entities == osmium::osm_entity_bits::nothing) {
379  m_status = status::eof;
380  return buffer;
381  }
382 
383  try {
384  // m_input_format.read() can return an invalid buffer to signal EOF,
385  // or a valid buffer with or without data. A valid buffer
386  // without data is not an error, it just means we have to
387  // keep getting the next buffer until there is one with data.
388  while (true) {
389  buffer = m_osmdata_queue_wrapper.pop();
390  if (detail::at_end_of_data(buffer)) {
391  m_status = status::eof;
392  m_read_thread_manager.close();
393  return buffer;
394  }
395  if (buffer.committed() > 0) {
396  return buffer;
397  }
398  }
399  } catch (...) {
400  close();
401  m_status = status::error;
402  throw;
403  }
404  }
405 
410  bool eof() const {
411  return m_status == status::eof || m_status == status::closed;
412  }
413 
418  std::size_t file_size() const noexcept {
419  return m_file_size;
420  }
421 
436  std::size_t offset() const noexcept {
437  return m_decompressor->offset();
438  }
439 
440  }; // class Reader
441 
// read_file(): read a complete input into one buffer. All arguments are
// forwarded to the Reader constructor. Every buffer returned by
// Reader::read() is added to the result and committed, until read()
// returns a buffer that converts to false.
//
// NOTE(review): two lines are missing in this listing, the signature
// (osmium::memory::Buffer read_file(TArgs&&... args)) and the declaration
// of the local variable `buffer`. Check the original header.
450  template <typename... TArgs>
453 
454  Reader reader{std::forward<TArgs>(args)...};
455  while (auto read_buffer = reader.read()) {
456  buffer.add_buffer(read_buffer);
457  buffer.commit();
458  }
459 
460  return buffer;
461  }
462 
463  } // namespace io
464 
465 } // namespace osmium
466 
467 #endif // OSMIUM_IO_READER_HPP
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:116
status
Definition: reader.hpp:100
osmium::memory::Buffer read()
Definition: reader.hpp:371
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:254
type
Definition: entity_bits.hpp:63
std::size_t get_max_queue_size(const char *queue_name, std::size_t default_value) noexcept
Definition: config.hpp:83
std::size_t file_size(int fd)
Definition: file.hpp:109
std::size_t file_size() const noexcept
Definition: reader.hpp:418
bool eof() const
Definition: reader.hpp:410
static Pool & default_instance()
Definition: pool.hpp:174
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:285
Definition: location.hpp:550
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:111
std::size_t committed() const noexcept
Definition: buffer.hpp:261
object or changeset
Definition: entity_bits.hpp:76
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:109
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:451
osmium::io::File m_file
Definition: reader.hpp:94
Definition: file.hpp:72
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:132
Definition: attr.hpp:333
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:113
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:172
static void parser_thread(osmium::thread::Pool &pool, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata)
Definition: reader.hpp:141
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:280
Definition: reader.hpp:92
~Reader() noexcept
Definition: reader.hpp:295
Definition: error.hpp:44
std::size_t offset() const noexcept
Definition: reader.hpp:436
Definition: pool.hpp:89
osmium::io::Header header()
Definition: reader.hpp:345
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:136
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:128
Definition: buffer.hpp:97
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:217
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:115
read_meta
Definition: file_format.hpp:54
Definition: entity_bits.hpp:67
Definition: compression.hpp:141
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:98
Definition: header.hpp:68
void close()
Definition: reader.hpp:311
Definition: util.hpp:85