]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/log: Added new version of PrebufferedStreambuf, tuned for log usage 19100/head
authorAdam Kupczyk <akupczyk@redhat.com>
Sun, 5 Nov 2017 20:18:28 +0000 (21:18 +0100)
committerKefu Chai <kchai@redhat.com>
Tue, 28 Nov 2017 08:15:24 +0000 (16:15 +0800)
Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/CMakeLists.txt
src/common/CachedPrebufferedStreambuf.cc [new file with mode: 0644]
src/common/CachedPrebufferedStreambuf.h [new file with mode: 0644]
src/common/dout.h
src/log/Entry.h
src/log/Log.cc

index 3a1270738fa57c09867ff84f135a9bcc6e948996..dab80ae86df7bde127811f58c661c437833eddc7 100644 (file)
@@ -416,6 +416,7 @@ set(libcommon_files
   common/LogClient.cc
   common/LogEntry.cc
   common/PrebufferedStreambuf.cc
+  common/CachedPrebufferedStreambuf.cc
   common/BackTrace.cc
   common/perf_counters.cc
   common/perf_histogram.cc
diff --git a/src/common/CachedPrebufferedStreambuf.cc b/src/common/CachedPrebufferedStreambuf.cc
new file mode 100644 (file)
index 0000000..1e27e1f
--- /dev/null
@@ -0,0 +1,147 @@
+#include "common/CachedPrebufferedStreambuf.h"
+#include <string.h>
+
+// std::unique cannot be used here, as deletion will not clear value,
+// but thread variable will still exist, causing free-memory-read
+struct cached_os_t
+{
+  CachedPrebufferedStreambuf* streambuf;
+  cached_os_t()
+    : streambuf(new CachedPrebufferedStreambuf)
+  {}
+  ~cached_os_t() {
+    delete streambuf;
+    streambuf = nullptr;
+  }
+};
+
+thread_local cached_os_t t_os;
+
+CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf()
+{
+  if (this == t_os.streambuf) {
+    // we are deleting thread's PrebufferedStreambuf,
+    // clear it so we can create it cleanly again without error
+    t_os.streambuf = nullptr;
+  }
+}
+
+// lock state of streambuf and detach buffer
+void CachedPrebufferedStreambuf::finish()
+{
+  data->m_pptr = this->pptr();
+  data = nullptr;
+  if (this != t_os.streambuf) {
+    // this is extra formatter, not useful anymore
+    delete this;
+  }
+}
+
+CachedPrebufferedStreambuf*
+CachedPrebufferedStreambuf::create(prebuffered_data* data)
+{
+  CachedPrebufferedStreambuf* streambuf;
+
+  if (t_os.streambuf == nullptr || /*this can happen only on process cleanup*/
+      t_os.streambuf->in_use() /*this happens when we do recursion in logging*/ ) {
+    streambuf = new CachedPrebufferedStreambuf();
+  } else {
+    streambuf = t_os.streambuf;
+  }
+  streambuf->data = data;
+  streambuf->setp(data->m_buf, data->m_buf + data->m_buf_len);
+  // so we underflow on first read
+  streambuf->setg(0, 0, 0);
+  return streambuf;
+}
+
+CachedPrebufferedStreambuf::int_type CachedPrebufferedStreambuf::overflow(int_type c)
+{
+  int old_len = data->m_overflow.size();
+  if (old_len == 0) {
+    data->m_overflow.resize(80);
+  } else {
+    data->m_overflow.resize(old_len * 2);
+  }
+  data->m_overflow[old_len] = c;
+  this->setp(&data->m_overflow[old_len + 1], &*data->m_overflow.begin() + data->m_overflow.size());
+  return traits_type::not_eof(c);
+}
+
+CachedPrebufferedStreambuf::int_type CachedPrebufferedStreambuf::underflow()
+{
+  if (this->gptr() == 0) {
+    // first read; start with the static buffer
+    if (!data->m_overflow.empty())
+      // there is overflow, so start with entire prealloc buffer
+      this->setg(data->m_buf, data->m_buf, data->m_buf + data->m_buf_len);
+    else if (this->pptr() == data->m_buf)
+      // m_buf is empty
+      return traits_type::eof();  // no data
+    else
+      // set up portion of m_buf we've filled
+      this->setg(data->m_buf, data->m_buf, this->pptr());
+    return *this->gptr();
+  }
+  if (this->gptr() == data->m_buf + data->m_buf_len && data->m_overflow.size()) {
+    // at end of m_buf; continue with the overflow string
+    this->setg(&data->m_overflow[0], &data->m_overflow[0], this->pptr());
+    return *this->gptr();
+  }
+
+  // otherwise we must be at the end (of m_buf and/or m_overflow)
+  return traits_type::eof();
+}
+
+/// return a string copy (inefficiently)
+std::string prebuffered_data::get_str() const
+{
+  if (!m_overflow.empty()) {
+    std::string s(m_buf, m_buf + m_buf_len);
+    s.append(&m_overflow[0], m_pptr - &m_overflow[0]);
+    return s;
+  } else if (m_pptr == m_buf) {
+    return std::string();
+  } else {
+    return std::string(m_buf, m_pptr - m_buf);
+  }
+}
+
+// returns current size of content
+size_t prebuffered_data::size() const
+{
+  if (m_overflow.empty()) {
+    return m_pptr - m_buf;
+  } else {
+    return m_buf_len + m_pptr - &m_overflow[0];
+  }
+}
+
+// extracts up to avail chars of content
+int prebuffered_data::snprintf(char* dst, size_t avail) const
+{
+  size_t len_a;
+  size_t len_b;
+  if (!m_overflow.empty()) {
+    len_a = m_buf_len;
+    len_b = m_pptr - &m_overflow[0];
+  } else {
+    len_a = m_pptr - m_buf;
+    len_b = 0;
+  }
+  if (avail > len_a + len_b) {
+    memcpy(dst, m_buf, len_a);
+    memcpy(dst + m_buf_len, m_overflow.c_str(), len_b);
+    dst[len_a + len_b] = 0;
+  } else {
+    if (avail > len_a) {
+      memcpy(dst, m_buf, len_a);
+      memcpy(dst + m_buf_len, m_overflow.c_str(), avail - len_a - 1);
+      dst[avail - 1] = 0;
+    } else {
+      memcpy(dst, m_buf, avail - 1);
+      dst[avail - 1] = 0;
+    }
+  }
+  return len_a + len_b;
+}
diff --git a/src/common/CachedPrebufferedStreambuf.h b/src/common/CachedPrebufferedStreambuf.h
new file mode 100644 (file)
index 0000000..b6a80d6
--- /dev/null
@@ -0,0 +1,73 @@
+#ifndef CEPH_COMMON_CACHED_PREBUFFEREDSTREAMBUF_H
+#define CEPH_COMMON_CACHED_PREBUFFEREDSTREAMBUF_H
+
+#include <streambuf>
+#include <atomic>
+#include <ostream>
+
+/**
+ * streambuf using existing buffer, overflowing into a std::string
+ *
+ * A simple streambuf that uses a preallocated buffer for small
+ * strings, and overflows into a std::string when necessary.  If the
+ * preallocated buffer size is chosen well, we can optimize for the
+ * common case and overflow to a slower heap-allocated buffer when
+ * necessary.
+ */
+struct prebuffered_data
+{
+private:
+  char *m_buf;
+  size_t m_buf_len;
+  char *m_pptr;
+  std::string m_overflow;
+
+public:
+  prebuffered_data(char* buf, size_t buf_len)
+  : m_buf(buf), m_buf_len(buf_len), m_pptr(nullptr) {}
+
+  /// return a string copy (inefficiently)
+  std::string get_str() const;
+
+  // returns current size of content
+  size_t size() const;
+
+  // extracts up to avail chars of content
+  int snprintf(char* dst, size_t avail) const;
+  friend class CachedPrebufferedStreambuf;
+};
+
+class CachedPrebufferedStreambuf final : public std::streambuf
+{
+public:
+  static CachedPrebufferedStreambuf* create(prebuffered_data* d);
+
+  std::ostream& get_ostream() {
+    return os;
+  }
+
+  // called when the buffer fills up
+  int_type overflow(int_type c) override;
+
+  // called when we read and need more data
+  int_type underflow() override;
+
+  // signals that formatting log has finished
+  void finish();
+
+private:
+  CachedPrebufferedStreambuf()
+    : data(nullptr), os(this) {}
+  ~CachedPrebufferedStreambuf();
+
+  // determines if instance is currently used for formatting log
+  bool in_use() const {
+    return data != nullptr;
+  }
+
+  prebuffered_data* data;
+  std::ostream os;
+  friend class cached_os_t;
+};
+
+#endif
index 29863a524a60277ab37aea4d047a91e1c7f3876e..41a330ee49ab8fa871d641a96f08f1cb18235691 100644 (file)
@@ -54,12 +54,11 @@ public:
     }                                                                  \
     static size_t _log_exp_length = 80;                                \
     ceph::logging::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length); \
