From: Adam Kupczyk Date: Wed, 18 Nov 2015 10:01:31 +0000 (+0100) Subject: Implemented log message size predictor. It tracks size of log messages. X-Git-Tag: v10.0.2~102^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=999b24f722e69c4fc8ab0bcba6323f4190835cef;p=ceph.git Implemented log message size predictor. It tracks size of log messages. Initial allocation size is derived from last log message from the same line of code. Fixed bug in test. Signed-off-by: Adam Kupczyk --- diff --git a/src/common/PrebufferedStreambuf.cc b/src/common/PrebufferedStreambuf.cc index fe3d728dd2c..fe27ef5db0e 100644 --- a/src/common/PrebufferedStreambuf.cc +++ b/src/common/PrebufferedStreambuf.cc @@ -15,7 +15,7 @@ PrebufferedStreambuf::int_type PrebufferedStreambuf::overflow(int_type c) { int old_len = m_overflow.size(); if (old_len == 0) { - m_overflow.resize(m_buf_len); + m_overflow.resize(80); } else { m_overflow.resize(old_len * 2); } diff --git a/src/common/dout.h b/src/common/dout.h index f00e4f63df7..22befd67e0c 100644 --- a/src/common/dout.h +++ b/src/common/dout.h @@ -48,7 +48,8 @@ inline std::ostream& operator<<(std::ostream& out, _bad_endl_use_dendl_t) { if (0) { \ char __array[((v >= -1) && (v <= 200)) ? 0 : -1] __attribute__((unused)); \ } \ - ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub); \ + static size_t _log_exp_length=80; \ + ceph::log::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length); \ ostream _dout_os(&_dout_e->m_streambuf); \ CephContext *_dout_cct = cct; \ std::ostream* _dout = &_dout_os; diff --git a/src/log/Entry.h b/src/log/Entry.h index 02217609d28..1b589e1b325 100644 --- a/src/log/Entry.h +++ b/src/log/Entry.h @@ -9,7 +9,6 @@ #include #include -#define CEPH_LOG_ENTRY_PREALLOC 80 namespace ceph { namespace log { @@ -20,19 +19,38 @@ struct Entry { short m_prio, m_subsys; Entry *m_next; - char m_static_buf[CEPH_LOG_ENTRY_PREALLOC]; PrebufferedStreambuf m_streambuf; + size_t m_buf_len; + size_t* m_exp_len; + char m_static_buf[1]; Entry() : m_thread(0), m_prio(0), m_subsys(0), m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)) + m_streambuf(m_static_buf, sizeof(m_static_buf)), + m_buf_len(sizeof(m_static_buf)), + m_exp_len(NULL) {} Entry(utime_t s, pthread_t t, short pr, short sub, + const char *msg = NULL) + : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), + m_next(NULL), + m_streambuf(m_static_buf, sizeof(m_static_buf)), + m_buf_len(sizeof(m_static_buf)), + m_exp_len(NULL) + { + if (msg) { + ostream os(&m_streambuf); + os << msg; + } + } + Entry(utime_t s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len, const char *msg = NULL) : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)) + m_streambuf(buf, buf_len), + m_buf_len(buf_len), + m_exp_len(exp_len) { if (msg) { ostream os(&m_streambuf); @@ -40,6 +58,21 @@ struct Entry { } } + // function improves estimate for expected size of message + void hint_size() { + if (m_exp_len != NULL) { + size_t size = m_streambuf.size(); + if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) { + //log larger then expected, just expand + __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED); + } + else { + //asymptotically adapt expected size to message size + __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED); + } + } + } + void set_str(const std::string &s) { ostream os(&m_streambuf); os << s; diff --git a/src/log/Log.cc b/src/log/Log.cc index a4fb250258d..b91cc4d3ece 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -21,6 +21,7 @@ #define PREALLOC 1000000 + namespace ceph { namespace log { @@ -167,6 +168,7 @@ void Log::submit_entry(Entry *e) pthread_mutex_unlock(&m_queue_mutex); } + Entry *Log::create_entry(int level, int subsys) { if (true) { @@ -184,6 +186,25 @@ Entry *Log::create_entry(int level, int subsys) } } +Entry *Log::create_entry(int level, int subsys, size_t* expected_size) +{ + if (true) { + size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED); + void *ptr = ::operator new(sizeof(Entry) + size); + return new(ptr) Entry(ceph_clock_now(NULL), + pthread_self(), level, subsys, + reinterpret_cast(ptr) + sizeof(Entry), size, expected_size); + } else { + // kludge for perf testing + Entry *e = m_recent.dequeue(); + e->m_stamp = ceph_clock_now(NULL); + e->m_thread = pthread_self(); + e->m_prio = level; + e->m_subsys = subsys; + return e; + } +} + void Log::flush() { pthread_mutex_lock(&m_flush_mutex); @@ -217,6 +238,7 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) bool do_syslog = m_syslog_crash >= e->m_prio && should_log; bool do_stderr = m_stderr_crash >= e->m_prio && should_log; + e->hint_size(); if (do_fd || do_syslog || do_stderr) { size_t buflen = 0; char buf[80 + e->size()]; @@ -287,7 +309,7 @@ void Log::dump_recent() EntryQueue old; _log_message("--- begin dump of recent events ---", true); - _flush(&m_recent, &old, true); + _flush(&m_recent, &old, true); char buf[4096]; _log_message("--- logging levels ---", true); diff --git a/src/log/Log.h b/src/log/Log.h index 04cadd726f6..57727d312bc 100644 --- a/src/log/Log.h +++ b/src/log/Log.h @@ -69,6 +69,7 @@ public: void set_stderr_level(int log, int crash); Entry *create_entry(int level, int subsys); + Entry *create_entry(int level, int subsys, size_t* expected_size); void submit_entry(Entry *e); void start(); diff --git a/src/log/test.cc b/src/log/test.cc index 6e4704befde..a2df608665c 100644 --- a/src/log/test.cc +++ b/src/log/test.cc @@ -160,9 +160,10 @@ TEST(Log, ManyGatherLogPrebufOverflow) int l = 10; if (subs.should_gather(1, l)) { Entry *e = new Entry(ceph_clock_now(NULL), pthread_self(), l, 1); - PrebufferedStreambuf psb(e->m_static_buf, 20); + PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf)); ostream oss(&psb); - oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"; + oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd" + << std::string(sizeof(e->m_static_buf) * 2, '-') ; //e->m_str = oss.str(); log.submit_entry(e); }