common/LogClient.cc
common/LogEntry.cc
common/PrebufferedStreambuf.cc
+ common/CachedPrebufferedStreambuf.cc
common/BackTrace.cc
common/perf_counters.cc
common/perf_histogram.cc
--- /dev/null
+#include "common/CachedPrebufferedStreambuf.h"
+#include <string.h>
+
+// std::unique cannot be used here, as deletion will not clear value,
+// but thread variable will still exist, causing free-memory-read
+struct cached_os_t
+{
+ CachedPrebufferedStreambuf* streambuf;
+ cached_os_t()
+ : streambuf(new CachedPrebufferedStreambuf)
+ {}
+ ~cached_os_t() {
+ delete streambuf;
+ streambuf = nullptr;
+ }
+};
+
+thread_local cached_os_t t_os;
+
+CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf()
+{
+ if (this == t_os.streambuf) {
+ // we are deleting thread's PrebufferedStreambuf,
+ // clear it so we can create it cleanly again without error
+ t_os.streambuf = nullptr;
+ }
+}
+
+// lock state of streambuf and detach buffer
+void CachedPrebufferedStreambuf::finish()
+{
+ data->m_pptr = this->pptr();
+ data = nullptr;
+ if (this != t_os.streambuf) {
+ // this is extra formatter, not useful anymore
+ delete this;
+ }
+}
+
+CachedPrebufferedStreambuf*
+CachedPrebufferedStreambuf::create(prebuffered_data* data)
+{
+ CachedPrebufferedStreambuf* streambuf;
+
+ if (t_os.streambuf == nullptr || /*this can happen only on process cleanup*/
+ t_os.streambuf->in_use() /*this happens when we do recursion in logging*/ ) {
+ streambuf = new CachedPrebufferedStreambuf();
+ } else {
+ streambuf = t_os.streambuf;
+ }
+ streambuf->data = data;
+ streambuf->setp(data->m_buf, data->m_buf + data->m_buf_len);
+ // so we underflow on first read
+ streambuf->setg(0, 0, 0);
+ return streambuf;
+}
+
+CachedPrebufferedStreambuf::int_type CachedPrebufferedStreambuf::overflow(int_type c)
+{
+ int old_len = data->m_overflow.size();
+ if (old_len == 0) {
+ data->m_overflow.resize(80);
+ } else {
+ data->m_overflow.resize(old_len * 2);
+ }
+ data->m_overflow[old_len] = c;
+ this->setp(&data->m_overflow[old_len + 1], &*data->m_overflow.begin() + data->m_overflow.size());
+ return traits_type::not_eof(c);
+}
+
+CachedPrebufferedStreambuf::int_type CachedPrebufferedStreambuf::underflow()
+{
+ if (this->gptr() == 0) {
+ // first read; start with the static buffer
+ if (!data->m_overflow.empty())
+ // there is overflow, so start with entire prealloc buffer
+ this->setg(data->m_buf, data->m_buf, data->m_buf + data->m_buf_len);
+ else if (this->pptr() == data->m_buf)
+ // m_buf is empty
+ return traits_type::eof(); // no data
+ else
+ // set up portion of m_buf we've filled
+ this->setg(data->m_buf, data->m_buf, this->pptr());
+ return *this->gptr();
+ }
+ if (this->gptr() == data->m_buf + data->m_buf_len && data->m_overflow.size()) {
+ // at end of m_buf; continue with the overflow string
+ this->setg(&data->m_overflow[0], &data->m_overflow[0], this->pptr());
+ return *this->gptr();
+ }
+
+ // otherwise we must be at the end (of m_buf and/or m_overflow)
+ return traits_type::eof();
+}
+
+/// return a string copy (inefficiently)
+std::string prebuffered_data::get_str() const
+{
+ if (!m_overflow.empty()) {
+ std::string s(m_buf, m_buf + m_buf_len);
+ s.append(&m_overflow[0], m_pptr - &m_overflow[0]);
+ return s;
+ } else if (m_pptr == m_buf) {
+ return std::string();
+ } else {
+ return std::string(m_buf, m_pptr - m_buf);
+ }
+}
+
+// returns current size of content
+size_t prebuffered_data::size() const
+{
+ if (m_overflow.empty()) {
+ return m_pptr - m_buf;
+ } else {
+ return m_buf_len + m_pptr - &m_overflow[0];
+ }
+}
+
+// extracts up to avail chars of content
+int prebuffered_data::snprintf(char* dst, size_t avail) const
+{
+ size_t len_a;
+ size_t len_b;
+ if (!m_overflow.empty()) {
+ len_a = m_buf_len;
+ len_b = m_pptr - &m_overflow[0];
+ } else {
+ len_a = m_pptr - m_buf;
+ len_b = 0;
+ }
+ if (avail > len_a + len_b) {
+ memcpy(dst, m_buf, len_a);
+ memcpy(dst + m_buf_len, m_overflow.c_str(), len_b);
+ dst[len_a + len_b] = 0;
+ } else {
+ if (avail > len_a) {
+ memcpy(dst, m_buf, len_a);
+ memcpy(dst + m_buf_len, m_overflow.c_str(), avail - len_a - 1);
+ dst[avail - 1] = 0;
+ } else {
+ memcpy(dst, m_buf, avail - 1);
+ dst[avail - 1] = 0;
+ }
+ }
+ return len_a + len_b;
+}
--- /dev/null
+#ifndef CEPH_COMMON_CACHED_PREBUFFEREDSTREAMBUF_H
+#define CEPH_COMMON_CACHED_PREBUFFEREDSTREAMBUF_H
+
+#include <streambuf>
+#include <atomic>
+#include <ostream>
+
+/**
+ * streambuf using existing buffer, overflowing into a std::string
+ *
+ * A simple streambuf that uses a preallocated buffer for small
+ * strings, and overflows into a std::string when necessary. If the
+ * preallocated buffer size is chosen well, we can optimize for the
+ * common case and overflow to a slower heap-allocated buffer when
+ * necessary.
+ */
+struct prebuffered_data
+{
+private:
+ char *m_buf;
+ size_t m_buf_len;
+ char *m_pptr;
+ std::string m_overflow;
+
+public:
+ prebuffered_data(char* buf, size_t buf_len)
+ : m_buf(buf), m_buf_len(buf_len), m_pptr(nullptr) {}
+
+ /// return a string copy (inefficiently)
+ std::string get_str() const;
+
+ // returns current size of content
+ size_t size() const;
+
+ // extracts up to avail chars of content
+ int snprintf(char* dst, size_t avail) const;
+ friend class CachedPrebufferedStreambuf;
+};
+
+class CachedPrebufferedStreambuf final : public std::streambuf
+{
+public:
+ static CachedPrebufferedStreambuf* create(prebuffered_data* d);
+
+ std::ostream& get_ostream() {
+ return os;
+ }
+
+ // called when the buffer fills up
+ int_type overflow(int_type c) override;
+
+ // called when we read and need more data
+ int_type underflow() override;
+
+ // signals that formatting log has finished
+ void finish();
+
+private:
+ CachedPrebufferedStreambuf()
+ : data(nullptr), os(this) {}
+ ~CachedPrebufferedStreambuf();
+
+ // determines if instance is currently used for formatting log
+ bool in_use() const {
+ return data != nullptr;
+ }
+
+ prebuffered_data* data;
+ std::ostream os;
+ friend class cached_os_t;
+};
+
+#endif
} \
static size_t _log_exp_length = 80; \
ceph::logging::Entry *_dout_e = cct->_log->create_entry(v, sub, &_log_exp_length); \
- ostream _dout_os(&_dout_e->m_streambuf); \
static_assert(std::is_convertible<decltype(&*cct), \
CephContext* >::value, \
"provided cct must be compatible with CephContext*"); \
auto _dout_cct = cct; \
- std::ostream* _dout = &_dout_os;
+ std::ostream* _dout = &_dout_e->get_ostream();
#define lsubdout(cct, sub, v) dout_impl(cct, ceph_subsys_##sub, v) dout_prefix
#define ldout(cct, v) dout_impl(cct, dout_subsys, v) dout_prefix
#ifndef __CEPH_LOG_ENTRY_H
#define __CEPH_LOG_ENTRY_H
-#include "common/PrebufferedStreambuf.h"
+#include "common/CachedPrebufferedStreambuf.h"
#include <pthread.h>
#include <string>
#include "log/LogClock.h"
short m_prio, m_subsys;
Entry *m_next;
- PrebufferedStreambuf m_streambuf;
size_t m_buf_len;
size_t* m_exp_len;
char m_static_buf[1];
+ prebuffered_data m_data;
+ CachedPrebufferedStreambuf* m_streambuf;
+
Entry()
- : m_thread(0), m_prio(0), m_subsys(0),
- m_next(NULL),
- m_streambuf(m_static_buf, sizeof(m_static_buf)),
- m_buf_len(sizeof(m_static_buf)),
- m_exp_len(NULL)
+ : Entry(log_time{}, 0, 0, 0, nullptr)
{}
Entry(log_time s, pthread_t t, short pr, short sub,
- const char *msg = NULL)
- : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
- m_next(NULL),
- m_streambuf(m_static_buf, sizeof(m_static_buf)),
- m_buf_len(sizeof(m_static_buf)),
- m_exp_len(NULL)
- {
- if (msg) {
- std::ostream os(&m_streambuf);
- os << msg;
- }
- }
+ const char *msg = NULL)
+ : Entry(s, t, pr, sub, m_static_buf, sizeof(m_static_buf), nullptr,
+ msg)
+ {}
Entry(log_time s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len,
const char *msg = NULL)
: m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
m_next(NULL),
- m_streambuf(buf, buf_len),
m_buf_len(buf_len),
- m_exp_len(exp_len)
+ m_exp_len(exp_len),
+ m_data(buf, buf_len),
+ m_streambuf(CachedPrebufferedStreambuf::create(&m_data))
{
if (msg) {
- std::ostream os(&m_streambuf);
- os << msg;
+ get_ostream() << msg;
}
}
+ std::ostream& get_ostream() {
+ return m_streambuf->get_ostream();
+ }
+
// function improves estimate for expected size of message
void hint_size() {
if (m_exp_len != NULL) {
- size_t size = m_streambuf.size();
+ size_t size = m_data.size();
if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) {
//log larger then expected, just expand
__atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED);
}
void set_str(const std::string &s) {
- std::ostream os(&m_streambuf);
- os << s;
+ get_ostream() << s;
}
std::string get_str() const {
- return m_streambuf.get_str();
+ return m_data.get_str();
}
// returns current size of content
size_t size() const {
- return m_streambuf.size();
+ return m_data.size();
}
// extracts up to avail chars of content
int snprintf(char* dst, size_t avail) const {
- return m_streambuf.snprintf(dst, avail);
+ return m_data.snprintf(dst, avail);
+ }
+
+ void finish() {
+ m_streambuf->finish();
}
};
void Log::submit_entry(Entry *e)
{
+ e->finish();
+
pthread_mutex_lock(&m_queue_mutex);
m_queue_mutex_holder = pthread_self();