From b08ae658e8d5b1516ad7b91f924c290265a7b5fe Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Sun, 5 Nov 2017 21:18:28 +0100 Subject: [PATCH] common/log: Added new version of PrebufferedStreambuf, tuned for log usage Signed-off-by: Adam Kupczyk Signed-off-by: Kefu Chai --- src/CMakeLists.txt | 1 + src/common/CachedPrebufferedStreambuf.cc | 147 +++++++++++++++++++++++ src/common/CachedPrebufferedStreambuf.h | 73 +++++++++++ src/common/dout.h | 3 +- src/log/Entry.h | 55 ++++----- src/log/Log.cc | 2 + 6 files changed, 250 insertions(+), 31 deletions(-) create mode 100644 src/common/CachedPrebufferedStreambuf.cc create mode 100644 src/common/CachedPrebufferedStreambuf.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3a1270738fa57..dab80ae86df7b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -416,6 +416,7 @@ set(libcommon_files common/LogClient.cc common/LogEntry.cc common/PrebufferedStreambuf.cc + common/CachedPrebufferedStreambuf.cc common/BackTrace.cc common/perf_counters.cc common/perf_histogram.cc diff --git a/src/common/CachedPrebufferedStreambuf.cc b/src/common/CachedPrebufferedStreambuf.cc new file mode 100644 index 0000000000000..1e27e1f5374ef --- /dev/null +++ b/src/common/CachedPrebufferedStreambuf.cc @@ -0,0 +1,147 @@ +#include "common/CachedPrebufferedStreambuf.h" +#include + +// std::unique cannot be used here, as deletion will not clear value, +// but thread variable will still exist, causing free-memory-read +struct cached_os_t +{ + CachedPrebufferedStreambuf* streambuf; + cached_os_t() + : streambuf(new CachedPrebufferedStreambuf) + {} + ~cached_os_t() { + delete streambuf; + streambuf = nullptr; + } +}; + +thread_local cached_os_t t_os; + +CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf() +{ + if (this == t_os.streambuf) { + // we are deleting thread's PrebufferedStreambuf, + // clear it so we can create it cleanly again without error + t_os.streambuf = nullptr; + } +} + +// lock state of streambuf and detach buffer +void CachedPrebufferedStreambuf::finish() +{ + data->m_pptr = this->pptr(); + data = nullptr; + if (this != t_os.streambuf) { + // this is extra formatter, not useful anymore + delete this; + } +} + +CachedPrebufferedStreambuf* +CachedPrebufferedStreambuf::create(prebuffered_data* data) +{ + CachedPrebufferedStreambuf* streambuf; + + if (t_os.streambuf == nullptr || /*this can happen only on process cleanup*/ + t_os.streambuf->in_use() /*this happens when we do recursion in logging*/ ) { + streambuf = new CachedPrebufferedStreambuf(); + } else { + streambuf = t_os.streambuf; + } + streambuf->data = data; + streambuf->setp(data->m_buf, data->m_buf + data->m_buf_len); + // so we underflow on first read + streambuf->setg(0, 0, 0); + return streambuf; +} + +CachedPrebufferedStreambuf::int_type CachedPrebufferedStreambuf::overflow(int_type c) +{ + int old_len = data->m_overflow.size(); + if (old_len == 0) { + data->m_overflow.resize(80); + } else { + data->m_overflow.resize(old_len * 2); + } + data->m_overflow[old_len] = c; + this->setp(&data->m_overflow[old_len + 1], &*data->m_overflow.begin() + data->m_overflow.size()); + return traits_type::not_eof(c); +} + +CachedPrebufferedStreambuf::int_type CachedPrebufferedStreambuf::underflow() +{ + if (this->gptr() == 0) { + // first read; start with the static buffer + if (!data->m_overflow.empty()) + // there is overflow, so start with entire prealloc buffer + this->setg(data->m_buf, data->m_buf, data->m_buf + data->m_buf_len); + else if (this->pptr() == data->m_buf) + // m_buf is empty + return traits_type::eof(); // no data + else + // set up portion of m_buf we've filled + this->setg(data->m_buf, data->m_buf, this->pptr()); + return *this->gptr(); + } + if (this->gptr() == data->m_buf + data->m_buf_len && data->m_overflow.size()) { + // at end of m_buf; continue with the overflow string + this->setg(&data->m_overflow[0], &data->m_overflow[0], this->pptr()); + return *this->gptr(); + } + + // otherwise we must be at the end (of m_buf and/or m_overflow) + return traits_type::eof(); +} + +/// return a string copy (inefficiently) +std::string prebuffered_data::get_str() const +{ + if (!m_overflow.empty()) { + std::string s(m_buf, m_buf + m_buf_len); + s.append(&m_overflow[0], m_pptr - &m_overflow[0]); + return s; + } else if (m_pptr == m_buf) { + return std::string(); + } else { + return std::string(m_buf, m_pptr - m_buf); + } +} + +// returns current size of content +size_t prebuffered_data::size() const +{ + if (m_overflow.empty()) { + return m_pptr - m_buf; + } else { + return m_buf_len + m_pptr - &m_overflow[0]; + } +} + +// extracts up to avail chars of content +int prebuffered_data::snprintf(char* dst, size_t avail) const +{ + size_t len_a; + size_t len_b; + if (!m_overflow.empty()) { + len_a = m_buf_len; + len_b = m_pptr - &m_overflow[0]; + } else { + len_a = m_pptr - m_buf; + len_b = 0; + } + if (avail > len_a + len_b) { + memcpy(dst, m_buf, len_a); + memcpy(dst + m_buf_len, m_overflow.c_str(), len_b); + dst[len_a + len_b] = 0; + } else { + if (avail > len_a) { + memcpy(dst, m_buf, len_a); + memcpy(dst + m_buf_len, m_overflow.c_str(), avail - len_a - 1); + dst[avail - 1] = 0; + } else { + memcpy(dst, m_buf, avail - 1); + dst[avail - 1] = 0; + } + } + return len_a + len_b; +} diff --git a/src/common/CachedPrebufferedStreambuf.h b/src/common/CachedPrebufferedStreambuf.h new file mode 100644 index 0000000000000..b6a80d6970b5f --- /dev/null +++ b/src/common/CachedPrebufferedStreambuf.h @@ -0,0 +1,73 @@ +#ifndef CEPH_COMMON_CACHED_PREBUFFEREDSTREAMBUF_H +#define CEPH_COMMON_CACHED_PREBUFFEREDSTREAMBUF_H + +#include +#include +#include + +/** + * streambuf using existing buffer, overflowing into a std::string + * + * A simple streambuf that uses a preallocated buffer for small + * strings, and overflows into a std::string when necessary. If the + * preallocated buffer size is chosen well, we can optimize for the + * common case and overflow to a slower heap-allocated buffer when + * necessary. + */ +struct prebuffered_data +{ +private: + char *m_buf; + size_t m_buf_len; + char *m_pptr; + std::string m_overflow; + +public: + prebuffered_data(char* buf, size_t buf_len) + : m_buf(buf), m_buf_len(buf_len), m_pptr(nullptr) {} + + /// return a string copy (inefficiently) + std::string get_str() const; + + // returns current size of content + size_t size() const; + + // extracts up to avail chars of content + int snprintf(char* dst, size_t avail) const; + friend class CachedPrebufferedStreambuf; +}; + +class CachedPrebufferedStreambuf final : public std::streambuf +{ +public: + static CachedPrebufferedStreambuf* create(prebuffered_data* d); + + std::ostream& get_ostream() { + return os; + } + + // called when the buffer fills up + int_type overflow(int_type c) override; + + // called when we read and need more data + int_type underflow() override; + + // signals that formatting log has finished + void finish(); + +private: + CachedPrebufferedStreambuf() + : data(nullptr), os(this) {} + ~CachedPrebufferedStreambuf(); + + // determines if instance is currently used for formatting log + bool in_use() const { + return data != nullptr; + } + + prebuffered_data* data; + std::ostream os; + friend class cached_os_t; +}; + +#endif diff --git a/src/common/dout.h b/src/common/dout.h index 29863a524a602..41a330ee49ab8 100644 --- a/src/common/dout.h +++ b/src/common/dout.h @@ -54,12 +54,11 @@ public: } \ static size_t _log_exp_length = 80; \ ceph::logging::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length); \ - ostream _dout_os(&_dout_e->m_streambuf); \ static_assert(std::is_convertible::value, \ "provided cct must be compatible with CephContext*"); \ auto _dout_cct = cct; \ - std::ostream* _dout = &_dout_os; + std::ostream* _dout = &_dout_e->get_ostream(); #define lsubdout(cct, sub, v) dout_impl(cct, ceph_subsys_##sub, v) dout_prefix #define ldout(cct, v) dout_impl(cct, dout_subsys, v) dout_prefix diff --git a/src/log/Entry.h b/src/log/Entry.h index d5c23233ee64b..0005cdda72eb1 100644 --- a/src/log/Entry.h +++ b/src/log/Entry.h @@ -4,7 +4,7 @@ #ifndef __CEPH_LOG_ENTRY_H #define __CEPH_LOG_ENTRY_H -#include "common/PrebufferedStreambuf.h" +#include "common/CachedPrebufferedStreambuf.h" #include #include #include "log/LogClock.h" @@ -19,49 +19,43 @@ struct Entry { short m_prio, m_subsys; Entry *m_next; - PrebufferedStreambuf m_streambuf; size_t m_buf_len; size_t* m_exp_len; char m_static_buf[1]; + prebuffered_data m_data; + CachedPrebufferedStreambuf* m_streambuf; + Entry() - : m_thread(0), m_prio(0), m_subsys(0), - m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)), - m_buf_len(sizeof(m_static_buf)), - m_exp_len(NULL) + : Entry(log_time{}, 0, 0, 0, nullptr) {} Entry(log_time s, pthread_t t, short pr, short sub, - const char *msg = NULL) - : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), - m_next(NULL), - m_streambuf(m_static_buf, sizeof(m_static_buf)), - m_buf_len(sizeof(m_static_buf)), - m_exp_len(NULL) - { - if (msg) { - std::ostream os(&m_streambuf); - os << msg; - } - } + const char *msg = NULL) + : Entry(s, t, pr, sub, m_static_buf, sizeof(m_static_buf), nullptr, + msg) + {} Entry(log_time s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len, const char *msg = NULL) : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), m_next(NULL), - m_streambuf(buf, buf_len), m_buf_len(buf_len), - m_exp_len(exp_len) + m_exp_len(exp_len), + m_data(buf, buf_len), + m_streambuf(CachedPrebufferedStreambuf::create(&m_data)) { if (msg) { - std::ostream os(&m_streambuf); - os << msg; + get_ostream() << msg; } } + std::ostream& get_ostream() { + return m_streambuf->get_ostream(); + } + // function improves estimate for expected size of message void hint_size() { if (m_exp_len != NULL) { - size_t size = m_streambuf.size(); + size_t size = m_data.size(); if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) { //log larger then expected, just expand __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED); @@ -74,22 +68,25 @@ struct Entry { } void set_str(const std::string &s) { - std::ostream os(&m_streambuf); - os << s; + get_ostream() << s; } std::string get_str() const { - return m_streambuf.get_str(); + return m_data.get_str(); } // returns current size of content size_t size() const { - return m_streambuf.size(); + return m_data.size(); } // extracts up to avail chars of content int snprintf(char* dst, size_t avail) const { - return m_streambuf.snprintf(dst, avail); + return m_data.snprintf(dst, avail); + } + + void finish() { + m_streambuf->finish(); } }; diff --git a/src/log/Log.cc b/src/log/Log.cc index c190363d7c5fe..dea7c6efa5d1b 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -212,6 +212,8 @@ void Log::stop_graylog() void Log::submit_entry(Entry *e) { + e->finish(); + pthread_mutex_lock(&m_queue_mutex); m_queue_mutex_holder = pthread_self(); -- 2.39.5