Libosmium  2.10.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2016 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cerrno>
37 #include <cstdlib>
38 #include <fcntl.h>
39 #include <future>
40 #include <memory>
41 #include <string>
42 #include <system_error>
43 #include <thread>
44 #include <utility>
45 
46 #ifndef _WIN32
47 # include <sys/wait.h>
48 #endif
49 
50 #ifndef _MSC_VER
51 # include <unistd.h>
52 #endif
53 
55 #include <osmium/io/detail/input_format.hpp>
56 #include <osmium/io/detail/read_thread.hpp>
57 #include <osmium/io/detail/read_write.hpp>
58 #include <osmium/io/detail/queue_util.hpp>
59 #include <osmium/io/error.hpp>
60 #include <osmium/io/file.hpp>
61 #include <osmium/io/header.hpp>
62 #include <osmium/memory/buffer.hpp>
64 #include <osmium/thread/util.hpp>
65 #include <osmium/util/config.hpp>
66 
67 namespace osmium {
68 
69  namespace io {
70 
71  namespace detail {
72 
73  inline size_t get_input_queue_size() noexcept {
74  const size_t n = osmium::config::get_max_queue_size("INPUT", 20);
75  return n > 2 ? n : 2;
76  }
77 
78  inline size_t get_osmdata_queue_size() noexcept {
79  const size_t n = osmium::config::get_max_queue_size("OSMDATA", 20);
80  return n > 2 ? n : 2;
81  }
82 
83  } // namespace detail
84 
91  class Reader {
92 
94 
95  enum class status {
96  okay = 0, // normal reading
97  error = 1, // some error occurred while reading
98  closed = 2, // close() called successfully after eof
99  eof = 3 // eof of file was reached without error
100  } m_status;
101 
103 
104  detail::future_string_queue_type m_input_queue;
105 
106  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
107 
108  osmium::io::detail::ReadThreadManager m_read_thread_manager;
109 
110  detail::future_buffer_queue_type m_osmdata_queue;
111  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
112 
113  std::future<osmium::io::Header> m_header_future;
115 
117 
118  size_t m_file_size;
119 
120  osmium::io::detail::reader_options m_options;
121 
123  m_options.read_which_entities = value;
124  }
125 
126  void set_option(osmium::io::read_meta value) noexcept {
127  m_options.read_metadata = value;
128  }
129 
130  // This function will run in a separate thread.
131  static void parser_thread(const osmium::io::File& file,
132  detail::future_string_queue_type& input_queue,
133  detail::future_buffer_queue_type& osmdata_queue,
134  std::promise<osmium::io::Header>&& header_promise,
135  osmium::io::detail::reader_options options) {
136  std::promise<osmium::io::Header> promise = std::move(header_promise);
137  const auto creator = detail::ParserFactory::instance().get_creator_function(file);
138  const auto parser = creator(input_queue, osmdata_queue, promise, options);
139  parser->parse();
140  }
141 
142 #ifndef _WIN32
143 
154  static int execute(const std::string& command, const std::string& filename, int* childpid) {
155  int pipefd[2];
156  if (pipe(pipefd) < 0) {
157  throw std::system_error(errno, std::system_category(), "opening pipe failed");
158  }
159  const pid_t pid = fork();
160  if (pid < 0) {
161  throw std::system_error(errno, std::system_category(), "fork failed");
162  }
163  if (pid == 0) { // child
164  // close all file descriptors except one end of the pipe
165  for (int i = 0; i < 32; ++i) {
166  if (i != pipefd[1]) {
167  ::close(i);
168  }
169  }
170  if (dup2(pipefd[1], 1) < 0) { // put end of pipe as stdout/stdin
171  exit(1);
172  }
173 
174  ::open("/dev/null", O_RDONLY); // stdin
175  ::open("/dev/null", O_WRONLY); // stderr
176  // hack: -g switches off globbing in curl which allows [] to be used in file names
177  // this is important for XAPI URLs
178  // in theory this execute() function could be used for other commands, but it is
179  // only used for curl at the moment, so this is okay.
180  if (::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr) < 0) {
181  exit(1);
182  }
183  }
184  // parent
185  *childpid = pid;
186  ::close(pipefd[1]);
187  return pipefd[0];
188  }
189 #endif
190 
199  static int open_input_file_or_url(const std::string& filename, int* childpid) {
200  std::string protocol = filename.substr(0, filename.find_first_of(':'));
201  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
202 #ifndef _WIN32
203  return execute("curl", filename, childpid);
204 #else
205  throw io_error("Reading OSM files from the network currently not supported on Windows.");
206 #endif
207  } else {
208  return osmium::io::detail::open_for_reading(filename);
209  }
210  }
211 
212  public:
213 
236  template <typename... TArgs>
237  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
238  m_file(file.check()),
239  m_status(status::okay),
240  m_childpid(0),
241  m_input_queue(detail::get_input_queue_size(), "raw_input"),
242  m_decompressor(m_file.buffer() ?
243  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
244  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
245  m_read_thread_manager(*m_decompressor, m_input_queue),
246  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
247  m_osmdata_queue_wrapper(m_osmdata_queue),
248  m_header_future(),
249  m_header(),
250  m_thread(),
251  m_file_size(m_decompressor->file_size()) {
252 
253  (void)std::initializer_list<int>{
254  (set_option(args), 0)...
255  };
256 
257  std::promise<osmium::io::Header> header_promise;
258  m_header_future = header_promise.get_future();
259  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(m_file), std::ref(m_input_queue), std::ref(m_osmdata_queue), std::move(header_promise), m_options};
260  }
261 
262  template <typename... TArgs>
263  explicit Reader(const std::string& filename, TArgs&&... args) :
264  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
265  }
266 
267  template <typename... TArgs>
268  explicit Reader(const char* filename, TArgs&&... args) :
269  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
270  }
271 
272  Reader(const Reader&) = delete;
273  Reader& operator=(const Reader&) = delete;
274 
275  Reader(Reader&&) = default;
276  Reader& operator=(Reader&&) = default;
277 
278  ~Reader() noexcept {
279  try {
280  close();
281  } catch (...) {
282  // Ignore any exceptions because destructor must not throw.
283  }
284  }
285 
294  void close() {
295  m_status = status::closed;
296 
297  m_read_thread_manager.stop();
298 
299  m_osmdata_queue_wrapper.drain();
300 
301  try {
302  m_read_thread_manager.close();
303  } catch (...) {
304  // Ignore any exceptions.
305  }
306 
307 #ifndef _WIN32
308  if (m_childpid) {
309  int status;
310  const pid_t pid = ::waitpid(m_childpid, &status, 0);
311 #pragma GCC diagnostic push
312 #pragma GCC diagnostic ignored "-Wold-style-cast"
313  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) {
314  throw std::system_error(errno, std::system_category(), "subprocess returned error");
315  }
316 #pragma GCC diagnostic pop
317  m_childpid = 0;
318  }
319 #endif
320  }
321 
329  if (m_status == status::error) {
330  throw io_error("Can not get header from reader when in status 'error'");
331  }
332 
333  try {
334  if (m_header_future.valid()) {
335  m_header = m_header_future.get();
336  if (m_options.read_which_entities == osmium::osm_entity_bits::nothing) {
337  m_status = status::eof;
338  }
339  }
340  } catch (...) {
341  close();
342  m_status = status::error;
343  throw;
344  }
345  return m_header;
346  }
347 
359  osmium::memory::Buffer buffer;
360 
361  if (m_status != status::okay ||
362  m_options.read_which_entities == osmium::osm_entity_bits::nothing) {
363  throw io_error("Can not read from reader when in status 'closed', 'eof', or 'error'");
364  }
365 
366  try {
367  // m_input_format.read() can return an invalid buffer to signal EOF,
368  // or a valid buffer with or without data. A valid buffer
369  // without data is not an error, it just means we have to
370  // keep getting the next buffer until there is one with data.
371  while (true) {
372  buffer = m_osmdata_queue_wrapper.pop();
373  if (detail::at_end_of_data(buffer)) {
374  m_status = status::eof;
375  m_read_thread_manager.close();
376  return buffer;
377  }
378  if (buffer.committed() > 0) {
379  return buffer;
380  }
381  }
382  } catch (...) {
383  close();
384  m_status = status::error;
385  throw;
386  }
387  }
388 
393  bool eof() const {
394  return m_status == status::eof || m_status == status::closed;
395  }
396 
401  size_t file_size() const noexcept {
402  return m_file_size;
403  }
404 
419  size_t offset() const noexcept {
420  return m_decompressor->offset();
421  }
422 
423  }; // class Reader
424 
433  template <typename... TArgs>
436 
437  Reader reader(std::forward<TArgs>(args)...);
438  while (osmium::memory::Buffer read_buffer = reader.read()) {
439  buffer.add_buffer(read_buffer);
440  buffer.commit();
441  }
442 
443  return buffer;
444  }
445 
446  } // namespace io
447 
448 } // namespace osmium
449 
450 #endif // OSMIUM_IO_READER_HPP
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:111
status
Definition: reader.hpp:95
osmium::memory::Buffer read()
Definition: reader.hpp:358
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:237
type
Definition: entity_bits.hpp:63
size_t file_size(int fd)
Definition: file.hpp:67
bool eof() const
Definition: reader.hpp:393
int m_childpid
Definition: reader.hpp:102
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:268
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:113
Definition: reader_iterator.hpp:39
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:106
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:104
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:434
osmium::io::File m_file
Definition: reader.hpp:93
Definition: file.hpp:72
Namespace for everything in the Osmium library.
Definition: assembler.hpp:73
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:122
Definition: attr.hpp:333
void add_buffer(const Buffer &buffer)
Definition: buffer.hpp:500
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:108
size_t file_size() const noexcept
Definition: reader.hpp:401
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:154
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:263
Definition: reader.hpp:91
~Reader() noexcept
Definition: reader.hpp:278
Definition: error.hpp:44
osmium::io::Header header()
Definition: reader.hpp:328
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:126
size_t committed() const noexcept
Definition: buffer.hpp:259
size_t get_max_queue_size(const char *queue_name, size_t default_value) noexcept
Definition: config.hpp:69
Definition: buffer.hpp:97
size_t m_file_size
Definition: reader.hpp:118
osmium::thread::thread_handler m_thread
Definition: reader.hpp:116
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:199
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:110
osmium::io::detail::reader_options m_options
Definition: reader.hpp:120
osmium::io::Header m_header
Definition: reader.hpp:114
read_meta
Definition: file_format.hpp:52
Definition: entity_bits.hpp:67
Definition: compression.hpp:138
size_t offset() const noexcept
Definition: reader.hpp:419
static void parser_thread(const osmium::io::File &file, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::io::detail::reader_options options)
Definition: reader.hpp:131
std::string get(const std::string &key, const std::string &default_value="") const noexcept
Definition: options.hpp:125
Definition: header.hpp:68
void close()
Definition: reader.hpp:294
size_t commit()
Definition: buffer.hpp:354
Definition: util.hpp:85