-    ostream _dout_os(&_dout_e->m_streambuf);                           \
     static_assert(std::is_convertible<decltype(&*cct),                         \
                                      CephContext* >::value,            \
                  "provided cct must be compatible with CephContext*"); \
     auto _dout_cct = cct;                                              \
-    std::ostream* _dout = &_dout_os;
+    std::ostream* _dout = &_dout_e->get_ostream();
 
 #define lsubdout(cct, sub, v)  dout_impl(cct, ceph_subsys_##sub, v) dout_prefix
 #define ldout(cct, v)  dout_impl(cct, dout_subsys, v) dout_prefix
index d5c23233ee64b5c90fba94f195c4fedc3b0a4bb2..0005cdda72eb13ffb41de20a446d2c0b805cfc49 100644 (file)
@@ -4,7 +4,7 @@
 #ifndef __CEPH_LOG_ENTRY_H
 #define __CEPH_LOG_ENTRY_H
 
-#include "common/PrebufferedStreambuf.h"
+#include "common/CachedPrebufferedStreambuf.h"
 #include <pthread.h>
 #include <string>
 #include "log/LogClock.h"
@@ -19,49 +19,43 @@ struct Entry {
   short m_prio, m_subsys;
   Entry *m_next;
 
-  PrebufferedStreambuf m_streambuf;
   size_t m_buf_len;
   size_t* m_exp_len;
   char m_static_buf[1];
 
+  prebuffered_data m_data;
+  CachedPrebufferedStreambuf* m_streambuf;
+
   Entry()
-    : m_thread(0), m_prio(0), m_subsys(0),
-      m_next(NULL),
-      m_streambuf(m_static_buf, sizeof(m_static_buf)),
-      m_buf_len(sizeof(m_static_buf)),
-      m_exp_len(NULL)
+    : Entry(log_time{}, 0, 0, 0, nullptr)
   {}
   Entry(log_time s, pthread_t t, short pr, short sub,
-  const char *msg = NULL)
-      : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
-        m_next(NULL),
-        m_streambuf(m_static_buf, sizeof(m_static_buf)),
-        m_buf_len(sizeof(m_static_buf)),
-        m_exp_len(NULL)
-    {
-      if (msg) {
-       std::ostream os(&m_streambuf);
-        os << msg;
-      }
-    }
+       const char *msg = NULL)
+    : Entry(s, t, pr, sub, m_static_buf, sizeof(m_static_buf), nullptr,
+           msg)
+  {}
   Entry(log_time s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len,
        const char *msg = NULL)
     : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
       m_next(NULL),
