]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Implemented log message size predictor. It tracks size of log messages. 6641/head
authorAdam Kupczyk <akupczyk@mirantis.com>
Wed, 18 Nov 2015 10:01:31 +0000 (11:01 +0100)
committerAdam Kupczyk <akupczyk@mirantis.com>
Tue, 1 Dec 2015 17:15:48 +0000 (18:15 +0100)
Initial allocation size is derived from last log message from the same line of code.
Fixed bug in test.

Signed-off-by: Adam Kupczyk <akupczyk@mirantis.com>
src/common/PrebufferedStreambuf.cc
src/common/dout.h
src/log/Entry.h
src/log/Log.cc
src/log/Log.h
src/log/test.cc

index fe3d728dd2c26a0af910d9f01f0fcea9aecbecb4..fe27ef5db0e57e73b374ef21ab84ffffbe603716 100644 (file)
@@ -15,7 +15,7 @@ PrebufferedStreambuf::int_type PrebufferedStreambuf::overflow(int_type c)
 {
   int old_len = m_overflow.size();
   if (old_len == 0) {
-    m_overflow.resize(m_buf_len);
+    m_overflow.resize(80);
   } else {
     m_overflow.resize(old_len * 2);
   }
index f00e4f63df7ccb108071d74fd7124c566c6b38c4..22befd67e0cecd90182d73df5b233975cf26e95a 100644 (file)
@@ -48,7 +48,8 @@ inline std::ostream& operator<<(std::ostream& out, _bad_endl_use_dendl_t) {
     if (0) {                                                           \
       char __array[((v >= -1) && (v <= 200)) ? 0 : -1] __attribute__((unused)); \
     }                                                                  \
-    ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub);       \
+    static size_t _log_exp_length=80; \
+    ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length);     \
     ostream _dout_os(&_dout_e->m_streambuf);                           \
     CephContext *_dout_cct = cct;                                      \
     std::ostream* _dout = &_dout_os;
index 02217609d28f3f3c56564edbd75ff6a3217da369..1b589e1b325cd3c2e8fe1b81f8e39c5659733682 100644 (file)
@@ -9,7 +9,6 @@
 #include <pthread.h>
 #include <string>
 
-#define CEPH_LOG_ENTRY_PREALLOC 80
 
 namespace ceph {
 namespace log {
@@ -20,19 +19,38 @@ struct Entry {
   short m_prio, m_subsys;
   Entry *m_next;
 
-  char m_static_buf[CEPH_LOG_ENTRY_PREALLOC];
   PrebufferedStreambuf m_streambuf;
+  size_t m_buf_len;
+  size_t* m_exp_len;
+  char m_static_buf[1];
 
   Entry()
     : m_thread(0), m_prio(0), m_subsys(0),
       m_next(NULL),
-      m_streambuf(m_static_buf, sizeof(m_static_buf))
+      m_streambuf(m_static_buf, sizeof(m_static_buf)),
+      m_buf_len(sizeof(m_static_buf)),
+      m_exp_len(NULL)
   {}
   Entry(utime_t s, pthread_t t, short pr, short sub,
+  const char *msg = NULL)
+      : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
+        m_next(NULL),
+        m_streambuf(m_static_buf, sizeof(m_static_buf)),
+        m_buf_len(sizeof(m_static_buf)),
+        m_exp_len(NULL)
+    {
+      if (msg) {
+        ostream os(&m_streambuf);
+        os << msg;
+      }
+    }
+  Entry(utime_t s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len,
        const char *msg = NULL)
     : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
       m_next(NULL),
-      m_streambuf(m_static_buf, sizeof(m_static_buf))
+      m_streambuf(buf, buf_len),
+      m_buf_len(buf_len),
+      m_exp_len(exp_len)
   {
     if (msg) {
       ostream os(&m_streambuf);
@@ -40,6 +58,21 @@ struct Entry {
     }
   }
 
+  // function improves estimate for expected size of message
+  void hint_size() {
+    if (m_exp_len != NULL) {
+      size_t size = m_streambuf.size();
+      if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) {
+        //log larger then expected, just expand
+        __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED);
+      }
+      else {
+        //asymptotically adapt expected size to message size
+        __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED);
+      }
+    }
+  }
+
   void set_str(const std::string &s) {
     ostream os(&m_streambuf);
     os << s;
index a4fb250258d1bf7dd4c84ffed6e3f95f4a1c3d67..b91cc4d3ece606333bd5d3e2f5aaa40db8acff18 100644 (file)
@@ -21,6 +21,7 @@
 
 #define PREALLOC 1000000
 
+
 namespace ceph {
 namespace log {
 
@@ -167,6 +168,7 @@ void Log::submit_entry(Entry *e)
   pthread_mutex_unlock(&m_queue_mutex);
 }
 
+
 Entry *Log::create_entry(int level, int subsys)
 {
   if (true) {
@@ -184,6 +186,25 @@ Entry *Log::create_entry(int level, int subsys)
   }
 }
 
+Entry *Log::create_entry(int level, int subsys, size_t* expected_size)
+{
+  if (true) {
+    size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED);
+    void *ptr = ::operator new(sizeof(Entry) + size);
+    return new(ptr) Entry(ceph_clock_now(NULL),
+       pthread_self(), level, subsys,
+       reinterpret_cast<char*>(ptr) + sizeof(Entry), size, expected_size);
+  } else {
+    // kludge for perf testing
+    Entry *e = m_recent.dequeue();
+    e->m_stamp = ceph_clock_now(NULL);
+    e->m_thread = pthread_self();
+    e->m_prio = level;
+    e->m_subsys = subsys;
+    return e;
+  }
+}
+
 void Log::flush()
 {
   pthread_mutex_lock(&m_flush_mutex);
@@ -217,6 +238,7 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
     bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
     bool do_stderr = m_stderr_crash >= e->m_prio && should_log;
 
+    e->hint_size();
     if (do_fd || do_syslog || do_stderr) {
       size_t buflen = 0;
       char buf[80 + e->size()];
@@ -287,7 +309,7 @@ void Log::dump_recent()
 
   EntryQueue old;
   _log_message("--- begin dump of recent events ---", true);
-  _flush(&m_recent, &old, true);  
+  _flush(&m_recent, &old, true);
 
   char buf[4096];
   _log_message("--- logging levels ---", true);
index 04cadd726f68db344568288f652f7224c98f7556..57727d312bc913ec180053e537bd0e7e4ac59382 100644 (file)
@@ -69,6 +69,7 @@ public:
   void set_stderr_level(int log, int crash);
 
   Entry *create_entry(int level, int subsys);
+  Entry *create_entry(int level, int subsys, size_t* expected_size);
   void submit_entry(Entry *e);
 
   void start();
index 6e4704befde0ed59563b8bdf6d147b6414890199..a2df608665c1dce7a735d2d6557f32becadc079b 100644 (file)
@@ -160,9 +160,10 @@ TEST(Log, ManyGatherLogPrebufOverflow)
     int l = 10;
     if (subs.should_gather(1, l)) {
       Entry *e = new Entry(ceph_clock_now(NULL), pthread_self(), l, 1);
-      PrebufferedStreambuf psb(e->m_static_buf, 20);
+      PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf));
       ostream oss(&psb);
-      oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
+      oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"
+          << std::string(sizeof(e->m_static_buf) * 2, '-') ;
       //e->m_str = oss.str();
       log.submit_entry(e);
     }