From 999b24f722e69c4fc8ab0bcba6323f4190835cef Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Wed, 18 Nov 2015 11:01:31 +0100 Subject: [PATCH] Implemented log message size predictor. It tracks size of log messages. Initial allocation size is derived from last log message from the same line of code. Fixed bug in test. Signed-off-by: Adam Kupczyk --- src/common/PrebufferedStreambuf.cc | 2 +- src/common/dout.h | 3 ++- src/log/Entry.h | 41 +++++++++++++++++++++++++++--- src/log/Log.cc | 24 ++++++++++++++++- src/log/Log.h | 1 + src/log/test.cc | 5 ++-- 6 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/common/PrebufferedStreambuf.cc b/src/common/PrebufferedStreambuf.cc index fe3d728dd2c26..fe27ef5db0e57 100644 --- a/src/common/PrebufferedStreambuf.cc +++ b/src/common/PrebufferedStreambuf.cc @@ -15,7 +15,7 @@ PrebufferedStreambuf::int_type PrebufferedStreambuf::overflow(int_type c) { int old_len = m_overflow.size(); if (old_len == 0) { - m_overflow.resize(m_buf_len); + m_overflow.resize(80); } else { m_overflow.resize(old_len * 2); } diff --git a/src/common/dout.h b/src/common/dout.h index f00e4f63df7cc..22befd67e0cec 100644 --- a/src/common/dout.h +++ b/src/common/dout.h @@ -48,7 +48,8 @@ inline std::ostream& operator<<(std::ostream& out, _bad_endl_use_dendl_t) { if (0) { \ char __array[((v >= -1) && (v <= 200)) ? 0 : -1] __attribute__((unused)); \ } \ - ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub); \ + static size_t _log_exp_length=80; \ + ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length); \ ostream _dout_os(&_dout_e->m_streambuf); \ CephContext *_dout_cct = cct; \ std::ostream* _dout = &_dout_os; diff --git a/src/log/Entry.h b/src/log/Entry.h index 02217609d28f3..1b589e1b325cd 100644 --- a/src/log/Entry.h +++ b/src/log/Entry.h @@ -9,7 +9,6 @@ #include #include -#define CEPH_LOG_ENTRY_PREALLOC 80 namespace ceph { namespace log { @@ -20,19 +19,38 @@ struct Entry { short m_prio, m_subsys; Entry *m_next; - char m_static_buf[CEPH_LOG_ENTRY_PREALLOC]; PrebufferedStreambuf m_streambuf; + size_t m_buf_len; + size_t* m_exp_len; + char m_static_buf[1]; Entry() : m_thread(0), m_prio(0), m_subsys(0), m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)) + m_streambuf(m_static_buf, sizeof(m_static_buf)), + m_buf_len(sizeof(m_static_buf)), + m_exp_len(NULL) {} Entry(utime_t s, pthread_t t, short pr, short sub, + const char *msg = NULL) + : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), + m_next(NULL), + m_streambuf(m_static_buf, sizeof(m_static_buf)), + m_buf_len(sizeof(m_static_buf)), + m_exp_len(NULL) + { + if (msg) { + ostream os(&m_streambuf); + os << msg; + } + } + Entry(utime_t s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len, const char *msg = NULL) : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)) + m_streambuf(buf, buf_len), + m_buf_len(buf_len), + m_exp_len(exp_len) { if (msg) { ostream os(&m_streambuf); @@ -40,6 +58,21 @@ struct Entry { } } + // function improves estimate for expected size of message + void hint_size() { + if (m_exp_len != NULL) { + size_t size = m_streambuf.size(); + if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) { + //log larger then expected, just expand + __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED); + } + else { + //asymptotically adapt expected size to message size + __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED); + } + } + } + void set_str(const std::string &s) { ostream os(&m_streambuf); os << s; diff --git a/src/log/Log.cc b/src/log/Log.cc index a4fb250258d1b..b91cc4d3ece60 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -21,6 +21,7 @@ #define PREALLOC 1000000 + namespace ceph { namespace log { @@ -167,6 +168,7 @@ void Log::submit_entry(Entry *e) pthread_mutex_unlock(&m_queue_mutex); } + Entry *Log::create_entry(int level, int subsys) { if (true) { @@ -184,6 +186,25 @@ Entry *Log::create_entry(int level, int subsys) } } +Entry *Log::create_entry(int level, int subsys, size_t* expected_size) +{ + if (true) { + size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED); + void *ptr = ::operator new(sizeof(Entry) + size); + return new(ptr) Entry(ceph_clock_now(NULL), + pthread_self(), level, subsys, + reinterpret_cast(ptr) + sizeof(Entry), size, expected_size); + } else { + // kludge for perf testing + Entry *e = m_recent.dequeue(); + e->m_stamp = ceph_clock_now(NULL); + e->m_thread = pthread_self(); + e->m_prio = level; + e->m_subsys = subsys; + return e; + } +} + void Log::flush() { pthread_mutex_lock(&m_flush_mutex); @@ -217,6 +238,7 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) bool do_syslog = m_syslog_crash >= e->m_prio && should_log; bool do_stderr = m_stderr_crash >= e->m_prio && should_log; + e->hint_size(); if (do_fd || do_syslog || do_stderr) { size_t buflen = 0; char buf[80 + e->size()]; @@ -287,7 +309,7 @@ void Log::dump_recent() EntryQueue old; _log_message("--- begin dump of recent events ---", true); - _flush(&m_recent, &old, true); + _flush(&m_recent, &old, true); char buf[4096]; _log_message("--- logging levels ---", true); diff --git a/src/log/Log.h b/src/log/Log.h index 04cadd726f68d..57727d312bc91 100644 --- a/src/log/Log.h +++ b/src/log/Log.h @@ -69,6 +69,7 @@ public: void set_stderr_level(int log, int crash); Entry *create_entry(int level, int subsys); + Entry *create_entry(int level, int subsys, size_t* expected_size); void submit_entry(Entry *e); void start(); diff --git a/src/log/test.cc b/src/log/test.cc index 6e4704befde0e..a2df608665c1d 100644 --- a/src/log/test.cc +++ b/src/log/test.cc @@ -160,9 +160,10 @@ TEST(Log, ManyGatherLogPrebufOverflow) int l = 10; if (subs.should_gather(1, l)) { Entry *e = new Entry(ceph_clock_now(NULL), pthread_self(), l, 1); - PrebufferedStreambuf psb(e->m_static_buf, 20); + PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf)); ostream oss(&psb); - oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"; + oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd" + << std::string(sizeof(e->m_static_buf) * 2, '-') ; //e->m_str = oss.str(); log.submit_entry(e); } -- 2.39.5