From: Piotr Dałek Date: Fri, 27 Apr 2018 13:36:10 +0000 (+0200) Subject: log: disk write coalescing X-Git-Tag: v14.0.0~75^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=65da5ba216cafb8a91893d0e7fc092209f0fb286;p=ceph.git log: disk write coalescing When loglevel is high enough to produce tons of events per second, sub-sector sized writes are one of bottleneck. Fix this by coalescing following writes into single, preallocated buffer until flush queue is empty, then flushing multiple lines to disk in one syscall. This in turn provides between 0 to 80% performance increase, depending on how heavy logging is enabled - most gains are seen on worst case (highest log level) scenarios. Also refactored flusher code to make var names more reasonable. This change also gets rid of VLA inside a flusher. Signed-off-by: Piotr Dałek --- diff --git a/src/log/Log.cc b/src/log/Log.cc index 9240751dd122..4aef3dfb74b1 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -24,7 +24,7 @@ #define DEFAULT_MAX_RECENT 10000 #define PREALLOC 1000000 - +#define MAX_LOG_BUF 65536 namespace ceph { namespace logging { @@ -52,6 +52,7 @@ Log::Log(SubsystemMap *s) m_syslog_log(-2), m_syslog_crash(-2), m_stderr_log(1), m_stderr_crash(-1), m_graylog_log(-3), m_graylog_crash(-3), + m_log_buf(nullptr), m_log_buf_pos(0), m_stop(false), m_max_new(DEFAULT_MAX_NEW), m_max_recent(DEFAULT_MAX_RECENT), @@ -71,6 +72,8 @@ Log::Log(SubsystemMap *s) ret = pthread_cond_init(&m_cond_flusher, NULL); assert(ret == 0); + m_log_buf = (char*)malloc(MAX_LOG_BUF); + // kludge for prealloc testing if (false) for (int i=0; i < PREALLOC; i++) @@ -86,6 +89,7 @@ Log::~Log() assert(!is_started()); if (m_fd >= 0) VOID_TEMP_FAILURE_RETRY(::close(m_fd)); + free(m_log_buf); pthread_mutex_destroy(&m_queue_mutex); pthread_mutex_destroy(&m_flush_mutex); @@ -296,6 +300,45 @@ void Log::flush() pthread_mutex_unlock(&m_flush_mutex); } +void Log::_log_safe_write(const char* what, size_t write_len) +{ + if (m_fd < 0) + return; + int r = safe_write(m_fd, m_log_buf, m_log_buf_pos); + if (r != m_fd_last_error) { + if (r < 0) + cerr << "problem writing to " << m_log_file + << ": " << cpp_strerror(r) + << std::endl; + m_fd_last_error = r; + } +} + +void Log::_flush_logbuf() +{ + if (m_log_buf_pos) { + _log_safe_write(m_log_buf, m_log_buf_pos); + m_log_buf_pos = 0; + } +} + +// write part of "what" directly to disk, copy remaining part to m_log_buf +// for later coalescing +void Log::_write_and_copy(char* what, size_t len) +{ + size_t write_len = len - (len & (MAX_LOG_BUF - 1)); + if (write_len) { + _log_safe_write(what, write_len); + what += write_len; + } + + write_len = len - write_len; + if (write_len) { + maybe_inline_memcpy((void*)m_log_buf, (void*)what, write_len, 32); + m_log_buf_pos = write_len; + } +} + void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) { Entry *e; @@ -310,59 +353,66 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) e->hint_size(); if (do_fd || do_syslog || do_stderr) { - size_t buflen = 0; + size_t line_used = 0; - char *buf; - size_t buf_size = 80 + e->size(); - bool need_dynamic = buf_size >= 0x10000; //avoids >64K buffers - //allocation at stack - char buf0[need_dynamic ? 1 : buf_size]; + char *line; + size_t line_size = 80 + e->size(); + bool need_dynamic = line_size >= MAX_LOG_BUF; + + // this flushes the existing buffers if either line is longer + // than our buffer, or buffer is too full to fit it + if (m_log_buf_pos + line_size >= MAX_LOG_BUF) { + _flush_logbuf(); + } if (need_dynamic) { - buf = new char[buf_size]; + line = new char[line_size]; } else { - buf = buf0; + line = &m_log_buf[m_log_buf_pos]; } if (crash) - buflen += snprintf(buf, buf_size, "%6d> ", -t->m_len); - buflen += append_time(e->m_stamp, buf + buflen, buf_size - buflen); - buflen += snprintf(buf + buflen, buf_size-buflen, " %lx %2d ", + line_used += snprintf(line, line_size, "%6d> ", -t->m_len); + line_used += append_time(e->m_stamp, line + line_used, line_size - line_used); + line_used += snprintf(line + line_used, line_size - line_used, " %lx %2d ", (unsigned long)e->m_thread, e->m_prio); - buflen += e->snprintf(buf + buflen, buf_size - buflen - 1); - if (buflen > buf_size - 1) { //paranoid check, buf was declared + line_used += e->snprintf(line + line_used, line_size - line_used - 1); + if (line_used > line_size - 1) { //paranoid check, buf was declared //to hold everything - buflen = buf_size - 1; - buf[buflen] = 0; + line_used = line_size - 1; + line[line_used] = 0; } if (do_syslog) { - syslog(LOG_USER|LOG_INFO, "%s", buf); + syslog(LOG_USER|LOG_INFO, "%s", line); } if (do_stderr) { - cerr << m_log_stderr_prefix << buf << std::endl; + cerr << m_log_stderr_prefix << line << std::endl; } + if (do_fd) { - buf[buflen] = '\n'; - int r = safe_write(m_fd, buf, buflen+1); - if (r != m_fd_last_error) { - if (r < 0) - cerr << "problem writing to " << m_log_file - << ": " << cpp_strerror(r) - << std::endl; - m_fd_last_error = r; - } + line[line_used] = '\n'; + if (need_dynamic) { + _write_and_copy(line, line_used + 1); + } } + if (need_dynamic) - delete[] buf; + delete[] line; + + m_log_buf_pos += line_used + 1; } + if (do_graylog2 && m_graylog) { m_graylog->log_entry(e); } requeue->enqueue(e); } + + _flush_logbuf(); + } void Log::_log_message(const char *s, bool crash) @@ -400,6 +450,7 @@ void Log::dump_recent() m_queue_mutex_holder = 0; pthread_mutex_unlock(&m_queue_mutex); _flush(&t, &m_recent, false); + _flush_logbuf(); EntryQueue old; _log_message("--- begin dump of recent events ---", true); @@ -425,6 +476,8 @@ void Log::dump_recent() _log_message("--- end dump of recent events ---", true); + _flush_logbuf(); + m_flush_mutex_holder = 0; pthread_mutex_unlock(&m_flush_mutex); } diff --git a/src/log/Log.h b/src/log/Log.h index 9d5ae17bd567..db137b12d27e 100644 --- a/src/log/Log.h +++ b/src/log/Log.h @@ -50,6 +50,9 @@ class Log : private Thread std::shared_ptr m_graylog; + char* m_log_buf; ///< coalescing buffer + int m_log_buf_pos; ///< where we're at within coalescing buffer + bool m_stop; int m_max_new, m_max_recent; @@ -58,6 +61,9 @@ class Log : private Thread void *entry() override; + void _log_safe_write(const char* what, size_t write_len); + void _write_and_copy(char* what, size_t len); + void _flush_logbuf(); void _flush(EntryQueue *q, EntryQueue *requeue, bool crash); void _log_message(const char *s, bool crash);