-      m_streambuf(buf, buf_len),
       m_buf_len(buf_len),
-      m_exp_len(exp_len)
+      m_exp_len(exp_len),
+      m_data(buf, buf_len),
+      m_streambuf(CachedPrebufferedStreambuf::create(&m_data))
   {
     if (msg) {
-      std::ostream os(&m_streambuf);
-      os << msg;
+      get_ostream() << msg;
     }
   }
 
+  std::ostream& get_ostream() {
+    return m_streambuf->get_ostream();
+  }
+
   // function improves estimate for expected size of message
   void hint_size() {
     if (m_exp_len != NULL) {
-      size_t size = m_streambuf.size();
+      size_t size = m_data.size();
       if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) {
         //log larger then expected, just expand
         __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED);
@@ -74,22 +68,25 @@ struct Entry {
   }
 
   void set_str(const std::string &s) {
-    std::ostream os(&m_streambuf);
-    os << s;
+    get_ostream() << s;
   }
 
   std::string get_str() const {
-    return m_streambuf.get_str();
+    return m_data.get_str();
   }
 
   // returns current size of content
   size_t size() const {
-    return m_streambuf.size();
+    return m_data.size();
   }
 
   // extracts up to avail chars of content
   int snprintf(char* dst, size_t avail) const {
-    return m_streambuf.snprintf(dst, avail);
+    return m_data.snprintf(dst, avail);
+  }
+
+  void finish() {
+    m_streambuf->finish();
   }
 };
 
index c190363d7c5fe9862d083c3c3df05f1076afcfae..dea7c6efa5d1bc0cf158deaabcadf85e98d0d502 100644 (file)
@@ -212,6 +212,8 @@ void Log::stop_graylog()
 
 void Log::submit_entry(Entry *e)
 {
+  e->finish();
+
   pthread_mutex_lock(&m_queue_mutex);
   m_queue_mutex_holder = pthread_self();