]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
log: disk write coalescing 21847/head
authorPiotr Dałek <piotr.dalek@corp.ovh.com>
Fri, 27 Apr 2018 13:36:10 +0000 (15:36 +0200)
committerPiotr Dałek <piotr.dalek@corp.ovh.com>
Mon, 7 May 2018 10:02:51 +0000 (12:02 +0200)
When loglevel is high enough to produce tons of events per second,
sub-sector sized writes are one of bottleneck. Fix this by coalescing
following writes into single, preallocated buffer until flush queue
is empty, then flushing multiple lines to disk in one syscall.
This in turn provides between 0 to 80% performance increase,
depending on how heavy logging is enabled - most gains are seen on
worst case (highest log level) scenarios.
Also refactored flusher code to make var names more reasonable. This
change also gets rid of VLA inside a flusher.

Signed-off-by: Piotr Dałek <piotr.dalek@corp.ovh.com>
src/log/Log.cc
src/log/Log.h

index 9240751dd1228b1989ee45d05010cde42a3d737e..4aef3dfb74b11a8207059647e4ae26e7dbcdc933 100644 (file)
@@ -24,7 +24,7 @@
 #define DEFAULT_MAX_RECENT 10000
 
 #define PREALLOC 1000000
-
+#define MAX_LOG_BUF 65536
 
 namespace ceph {
 namespace logging {
@@ -52,6 +52,7 @@ Log::Log(SubsystemMap *s)
     m_syslog_log(-2), m_syslog_crash(-2),
     m_stderr_log(1), m_stderr_crash(-1),
     m_graylog_log(-3), m_graylog_crash(-3),
+    m_log_buf(nullptr), m_log_buf_pos(0),
     m_stop(false),
     m_max_new(DEFAULT_MAX_NEW),
     m_max_recent(DEFAULT_MAX_RECENT),
@@ -71,6 +72,8 @@ Log::Log(SubsystemMap *s)
   ret = pthread_cond_init(&m_cond_flusher, NULL);
   assert(ret == 0);
 
+  m_log_buf = (char*)malloc(MAX_LOG_BUF);
+
   // kludge for prealloc testing
   if (false)
     for (int i=0; i < PREALLOC; i++)
@@ -86,6 +89,7 @@ Log::~Log()
   assert(!is_started());
   if (m_fd >= 0)
     VOID_TEMP_FAILURE_RETRY(::close(m_fd));
+  free(m_log_buf);
 
   pthread_mutex_destroy(&m_queue_mutex);
   pthread_mutex_destroy(&m_flush_mutex);
@@ -296,6 +300,45 @@ void Log::flush()
   pthread_mutex_unlock(&m_flush_mutex);
 }
 
+void Log::_log_safe_write(const char* what, size_t write_len)
+{
+  if (m_fd < 0)
+    return;
+  int r = safe_write(m_fd, m_log_buf, m_log_buf_pos);
+  if (r != m_fd_last_error) {
+    if (r < 0)
+      cerr << "problem writing to " << m_log_file
+           << ": " << cpp_strerror(r)
+           << std::endl;
+    m_fd_last_error = r;
+  }
+}
+
+void Log::_flush_logbuf()
+{
+  if (m_log_buf_pos) {
+    _log_safe_write(m_log_buf, m_log_buf_pos);
+    m_log_buf_pos = 0;
+  }
+}
+
+// write part of "what" directly to disk, copy remaining part to m_log_buf
+// for later coalescing
+void Log::_write_and_copy(char* what, size_t len)
+{
+  size_t write_len = len - (len & (MAX_LOG_BUF - 1));
+  if (write_len) {
+    _log_safe_write(what, write_len);
+    what += write_len;
+  }
+
+  write_len = len - write_len;
+  if (write_len) {
+     maybe_inline_memcpy((void*)m_log_buf, (void*)what, write_len, 32);
+     m_log_buf_pos = write_len;
+  }
+}
+
 void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
 {
   Entry *e;
@@ -310,59 +353,66 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
 
     e->hint_size();
     if (do_fd || do_syslog || do_stderr) {
-      size_t buflen = 0;
+      size_t line_used = 0;
 
-      char *buf;
-      size_t buf_size = 80 + e->size();
-      bool need_dynamic = buf_size >= 0x10000; //avoids >64K buffers
-                                              //allocation at stack
-      char buf0[need_dynamic ? 1 : buf_size];
+      char *line;
+      size_t line_size = 80 + e->size();
+      bool need_dynamic = line_size >= MAX_LOG_BUF;
+
+      // this flushes the existing buffers if either line is longer
+      // than our buffer, or buffer is too full to fit it
+      if (m_log_buf_pos + line_size >= MAX_LOG_BUF) {
+          _flush_logbuf();
+      }
       if (need_dynamic) {
-        buf = new char[buf_size];
+        line = new char[line_size];
       } else {
-        buf = buf0;
+        line = &m_log_buf[m_log_buf_pos];
       }
 
       if (crash)
-       buflen += snprintf(buf, buf_size, "%6d> ", -t->m_len);
-      buflen += append_time(e->m_stamp, buf + buflen, buf_size - buflen);
-      buflen += snprintf(buf + buflen, buf_size-buflen, " %lx %2d ",
+        line_used += snprintf(line, line_size, "%6d> ", -t->m_len);
+      line_used += append_time(e->m_stamp, line + line_used, line_size - line_used);
+      line_used += snprintf(line + line_used, line_size - line_used, " %lx %2d ",
                        (unsigned long)e->m_thread, e->m_prio);
 
-      buflen += e->snprintf(buf + buflen, buf_size - buflen - 1);
-      if (buflen > buf_size - 1) { //paranoid check, buf was declared
+      line_used += e->snprintf(line + line_used, line_size - line_used - 1);
+      if (line_used > line_size - 1) { //paranoid check, buf was declared
                                   //to hold everything
-        buflen = buf_size - 1;
-        buf[buflen] = 0;
+        line_used = line_size - 1;
+        line[line_used] = 0;
       }
 
       if (do_syslog) {
-        syslog(LOG_USER|LOG_INFO, "%s", buf);
+        syslog(LOG_USER|LOG_INFO, "%s", line);
       }
 
       if (do_stderr) {
-        cerr << m_log_stderr_prefix << buf << std::endl;
+        cerr << m_log_stderr_prefix << line << std::endl;
       }
+
       if (do_fd) {
-        buf[buflen] = '\n';
-        int r = safe_write(m_fd, buf, buflen+1);
-       if (r != m_fd_last_error) {
-         if (r < 0)
-           cerr << "problem writing to " << m_log_file
-                << ": " << cpp_strerror(r)
-                << std::endl;
-         m_fd_last_error = r;
-       }
+        line[line_used] = '\n';
+        if (need_dynamic) {
+          _write_and_copy(line, line_used + 1);
+        }
       }
+
       if (need_dynamic)
-        delete[] buf;
+        delete[] line;
+
+      m_log_buf_pos += line_used + 1;
     }
+
     if (do_graylog2 && m_graylog) {
       m_graylog->log_entry(e);
     }
 
     requeue->enqueue(e);
   }
+
+  _flush_logbuf();
+
 }
 
 void Log::_log_message(const char *s, bool crash)
@@ -400,6 +450,7 @@ void Log::dump_recent()
   m_queue_mutex_holder = 0;
   pthread_mutex_unlock(&m_queue_mutex);
   _flush(&t, &m_recent, false);
+  _flush_logbuf();
 
   EntryQueue old;
   _log_message("--- begin dump of recent events ---", true);
@@ -425,6 +476,8 @@ void Log::dump_recent()
 
   _log_message("--- end dump of recent events ---", true);
 
+  _flush_logbuf();
+
   m_flush_mutex_holder = 0;
   pthread_mutex_unlock(&m_flush_mutex);
 }
index 9d5ae17bd5674370c08083165f9e099a8b76fde6..db137b12d27ef973794ba04285d073fe47ae5d95 100644 (file)
@@ -50,6 +50,9 @@ class Log : private Thread
 
   std::shared_ptr<Graylog> m_graylog;
 
+  char* m_log_buf;   ///< coalescing buffer
+  int m_log_buf_pos; ///< where we're at within coalescing buffer
+
   bool m_stop;
 
   int m_max_new, m_max_recent;
@@ -58,6 +61,9 @@ class Log : private Thread
 
   void *entry() override;
 
+  void _log_safe_write(const char* what, size_t write_len);
+  void _write_and_copy(char* what, size_t len);
+  void _flush_logbuf();
   void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);
 
   void _log_message(const char *s, bool crash);