From 999b24f722e69c4fc8ab0bcba6323f4190835cef Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Wed, 18 Nov 2015 11:01:31 +0100 Subject: [PATCH] Implemented log message size predictor. It tracks size of log messages. Initial allocation size is derived from last log message from the same line of code. Fixed bug in test. Signed-off-by: Adam Kupczyk --- src/common/PrebufferedStreambuf.cc | 2 +- src/common/dout.h | 3 ++- src/log/Entry.h | 41 +++++++++++++++++++++++++++--- src/log/Log.cc | 24 ++++++++++++++++- src/log/Log.h | 1 + src/log/test.cc | 5 ++-- 6 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/common/PrebufferedStreambuf.cc b/src/common/PrebufferedStreambuf.cc index fe3d728dd2c2..fe27ef5db0e5 100644 --- a/src/common/PrebufferedStreambuf.cc +++ b/src/common/PrebufferedStreambuf.cc @@ -15,7 +15,7 @@ PrebufferedStreambuf::int_type PrebufferedStreambuf::overflow(int_type c) { int old_len = m_overflow.size(); if (old_len == 0) { - m_overflow.resize(m_buf_len); + m_overflow.resize(80); } else { m_overflow.resize(old_len * 2); } diff --git a/src/common/dout.h b/src/common/dout.h index f00e4f63df7c..22befd67e0ce 100644 --- a/src/common/dout.h +++ b/src/common/dout.h @@ -48,7 +48,8 @@ inline std::ostream& operator<<(std::ostream& out, _bad_endl_use_dendl_t) { if (0) { \ char __array[((v >= -1) && (v <= 200)) ? 0 : -1] __attribute__((unused)); \ } \ - ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub); \ + static size_t _log_exp_length=80; \ + ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length); \ ostream _dout_os(&_dout_e->m_streambuf); \ CephContext *_dout_cct = cct; \ std::ostream* _dout = &_dout_os; diff --git a/src/log/Entry.h b/src/log/Entry.h index 02217609d28f..1b589e1b325c 100644 --- a/src/log/Entry.h +++ b/src/log/Entry.h @@ -9,7 +9,6 @@ #include #include -#define CEPH_LOG_ENTRY_PREALLOC 80 namespace ceph { namespace log { @@ -20,19 +19,38 @@ struct Entry { short m_prio, m_subsys; Entry *m_next; - char m_static_buf[CEPH_LOG_ENTRY_PREALLOC]; PrebufferedStreambuf m_streambuf; + size_t m_buf_len; + size_t* m_exp_len; + char m_static_buf[1]; Entry() : m_thread(0), m_prio(0), m_subsys(0), m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)) + m_streambuf(m_static_buf, sizeof(m_static_buf)), + m_buf_len(sizeof(m_static_buf)), + m_exp_len(NULL) {} Entry(utime_t s, pthread_t t, short pr, short sub, + const char *msg = NULL) + : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), + m_next(NULL), + m_streambuf(m_static_buf, sizeof(m_static_buf)), + m_buf_len(sizeof(m_static_buf)), + m_exp_len(NULL) + { + if (msg) { + ostream os(&m_streambuf); + os << msg; + } + } + Entry(utime_t s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len, const char *msg = NULL) : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)) + m_streambuf(buf, buf_len), + m_buf_len(buf_len), + m_exp_len(exp_len) { if (msg) { ostream os(&m_streambuf); @@ -40,6 +58,21 @@ struct Entry { } } + // function improves estimate for expected size of message + void hint_size() { + if (m_exp_len != NULL) { + size_t size = m_streambuf.size(); + if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) { + //log larger then expected, just expand + __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED); + } + else { + //asymptotically adapt expected size to message size + __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED); + } + } + } + void set_str(const std::string &s) { ostream os(&m_streambuf); os << s; diff --git a/src/log/Log.cc b/src/log/Log.cc index a4fb250258d1..b91cc4d3ece6 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -21,6 +21,7 @@ #define PREALLOC 1000000 + namespace ceph { namespace log { @@ -167,6 +168,7 @@ void Log::submit_entry(Entry *e) pthread_mutex_unlock(&m_queue_mutex); } + Entry *Log::create_entry(int level, int subsys) { if (true) { @@ -184,6 +186,25 @@ Entry *Log::create_entry(int level, int subsys) } } +Entry *Log::create_entry(int level, int subsys, size_t* expected_size) +{ + if (true) { + size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED); + void *ptr = ::operator new(sizeof(Entry) + size); + return new(ptr) Entry(ceph_clock_now(NULL), + pthread_self(), level, subsys, + reinterpret_cast(ptr) + sizeof(Entry), size, expected_size); + } else { + // kludge for perf testing + Entry *e = m_recent.dequeue(); + e->m_stamp = ceph_clock_now(NULL); + e->m_thread = pthread_self(); + e->m_prio = level; + e->m_subsys = subsys; + return e; + } +} + void Log::flush() { pthread_mutex_lock(&m_flush_mutex); @@ -217,6 +238,7 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) bool do_syslog = m_syslog_crash >= e->m_prio && should_log; bool do_stderr = m_stderr_crash >= e->m_prio && should_log; + e->hint_size(); if (do_fd || do_syslog || do_stderr) { size_t buflen = 0; char buf[80 + e->size()]; @@ -287,7 +309,7 @@ void Log::dump_recent() EntryQueue old; _log_message("--- begin dump of recent events ---", true); - _flush(&m_recent, &old, true); + _flush(&m_recent, &old, true); char buf[4096]; _log_message("--- logging levels ---", true); diff --git a/src/log/Log.h b/src/log/Log.h index 04cadd726f68..57727d312bc9 100644 --- a/src/log/Log.h +++ b/src/log/Log.h @@ -69,6 +69,7 @@ public: void set_stderr_level(int log, int crash); Entry *create_entry(int level, int subsys); + Entry *create_entry(int level, int subsys, size_t* expected_size); void submit_entry(Entry *e); void start(); diff --git a/src/log/test.cc b/src/log/test.cc index 6e4704befde0..a2df608665c1 100644 --- a/src/log/test.cc +++ b/src/log/test.cc @@ -160,9 +160,10 @@ TEST(Log, ManyGatherLogPrebufOverflow) int l = 10; if (subs.should_gather(1, l)) { Entry *e = new Entry(ceph_clock_now(NULL), pthread_self(), l, 1); - PrebufferedStreambuf psb(e->m_static_buf, 20); + PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf)); ostream oss(&psb); - oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"; + oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd" + << std::string(sizeof(e->m_static_buf) * 2, '-') ; //e->m_str = oss.str(); log.submit_entry(e); } -- 2.47.3