bitz-server  2.0.0
async_log_helper.h
1 //
2 // Copyright(c) 2015 Gabi Melman.
3 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
4 //
5 
6 // async log helper:
7 // Processes log messages asynchronously on a background worker thread.
8 //
9 // If the internal queue of log messages reaches its max size and the overflow
10 // policy is block_retry, the client call will block until there is more room.
11 //
12 
13 #pragma once
14 
15 #include "../common.h"
16 #include "../details/log_msg.h"
17 #include "../details/mpmc_blocking_q.h"
18 #include "../details/os.h"
19 #include "../formatter.h"
20 #include "../sinks/sink.h"
21 
22 #include <chrono>
23 #include <condition_variable>
24 #include <exception>
25 #include <functional>
26 #include <memory>
27 #include <string>
28 #include <thread>
29 #include <utility>
30 #include <vector>
31 
32 namespace spdlog {
33 namespace details {
34 
36 {
37  // Async msg to move to/from the queue
38  // Movable only. should never be copied
// Kind of message travelling through the async queue; the worker thread
// dispatches on this value.
enum class async_msg_type
{
    log,      // a log record to format and pass to the sinks
    flush,    // request to flush every sink
    terminate // request to flush the sinks and stop the worker thread
};
45 
46  struct async_msg
47  {
48  level::level_enum level;
49  log_clock::time_point time;
50  size_t thread_id;
51  std::string txt;
52  async_msg_type msg_type;
53  size_t msg_id;
54 
55  async_msg() = default;
56  ~async_msg() = default;
57 
58  explicit async_msg(async_msg_type m_type)
59  : level(level::info)
60  , thread_id(0)
61  , msg_type(m_type)
62  , msg_id(0)
63  {
64  }
65 
66  async_msg(async_msg &&other) = default;
67  async_msg &operator=(async_msg &&other) = default;
68 
69  // never copy or assign. should only be moved..
70  async_msg(const async_msg &) = delete;
71  async_msg &operator=(const async_msg &other) = delete;
72 
73  // construct from log_msg
74  explicit async_msg(const details::log_msg &m)
75  : level(m.level)
76  , time(m.time)
77  , thread_id(m.thread_id)
78  , txt(m.raw.data(), m.raw.size())
79  , msg_type(async_msg_type::log)
80  , msg_id(m.msg_id)
81  {
82  }
83 
84  // copy into log_msg
85  void fill_log_msg(log_msg &msg, std::string *logger_name)
86  {
87  msg.logger_name = logger_name;
88  msg.level = level;
89  msg.time = time;
90  msg.thread_id = thread_id;
91  msg.raw.clear();
92  msg.raw << txt;
93  msg.msg_id = msg_id;
94  }
95  };
96 
97 public:
98  using item_type = async_msg;
100 
101  using clock = std::chrono::steady_clock;
102 
103  async_log_helper(std::string logger_name, formatter_ptr formatter, std::vector<sink_ptr> sinks, size_t queue_size,
104  const log_err_handler err_handler, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
105  std::function<void()> worker_warmup_cb = nullptr,
106  const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(),
107  std::function<void()> worker_teardown_cb = nullptr);
108 
109  void log(const details::log_msg &msg);
110 
111  // stop logging and join the back thread
112  ~async_log_helper();
113 
114  async_log_helper(const async_log_helper &) = delete;
115  async_log_helper &operator=(const async_log_helper &) = delete;
116 
117  void set_formatter(formatter_ptr msg_formatter);
118 
119  void flush();
120 
121  void set_error_handler(spdlog::log_err_handler err_handler);
122 
123 private:
124  std::string _logger_name;
125  formatter_ptr _formatter;
126  std::vector<std::shared_ptr<sinks::sink>> _sinks;
127 
128  // queue of messages to log
129  q_type _q;
130 
131  log_err_handler _err_handler;
132 
133  std::chrono::time_point<log_clock> _last_flush;
134 
135  // overflow policy
136  const async_overflow_policy _overflow_policy;
137 
138  // worker thread warmup callback - one can set thread priority, affinity, etc
139  const std::function<void()> _worker_warmup_cb;
140 
141  // auto periodic sink flush parameter
142  const std::chrono::milliseconds _flush_interval_ms;
143 
144  // worker thread teardown callback
145  const std::function<void()> _worker_teardown_cb;
146 
147  std::mutex null_mutex_;
148  // null_mutex null_mutex_;
149  std::condition_variable_any not_empty_cv_;
150  std::condition_variable_any not_full_cv_;
151 
152  // worker thread
153  std::thread _worker_thread;
154 
155  void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy);
156 
157  // worker thread main loop
158  void worker_loop();
159 
160  // dequeue next message from the queue and process it.
161  // return false if termination of the queue is required
162  bool process_next_msg();
163 
164  void handle_flush_interval();
165 
166  void flush_sinks();
167 };
168 } // namespace details
169 } // namespace spdlog
170 
172 // async_log_helper class implementation
174 inline spdlog::details::async_log_helper::async_log_helper(std::string logger_name, formatter_ptr formatter, std::vector<sink_ptr> sinks,
175  size_t queue_size, log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function<void()> worker_warmup_cb,
176  const std::chrono::milliseconds &flush_interval_ms, std::function<void()> worker_teardown_cb)
177  : _logger_name(std::move(logger_name))
178  , _formatter(std::move(formatter))
179  , _sinks(std::move(sinks))
180  , _q(queue_size)
181  , _err_handler(std::move(err_handler))
182  , _last_flush(os::now())
183  , _overflow_policy(overflow_policy)
184  , _worker_warmup_cb(std::move(worker_warmup_cb))
185  , _flush_interval_ms(flush_interval_ms)
186  , _worker_teardown_cb(std::move(worker_teardown_cb))
187 {
188  _worker_thread = std::thread(&async_log_helper::worker_loop, this);
189 }
190 
191 // send to the worker thread terminate message, and join it.
192 inline spdlog::details::async_log_helper::~async_log_helper()
193 {
194  try
195  {
196  enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry);
197  _worker_thread.join();
198  }
199  catch (...) // don't crash in destructor
200  {
201  }
202 }
203 
204 // try to push and block until succeeded (if the policy is not to discard when the queue is full)
205 inline void spdlog::details::async_log_helper::log(const details::log_msg &msg)
206 {
207  enqueue_msg(async_msg(msg), _overflow_policy);
208 }
209 
210 inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy)
211 {
212 
213  // block until succeeded pushing to the queue
214  if (policy == async_overflow_policy::block_retry)
215  {
216  _q.enqueue(std::move(new_msg));
217  }
218  else
219  {
220  _q.enqueue_nowait(std::move(new_msg));
221  }
222 }
223 
224 // optionally wait for the queue be empty and request flush from the sinks
225 inline void spdlog::details::async_log_helper::flush()
226 {
227  enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy);
228 }
229 
230 inline void spdlog::details::async_log_helper::worker_loop()
231 {
232  if (_worker_warmup_cb)
233  {
234  _worker_warmup_cb();
235  }
236  auto active = true;
237  while (active)
238  {
239  try
240  {
241  active = process_next_msg();
242  }
243  SPDLOG_CATCH_AND_HANDLE
244  }
245  if (_worker_teardown_cb)
246  {
247  _worker_teardown_cb();
248  }
249 }
250 
251 // process next message in the queue
252 // return true if this thread should still be active (while no terminate msg was received)
253 inline bool spdlog::details::async_log_helper::process_next_msg()
254 {
255  async_msg incoming_async_msg;
256  bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(2));
257  if (!dequeued)
258  {
259  handle_flush_interval();
260  return true;
261  }
262 
263  switch (incoming_async_msg.msg_type)
264  {
265  case async_msg_type::flush:
266  flush_sinks();
267  return true;
268 
269  case async_msg_type::terminate:
270  flush_sinks();
271  return false;
272 
273  default:
274  log_msg incoming_log_msg;
275  incoming_async_msg.fill_log_msg(incoming_log_msg, &_logger_name);
276  _formatter->format(incoming_log_msg);
277  for (auto &s : _sinks)
278  {
279  if (s->should_log(incoming_log_msg.level))
280  {
281  try
282  {
283  s->log(incoming_log_msg);
284  }
285  SPDLOG_CATCH_AND_HANDLE
286  }
287  }
288  handle_flush_interval();
289  return true;
290  }
291  assert(false);
292  return true; // should not be reached
293 }
294 
295 inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter)
296 {
297  _formatter = std::move(msg_formatter);
298 }
299 
300 inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler)
301 {
302  _err_handler = std::move(err_handler);
303 }
304 
305 // flush all sinks if _flush_interval_ms has expired.
306 inline void spdlog::details::async_log_helper::handle_flush_interval()
307 {
308  if (_flush_interval_ms == std::chrono::milliseconds::zero())
309  {
310  return;
311  }
312  auto delta = details::os::now() - _last_flush;
313  ;
314  if (delta >= _flush_interval_ms)
315  {
316  flush_sinks();
317  }
318 }
319 
320 // flush all sinks if _flush_interval_ms has expired. only called if queue is empty
321 inline void spdlog::details::async_log_helper::flush_sinks()
322 {
323 
324  for (auto &s : _sinks)
325  {
326  try
327  {
328  s->flush();
329  }
330  SPDLOG_CATCH_AND_HANDLE
331  }
332  _last_flush = os::now();
333 }
const Char * data() const FMT_NOEXCEPT
Definition: format.h:3280
Definition: formatter.h:19
Definition: async_logger.h:26
std::size_t size() const
Definition: format.h:3271
Definition: log_msg.h:16
Definition: async_log_helper.h:35