From a747aeac13daf3dba43343120659e802cb569f6b Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Thu, 23 Aug 2018 10:40:20 -0700 Subject: [PATCH] log: avoid heap allocations for most log entries Each log Entry now exists on the stack and uses a large (4k) buffer for its log stream. This Entry is std::move'd to the queues (std::vector and boost::circular_buffer) in the Log, involving only memory copies in the general case. There are two memory copies (std::move) for any given Entry, once in Log::submit_entry and again in Log::_flush In practice, this eliminates 100% of allocations outside of startup allocations I've run a simple experiment with the MDS that copies /usr/bin to CephFS. I got measurements for the number of allocations from the heap profiler and the profile of CPU usage in the MDS. ** Before this patch ** == Heap profile: == $ google-pprof --alloc_objects --text bin/ceph-mds out/mds.a.profile.0001.heap Total: 1105048 objects 433329 39.2% 39.2% 433329 39.2% ceph::logging::Log::create_entry 209311 18.9% 58.2% 209311 18.9% __gnu_cxx::__aligned_membuf::_M_addr (inline) 192963 17.5% 75.6% 192963 17.5% __gnu_cxx::new_allocator::allocate (inline) 61774 5.6% 81.2% 61774 5.6% std::__cxx11::basic_string::_M_mutate 37689 3.4% 84.6% 37689 3.4% ceph::buffer::raw_combined::create (inline) 22773 2.1% 86.7% 22773 2.1% mempool::pool_allocator::allocate (inline) 17761 1.6% 88.3% 20523 1.9% std::pair::pair (inline) 15795 1.4% 89.7% 15797 1.4% std::swap (inline) 11011 1.0% 90.7% 130061 11.8% std::__cxx11::list::_M_insert (inline) 10822 1.0% 91.7% 10822 1.0% std::__cxx11::basic_string::reserve 9108 0.8% 92.5% 32721 3.0% __gnu_cxx::new_allocator::construct (inline) 8608 0.8% 93.3% 8610 0.8% std::_Deque_base::_M_initialize_map (inline) 7694 0.7% 94.0% 7694 0.7% std::__cxx11::basic_string::_M_capacity (inline) 6160 0.6% 94.5% 6160 0.6% Journaler::wrap_finisher 6084 0.6% 95.1% 70892 6.4% std::map::operator[] (inline) 5347 0.5% 95.6% 5347 0.5% 
MutationImpl::add_projected_fnode 4381 0.4% 96.0% 7706 0.7% mempool::pool_allocator::construct (inline) 3588 0.3% 96.3% 182966 16.6% Locker::_do_cap_update 3049 0.3% 96.6% 5280 0.5% std::__shared_count::__shared_count (inline) 3043 0.3% 96.9% 3043 0.3% MDSLogContextBase::MDSLogContextBase (inline) 3038 0.3% 97.1% 14763 1.3% std::__uninitialized_copy::__uninit_copy (inline) So approximately 430k heap allocations for Entry were created! The basic_string::_M_mutate is also another allocation via PrebufferedStreambuf == Profile data == Selecting interesting functions Samples: 798K of event 'cycles:pp', Event count (approx Children Self Com Shared Object Symbol + 1.04% 1.04% log libceph-common.so.0 [.] ceph::logging::Log::_flush + 0.05% 0.05% log libceph-common.so.0 [.] ceph::logging::Log::flush + 0.00% 0.00% log libceph-common.so.0 [.] ceph::logging::Log::_log_safe_write + 0.00% 0.00% log libceph-common.so.0 [.] ceph::logging::Log::entry + 0.00% 0.00% log libceph-common.so.0 [.] ceph::logging::Log::_flush_logbuf ... Children Self Command Shared Object Symbol + 3.69% 0.00% safe_timer libceph-common.so.0 [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf + 0.53% 0.00% ms_dispatch libceph-common.so.0 [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf + 0.13% 0.00% fn_anonymous libceph-common.so.0 [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf + 0.00% 0.00% ms_dispatch libceph-common.so.0 [.] CachedPrebufferedStreambuf::create + 0.00% 0.00% fn_anonymous libceph-common.so.0 [.] CachedPrebufferedStreambuf::create Children Self Command Shared Object Symbol + 0.07% 0.07% fn_anonymous libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% ms_dispatch libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% ms_dispatch ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt + 0.00% 0.00% md_submit libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% fn_anonymous ceph-mds [.] 
_ZN4ceph7logging3Log12create_entryEiiPm@plt + 0.00% 0.00% safe_timer libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% mds_rank_progr libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% mds_rank_progr ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt + 0.00% 0.00% msgr-worker-0 libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% msgr-worker-2 libceph-common.so.0 [.] ceph::logging::Log::create_entry + 0.00% 0.00% md_submit ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt + 0.00% 0.00% msgr-worker-1 libceph-common.so.0 [.] ceph::logging::Log::create_entry Children Self Command Shared Object Symbol + 8.29% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] virtual thunk to std::basic_ostream >::~basic_ostream + 7.55% 1.46% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::_M_insert + 3.87% 0.00% fn_anonymous libstdc++.so.6.0.24 [.] std::basic_ostream >::~basic_ostream + 2.92% 0.00% md_submit libstdc++.so.6.0.24 [.] virtual thunk to std::basic_ostream >::~basic_ostream + 2.38% 0.00% fn_anonymous libstdc++.so.6.0.24 [.] virtual thunk to std::basic_ostream >::~basic_ostream + 2.22% 2.22% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::sentry::sentry + 1.89% 0.13% fn_anonymous libstdc++.so.6.0.24 [.] std::__ostream_insert > + 0.71% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] std::basic_ostream >::~basic_ostream + 0.39% 0.06% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::_M_insert + 0.29% 0.21% ms_dispatch libstdc++.so.6.0.24 [.] std::__ostream_insert > + 0.27% 0.27% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::~sentry + 0.27% 0.27% fn_anonymous libstdc++.so.6.0.24 [.] std::num_put > >::_M_insert_int + 0.22% 0.22% ms_dispatch libstdc++.so.6.0.24 [.] std::basic_streambuf >::xsputn + 0.20% 0.20% ms_dispatch libstdc++.so.6.0.24 [.] std::num_put > >::_M_insert_int + 0.15% 0.15% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::sentry::~sentry + 0.14% 0.14% ms_dispatch libstdc++.so.6.0.24 [.] 
std::ostream::sentry::sentry + 0.13% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::_M_insert + 0.13% 0.13% fn_anonymous libstdc++.so.6.0.24 [.] std::basic_streambuf >::xsputn + 0.00% 0.00% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::_M_insert + 0.00% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] std::num_put > >::_M_insert_int And the unittest_log time: $ bin/unittest_log [==========] Running 15 tests from 1 test case [----------] Global test environment set-up [----------] 15 tests from Log [ RUN ] Log.Simple [ OK ] Log.Simple (0 ms) [ RUN ] Log.ReuseBad [ OK ] Log.ReuseBad (1 ms) [ RUN ] Log.ManyNoGather [ OK ] Log.ManyNoGather (0 ms) [ RUN ] Log.ManyGatherLog [ OK ] Log.ManyGatherLog (12 ms) [ RUN ] Log.ManyGatherLogStringAssign [ OK ] Log.ManyGatherLogStringAssign (27 ms) [ RUN ] Log.ManyGatherLogStringAssignWithReserve [ OK ] Log.ManyGatherLogStringAssignWithReserve (27 ms) [ RUN ] Log.ManyGatherLogPrebuf [ OK ] Log.ManyGatherLogPrebuf (15 ms) [ RUN ] Log.ManyGatherLogPrebufOverflow [ OK ] Log.ManyGatherLogPrebufOverflow (15 ms) [ RUN ] Log.ManyGather [ OK ] Log.ManyGather (8 ms) [ RUN ] Log.InternalSegv [WARNING] /home/pdonnell/cephfs-shell/src/googletest/googletest/src/gtest-death-test.cc:836:: Death tests use fork(), which is unsafe particularly in a threaded context. For this test, Google Test detected 3 threads [ OK ] Log.InternalSegv (8 ms) [ RUN ] Log.LargeLog [ OK ] Log.LargeLog (43 ms) [ RUN ] Log.TimeSwitch [ OK ] Log.TimeSwitch (1 ms) [ RUN ] Log.TimeFormat [ OK ] Log.TimeFormat (0 ms) [ RUN ] Log.Speed_gather [ OK ] Log.Speed_gather (1779 ms) [ RUN ] Log.Speed_nogather [ OK ] Log.Speed_nogather (64 ms) [----------] 15 tests from Log (2000 ms total) [----------] Global test environment tear-down [==========] 15 tests from 1 test case ran. (2000 ms total) [ PASSED ] 15 tests ** After Patch ** The StackStringBuf uses 4k for its default buffer. 
This appears to be more than reasonable for preventing allocations for logging == Heap profile: == $ google-pprof --alloc_objects --text bin/ceph-mds out/mds.a.profile.0001.heap Total: 1052127 objects 384957 36.6% 36.6% 384957 36.6% __gnu_cxx::new_allocator::allocate (inline) 274720 26.1% 62.7% 274720 26.1% __gnu_cxx::__aligned_membuf::_M_addr (inline) 68496 6.5% 69.2% 68496 6.5% std::__cxx11::basic_string::_M_mutate 44140 4.2% 73.4% 51828 4.9% std::pair::pair (inline) 43091 4.1% 77.5% 43091 4.1% ceph::buffer::raw_combined::create (inline) 27706 2.6% 80.1% 236407 22.5% std::__cxx11::list::_M_insert (inline) 25699 2.4% 82.6% 25699 2.4% std::__cxx11::basic_string::reserve 23183 2.2% 84.8% 23183 2.2% mempool::pool_allocator::allocate (inline) 19466 1.9% 86.6% 81716 7.8% __gnu_cxx::new_allocator::construct (inline) 17606 1.7% 88.3% 17606 1.7% std::__cxx11::basic_string::_M_capacity (inline) 16879 1.6% 89.9% 16881 1.6% std::swap (inline) 8572 0.8% 90.7% 8574 0.8% std::_Deque_base::_M_initialize_map (inline) 8477 0.8% 91.5% 11808 1.1% mempool::pool_allocator::construct (inline) 6166 0.6% 92.1% 6166 0.6% Journaler::wrap_finisher 6080 0.6% 92.7% 134975 12.8% std::map::operator[] (inline) 6079 0.6% 93.3% 6079 0.6% MutationImpl::add_projected_fnode == Profile data == Samples: 62K of event 'cycles:u', Event count (approx.) Overhead Command Shared Object Symbol + 5.91% log libc-2.23.so [.] vfprintf + 5.75% ms_dispatch libstdc++.so.6.0.24 [.] std::__ostream_insert > + 4.59% ms_dispatch ceph-mds [.] StackStringBuf<4096ul>::xsputn + 4.26% ms_dispatch libc-2.23.so [.] __memmove_ssse3_back + 4.07% log libceph-common.so.0 [.] ceph::logging::Log::_flush + 2.48% ms_dispatch libstdc++.so.6.0.24 [.] std::num_put > >::_M_insert_int > + 2.09% ms_dispatch ceph-mds [.] CDir::check_rstats + 2.06% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::sentry + 1.98% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::~sentry + 1.87% log libc-2.23.so [.] 
__strcpy_sse2_unaligned + 1.60% fn_anonymous ceph-mds [.] StackStringBuf<4096ul>::xsputn + 1.46% log libc-2.23.so [.] _IO_default_xsputn + 1.45% log libc-2.23.so [.] _itoa_word + 1.43% fn_anonymous libc-2.23.so [.] __memmove_ssse3_back + 1.40% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::_M_insert + 0.98% fn_anonymous libstdc++.so.6.0.24 [.] std::num_put > >::_M_insert_int --- src/common/Graylog.cc | 14 +- src/common/Graylog.h | 4 +- src/common/StackStringStream.h | 161 ++++++++++++++ src/common/dout.h | 12 +- src/crimson/CMakeLists.txt | 1 - src/log/Entry.h | 153 ++++++------- src/log/EntryQueue.h | 71 ------ src/log/Log.cc | 379 ++++++++++++--------------------- src/log/Log.h | 70 +++--- src/log/test.cc | 115 +++------- 10 files changed, 466 insertions(+), 514 deletions(-) create mode 100644 src/common/StackStringStream.h delete mode 100644 src/log/EntryQueue.h diff --git a/src/common/Graylog.cc b/src/common/Graylog.cc index 3ed10434a2d..79a537eedde 100644 --- a/src/common/Graylog.cc +++ b/src/common/Graylog.cc @@ -67,23 +67,23 @@ void Graylog::set_fsid(const uuid_d& fsid) m_fsid = std::string(&buf[0]); } -void Graylog::log_entry(Entry const * const e) +void Graylog::log_entry(const Entry& e) { if (m_log_dst_valid) { - std::string s = e->get_str(); + auto s = e.strv(); m_formatter->open_object_section(""); m_formatter->dump_string("version", "1.1"); m_formatter->dump_string("host", m_hostname); m_formatter->dump_string("short_message", s); m_formatter->dump_string("_app", "ceph"); - auto t = ceph::logging::log_clock::to_timeval(e->m_stamp); + auto t = ceph::logging::log_clock::to_timeval(e.m_stamp); m_formatter->dump_float("timestamp", t.tv_sec + (t.tv_usec / 1000000.0)); - m_formatter->dump_unsigned("_thread", (uint64_t)e->m_thread); - m_formatter->dump_int("_level", e->m_prio); + m_formatter->dump_unsigned("_thread", (uint64_t)e.m_thread); + m_formatter->dump_int("_level", e.m_prio); if (m_subs != NULL) - m_formatter->dump_string("_subsys_name", 
m_subs->get_name(e->m_subsys)); - m_formatter->dump_int("_subsys_id", e->m_subsys); + m_formatter->dump_string("_subsys_name", m_subs->get_name(e.m_subsys)); + m_formatter->dump_int("_subsys_id", e.m_subsys); m_formatter->dump_string("_fsid", m_fsid); m_formatter->dump_string("_logger", m_logger); m_formatter->close_section(); diff --git a/src/common/Graylog.h b/src/common/Graylog.h index 70cd770504e..f78c936ae9a 100644 --- a/src/common/Graylog.h +++ b/src/common/Graylog.h @@ -19,7 +19,7 @@ class Formatter; namespace logging { -struct Entry; +class Entry; class SubsystemMap; // Graylog logging backend: Convert log datastructures (LogEntry, Entry) to @@ -51,7 +51,7 @@ class Graylog void set_destination(const std::string& host, int port); - void log_entry(Entry const * const e); + void log_entry(const Entry& e); void log_log_entry(LogEntry const * const e); typedef std::shared_ptr Ref; diff --git a/src/common/StackStringStream.h b/src/common/StackStringStream.h new file mode 100644 index 00000000000..6d080f362ec --- /dev/null +++ b/src/common/StackStringStream.h @@ -0,0 +1,161 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef COMMON_STACKSTRINGSTREAM_H +#define COMMON_STACKSTRINGSTREAM_H + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +template +class StackStringBuf : public std::basic_streambuf +{ +public: + StackStringBuf() = default; + StackStringBuf(const StackStringBuf&) = delete; + StackStringBuf& operator=(const StackStringBuf&) = delete; + StackStringBuf(StackStringBuf&& o) = delete; + StackStringBuf& operator=(StackStringBuf&& o) = delete; + ~StackStringBuf() override = default; + + void push(std::string_view sv) + { + vec.reserve(vec.size() + sv.size()); + vec.insert(vec.end(), sv.begin(), sv.end()); + } + + void clear() + { + vec.clear(); + } + + std::string_view strv() const + { + return std::string_view(vec.data(), vec.size()); + } + +protected: + std::streamsize xsputn(const char *s, std::streamsize n) + { + push(std::string_view(s, n)); + return n; + } + + int overflow(int c) + { + if (traits_type::not_eof(c)) { + char str = traits_type::to_char_type(c); + push(std::string_view(&str, 1)); + return c; + } + return EOF; + } + +private: + + boost::container::small_vector vec; +}; + +template +class StackStringStream : public std::basic_ostream +{ +public: + StackStringStream() : basic_ostream(&ssb) {} + StackStringStream(const StackStringStream& o) = delete; + StackStringStream& operator=(const StackStringStream& o) = delete; + StackStringStream(StackStringStream&& o) = delete; + StackStringStream& operator=(StackStringStream&& o) = delete; + ~StackStringStream() override = default; + + void clear() { + basic_ostream::clear(); + ssb.clear(); + } + + std::string_view strv() const { + return ssb.strv(); + } + +private: + StackStringBuf ssb; +}; + +/* In an ideal world, we could use StackStringStream indiscriminately, but alas + * it's very expensive to construct/destruct. So, we cache them in a + * thread_local vector. DO NOT share these with other threads. 
The copy/move + * constructors are deliberately restrictive to make this more difficult to + * accidentally do. + */ +class CachedStackStringStream { +public: + using sss = StackStringStream<4096>; + using osptr = std::unique_ptr; + + CachedStackStringStream() { + if (cache.destructed || cache.c.empty()) { + osp = std::make_unique(); + } else { + osp = std::move(cache.c.back()); + cache.c.pop_back(); + osp->clear(); + } + } + CachedStackStringStream(const CachedStackStringStream&) = delete; + CachedStackStringStream& operator=(const CachedStackStringStream&) = delete; + CachedStackStringStream(CachedStackStringStream&&) = delete; + CachedStackStringStream& operator=(CachedStackStringStream&&) = delete; + ~CachedStackStringStream() { + if (!cache.destructed && cache.c.size() < max_elems) { + cache.c.emplace_back(std::move(osp)); + } + } + + sss& get_stream() { + return *osp; + } + const sss& get_stream() const { + return *osp; + } + +private: + static constexpr std::size_t max_elems = 8; + + /* The thread_local cache may be destructed before other static structures. + * If those destructors try to create a CachedStackStringStream (e.g. for + * logging) and access this cache, that access will be undefined. So note if + * the cache has been destructed and check before use. 
+ */ + struct Cache { + using container = std::vector; + + Cache() {} + ~Cache() { destructed = true; } + + container c; + bool destructed = false; + }; + + inline static thread_local Cache cache; + osptr osp; +}; + +#endif diff --git a/src/common/dout.h b/src/common/dout.h index a98dcfa3e1e..fba299cc812 100644 --- a/src/common/dout.h +++ b/src/common/dout.h @@ -157,18 +157,16 @@ struct is_dynamic> : public std::true_type {}; }(cct); \ \ if (should_gather) { \ - static size_t _log_exp_length = 80; \ - ceph::logging::Entry *_dout_e = \ - cct->_log->create_entry(v, sub, &_log_exp_length); \ + ceph::logging::MutableEntry _dout_e(v, sub); \ static_assert(std::is_convertible::value, \ "provided cct must be compatible with CephContext*"); \ auto _dout_cct = cct; \ - std::ostream* _dout = &_dout_e->get_ostream(); + std::ostream* _dout = &_dout_e.get_ostream(); -#define dendl_impl std::flush; \ - _dout_cct->_log->submit_entry(_dout_e); \ - } \ +#define dendl_impl std::flush; \ + _dout_cct->_log->submit_entry(std::move(_dout_e)); \ + } \ } while (0) #endif // WITH_SEASTAR diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 4d6302f41b6..3fffa8b6e37 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -60,7 +60,6 @@ add_library(crimson-common STATIC ${PROJECT_SOURCE_DIR}/src/common/utf8.c ${PROJECT_SOURCE_DIR}/src/common/version.cc ${PROJECT_SOURCE_DIR}/src/common/BackTrace.cc - ${PROJECT_SOURCE_DIR}/src/common/CachedPrebufferedStreambuf.cc ${PROJECT_SOURCE_DIR}/src/common/ConfUtils.cc ${PROJECT_SOURCE_DIR}/src/common/DecayCounter.cc ${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc diff --git a/src/log/Entry.h b/src/log/Entry.h index e0677ef337f..076512cbe0c 100644 --- a/src/log/Entry.h +++ b/src/log/Entry.h @@ -4,103 +4,110 @@ #ifndef __CEPH_LOG_ENTRY_H #define __CEPH_LOG_ENTRY_H -#include "common/CachedPrebufferedStreambuf.h" -#include -#include #include "log/LogClock.h" +#include "common/StackStringStream.h" -namespace ceph { 
-namespace logging { +#include "boost/container/small_vector.hpp" -struct Entry { - log_time m_stamp; - pthread_t m_thread; - short m_prio, m_subsys; - Entry *m_next; +#include - size_t m_buf_len; - size_t* m_exp_len; - char m_static_buf[1]; +#include - prebuffered_data m_data; - CachedPrebufferedStreambuf* m_streambuf; +namespace ceph { +namespace logging { - Entry() - : Entry(log_time{}, 0, 0, 0, nullptr) - {} - Entry(log_time s, pthread_t t, short pr, short sub, - const char *msg = NULL) - : Entry(s, t, pr, sub, m_static_buf, sizeof(m_static_buf), nullptr, - msg) +class Entry { +public: + using time = log_time; + + Entry() = delete; + Entry(short pr, short sub) : + m_stamp(clock().now()), + m_thread(pthread_self()), + m_prio(pr), + m_subsys(sub) {} - Entry(log_time s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len, - const char *msg = NULL) - : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub), - m_next(NULL), - m_buf_len(buf_len), - m_exp_len(exp_len), - m_data(buf, buf_len), - m_streambuf(CachedPrebufferedStreambuf::create(&m_data)) - { - if (msg) { - get_ostream() << msg; - } - } + Entry(const Entry &) = default; + Entry& operator=(const Entry &) = default; + Entry(Entry &&e) = default; + Entry& operator=(Entry &&e) = default; + virtual ~Entry() = default; + + virtual std::string_view strv() const = 0; + virtual std::size_t size() const = 0; + + time m_stamp; + pthread_t m_thread; + short m_prio, m_subsys; private: - ~Entry() = default; + static log_clock& clock() { + static log_clock clock; + return clock; + } +}; +/* This should never be moved to the heap! Only allocate this on the stack. See + * CachedStackStringStream for rationale. 
+ */ +class MutableEntry : public Entry { public: + MutableEntry() = delete; + MutableEntry(short pr, short sub) : Entry(pr, sub) {} + MutableEntry(const MutableEntry&) = delete; + MutableEntry& operator=(const MutableEntry&) = delete; + MutableEntry(MutableEntry&&) = delete; + MutableEntry& operator=(MutableEntry&&) = delete; + ~MutableEntry() override = default; + std::ostream& get_ostream() { - return m_streambuf->get_ostream(); + return cos.get_stream(); } - // function improves estimate for expected size of message - void hint_size() { - if (m_exp_len != NULL) { - size_t size = m_data.size(); - if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) { - //log larger then expected, just expand - __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED); - } - else { - //asymptotically adapt expected size to message size - __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED); - } - } + std::string_view strv() const override { + return cos.get_stream().strv(); } - - void set_str(const std::string &s) { - get_ostream() << s; + std::size_t size() const override { + return cos.get_stream().strv().size(); } - std::string get_str() const { - return m_data.get_str(); - } +private: + CachedStackStringStream cos; +}; - // returns current size of content - size_t size() const { - return m_data.size(); +class ConcreteEntry : public Entry { +public: + ConcreteEntry() = delete; + ConcreteEntry(const Entry& e) : Entry(e) { + auto strv = e.strv(); + str.reserve(strv.size()); + str.insert(str.end(), strv.begin(), strv.end()); } - - // extracts up to avail chars of content - int snprintf(char* dst, size_t avail) const { - return m_data.snprintf(dst, avail); + ConcreteEntry& operator=(const Entry& e) { + Entry::operator=(e); + auto strv = e.strv(); + str.reserve(strv.size()); + str.assign(strv.begin(), strv.end()); + return *this; } - - void finish() { - m_streambuf->finish(); + ConcreteEntry(ConcreteEntry&& e) : Entry(e), str(std::move(e.str)) {} + 
ConcreteEntry& operator=(ConcreteEntry&& e) { + Entry::operator=(e); + str = std::move(e.str); + return *this; } + ~ConcreteEntry() override = default; - void destroy() { - if (m_exp_len != NULL) { - this->~Entry(); - ::operator delete(this); - } else { - delete(this); - } + std::string_view strv() const override { + return std::string_view(str.data(), str.size()); } + std::size_t size() const override { + return str.size(); + } + +private: + boost::container::small_vector str; }; } diff --git a/src/log/EntryQueue.h b/src/log/EntryQueue.h deleted file mode 100644 index 490b6c263a7..00000000000 --- a/src/log/EntryQueue.h +++ /dev/null @@ -1,71 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef __CEPH_LOG_ENTRYQUEUE_H -#define __CEPH_LOG_ENTRYQUEUE_H - -#include "Entry.h" - -namespace ceph { -namespace logging { - -struct EntryQueue { - int m_len; - struct Entry *m_head, *m_tail; - - bool empty() const { - return m_len == 0; - } - - void swap(EntryQueue& other) { - int len = m_len; - struct Entry *h = m_head, *t = m_tail; - m_len = other.m_len; - m_head = other.m_head; - m_tail = other.m_tail; - other.m_len = len; - other.m_head = h; - other.m_tail = t; - } - - void enqueue(Entry *e) { - if (m_tail) { - m_tail->m_next = e; - m_tail = e; - } else { - m_head = m_tail = e; - } - m_len++; - } - - Entry *dequeue() { - if (!m_head) - return NULL; - Entry *e = m_head; - m_head = m_head->m_next; - if (!m_head) - m_tail = NULL; - m_len--; - e->m_next = NULL; - return e; - } - - EntryQueue() - : m_len(0), - m_head(NULL), - m_tail(NULL) - {} - ~EntryQueue() { - Entry *t; - while (m_head) { - t = m_head->m_next; - m_head->destroy(); - m_head = t; - } - } -}; - -} -} - -#endif diff --git a/src/log/Log.cc b/src/log/Log.cc index c1f8e76562a..a11f19b048b 100644 --- a/src/log/Log.cc +++ b/src/log/Log.cc @@ -3,12 +3,8 @@ #include "Log.h" -#include -#include - #include "common/errno.h" #include "common/safe_io.h" 
-#include "common/Clock.h" #include "common/Graylog.h" #include "common/valgrind.h" @@ -20,10 +16,11 @@ #include "LogClock.h" #include "SubsystemMap.h" -#define DEFAULT_MAX_NEW 100 -#define DEFAULT_MAX_RECENT 10000 +#include +#include + +#include -#define PREALLOC 1000000 #define MAX_LOG_BUF 65536 namespace ceph { @@ -40,66 +37,28 @@ static void log_on_exit(void *p) } Log::Log(const SubsystemMap *s) - : m_indirect_this(NULL), + : m_indirect_this(nullptr), m_subs(s), - m_queue_mutex_holder(0), - m_flush_mutex_holder(0), - m_new(), m_recent(), - m_fd(-1), - m_uid(0), - m_gid(0), - m_fd_last_error(0), - m_syslog_log(-2), m_syslog_crash(-2), - m_stderr_log(1), m_stderr_crash(-1), - m_graylog_log(-3), m_graylog_crash(-3), - m_log_buf(nullptr), m_log_buf_pos(0), - m_stop(false), - m_max_new(DEFAULT_MAX_NEW), - m_max_recent(DEFAULT_MAX_RECENT), - m_inject_segv(false) + m_recent(DEFAULT_MAX_RECENT) { - int ret; - - ret = pthread_mutex_init(&m_flush_mutex, NULL); - ceph_assert(ret == 0); - - ret = pthread_mutex_init(&m_queue_mutex, NULL); - ceph_assert(ret == 0); - - ret = pthread_cond_init(&m_cond_loggers, NULL); - ceph_assert(ret == 0); - - ret = pthread_cond_init(&m_cond_flusher, NULL); - ceph_assert(ret == 0); - - m_log_buf = (char*)malloc(MAX_LOG_BUF); - - // kludge for prealloc testing - if (false) - for (int i=0; i < PREALLOC; i++) - m_recent.enqueue(new Entry); + m_log_buf.reserve(MAX_LOG_BUF); } Log::~Log() { if (m_indirect_this) { - *m_indirect_this = NULL; + *m_indirect_this = nullptr; } ceph_assert(!is_started()); if (m_fd >= 0) VOID_TEMP_FAILURE_RETRY(::close(m_fd)); - free(m_log_buf); - - pthread_mutex_destroy(&m_queue_mutex); - pthread_mutex_destroy(&m_flush_mutex); - pthread_cond_destroy(&m_cond_loggers); - pthread_cond_destroy(&m_cond_flusher); } /// void Log::set_coarse_timestamps(bool coarse) { + std::scoped_lock lock(m_flush_mutex); if (coarse) clock.coarsen(); else @@ -108,6 +67,7 @@ void Log::set_coarse_timestamps(bool coarse) { void 
Log::set_flush_on_exit() { + std::scoped_lock lock(m_flush_mutex); // Make sure we flush on shutdown. We do this by deliberately // leaking an indirect pointer to ourselves (on_exit() can't // unregister a callback). This is not racy only becuase we @@ -118,33 +78,33 @@ void Log::set_flush_on_exit() } } -void Log::set_max_new(int n) +void Log::set_max_new(std::size_t n) { + std::scoped_lock lock(m_queue_mutex); m_max_new = n; } -void Log::set_max_recent(int n) +void Log::set_max_recent(std::size_t n) { - pthread_mutex_lock(&m_flush_mutex); - m_flush_mutex_holder = pthread_self(); + std::scoped_lock lock(m_flush_mutex); m_max_recent = n; - m_flush_mutex_holder = 0; - pthread_mutex_unlock(&m_flush_mutex); } -void Log::set_log_file(string fn) +void Log::set_log_file(std::string_view fn) { + std::scoped_lock lock(m_flush_mutex); m_log_file = fn; } -void Log::set_log_stderr_prefix(const std::string& p) +void Log::set_log_stderr_prefix(std::string_view p) { + std::scoped_lock lock(m_flush_mutex); m_log_stderr_prefix = p; } void Log::reopen_log_file() { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); m_flush_mutex_holder = pthread_self(); if (m_fd >= 0) VOID_TEMP_FAILURE_RETRY(::close(m_fd)); @@ -154,7 +114,7 @@ void Log::reopen_log_file() int r = ::fchown(m_fd, m_uid, m_gid); if (r < 0) { r = -errno; - cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r) + std::cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r) << std::endl; } } @@ -162,152 +122,101 @@ void Log::reopen_log_file() m_fd = -1; } m_flush_mutex_holder = 0; - pthread_mutex_unlock(&m_flush_mutex); } void Log::chown_log_file(uid_t uid, gid_t gid) { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); if (m_fd >= 0) { int r = ::fchown(m_fd, uid, gid); if (r < 0) { r = -errno; - cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r) + std::cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r) << 
std::endl; } } - pthread_mutex_unlock(&m_flush_mutex); } void Log::set_syslog_level(int log, int crash) { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); m_syslog_log = log; m_syslog_crash = crash; - pthread_mutex_unlock(&m_flush_mutex); } void Log::set_stderr_level(int log, int crash) { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); m_stderr_log = log; m_stderr_crash = crash; - pthread_mutex_unlock(&m_flush_mutex); } void Log::set_graylog_level(int log, int crash) { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); m_graylog_log = log; m_graylog_crash = crash; - pthread_mutex_unlock(&m_flush_mutex); } void Log::start_graylog() { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); if (! m_graylog.get()) m_graylog = std::make_shared(m_subs, "dlog"); - pthread_mutex_unlock(&m_flush_mutex); } void Log::stop_graylog() { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock(m_flush_mutex); m_graylog.reset(); - pthread_mutex_unlock(&m_flush_mutex); } -void Log::submit_entry(Entry *e) +void Log::submit_entry(Entry&& e) { - e->finish(); - - pthread_mutex_lock(&m_queue_mutex); + std::unique_lock lock(m_queue_mutex); m_queue_mutex_holder = pthread_self(); - if (m_inject_segv) + if (unlikely(m_inject_segv)) *(volatile int *)(0) = 0xdead; // wait for flush to catch up - while (m_new.m_len > m_max_new) - pthread_cond_wait(&m_cond_loggers, &m_queue_mutex); - - m_new.enqueue(e); - pthread_cond_signal(&m_cond_flusher); - m_queue_mutex_holder = 0; - pthread_mutex_unlock(&m_queue_mutex); -} - - -Entry *Log::create_entry(int level, int subsys, const char* msg) -{ - if (true) { - return new Entry(clock.now(), - pthread_self(), - level, subsys, msg); - } else { - // kludge for perf testing - Entry *e = m_recent.dequeue(); - e->m_stamp = clock.now(); - e->m_thread = pthread_self(); - e->m_prio = level; - e->m_subsys = subsys; - return e; + while (m_new.size() > 
m_max_new) { + if (m_stop) break; // force addition + m_cond_loggers.wait(lock); } -} -Entry *Log::create_entry(int level, int subsys, size_t* expected_size) -{ - if (true) { - ANNOTATE_BENIGN_RACE_SIZED(expected_size, sizeof(*expected_size), - "Log hint"); - size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED); - void *ptr = ::operator new(sizeof(Entry) + size); - return new(ptr) Entry(clock.now(), - pthread_self(), level, subsys, - reinterpret_cast(ptr) + sizeof(Entry), size, expected_size); - } else { - // kludge for perf testing - Entry *e = m_recent.dequeue(); - e->m_stamp = clock.now(); - e->m_thread = pthread_self(); - e->m_prio = level; - e->m_subsys = subsys; - return e; - } + m_new.emplace_back(std::move(e)); + m_cond_flusher.notify_all(); + m_queue_mutex_holder = 0; } void Log::flush() { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock1(m_flush_mutex); m_flush_mutex_holder = pthread_self(); - pthread_mutex_lock(&m_queue_mutex); - m_queue_mutex_holder = pthread_self(); - EntryQueue t; - t.swap(m_new); - pthread_cond_broadcast(&m_cond_loggers); - m_queue_mutex_holder = 0; - pthread_mutex_unlock(&m_queue_mutex); - _flush(&t, &m_recent, false); - // trim - while (m_recent.m_len > m_max_recent) { - m_recent.dequeue()->destroy(); + { + std::scoped_lock lock2(m_queue_mutex); + m_queue_mutex_holder = pthread_self(); + assert(m_flush.empty()); + m_flush.swap(m_new); + m_cond_loggers.notify_all(); + m_queue_mutex_holder = 0; } + _flush(m_flush, true, false); m_flush_mutex_holder = 0; - pthread_mutex_unlock(&m_flush_mutex); } -void Log::_log_safe_write(const char* what, size_t write_len) +void Log::_log_safe_write(std::string_view sv) { if (m_fd < 0) return; - int r = safe_write(m_fd, what, write_len); + int r = safe_write(m_fd, sv.data(), sv.size()); if (r != m_fd_last_error) { if (r < 0) - cerr << "problem writing to " << m_log_file + std::cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl; m_fd_last_error = 
r; @@ -316,98 +225,72 @@ void Log::_log_safe_write(const char* what, size_t write_len) void Log::_flush_logbuf() { - if (m_log_buf_pos) { - _log_safe_write(m_log_buf, m_log_buf_pos); - m_log_buf_pos = 0; + if (m_log_buf.size()) { + _log_safe_write(std::string_view(m_log_buf.data(), m_log_buf.size())); + m_log_buf.resize(0); } } -void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) +void Log::_flush(EntryVector& t, bool requeue, bool crash) { - Entry *e = nullptr; long len = 0; if (crash) { - len = t->m_len; + len = t.size(); } - if (!requeue) { - e = t->m_head; - if (!e) { - return; - } + if (!requeue && t.empty()) { + return; } - while (true) { - if (requeue) { - e = t->dequeue(); - if (!e) { - break; - } - requeue->enqueue(e); - } else { - e = e->m_next; - if (!e) { - break; - } - } - - unsigned sub = e->m_subsys; - - bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio; + for (auto& e : t) { + auto prio = e.m_prio; + auto stamp = e.m_stamp; + auto sub = e.m_subsys; + auto thread = e.m_thread; + auto str = e.strv(); + + bool should_log = crash || m_subs->get_log_level(sub) >= prio; bool do_fd = m_fd >= 0 && should_log; - bool do_syslog = m_syslog_crash >= e->m_prio && should_log; - bool do_stderr = m_stderr_crash >= e->m_prio && should_log; - bool do_graylog2 = m_graylog_crash >= e->m_prio && should_log; + bool do_syslog = m_syslog_crash >= prio && should_log; + bool do_stderr = m_stderr_crash >= prio && should_log; + bool do_graylog2 = m_graylog_crash >= prio && should_log; - e->hint_size(); if (do_fd || do_syslog || do_stderr) { - size_t line_used = 0; + const std::size_t cur = m_log_buf.size(); + std::size_t used = 0; + const std::size_t allocated = e.size() + 80; + m_log_buf.resize(cur + allocated); - char *line; - size_t line_size = 80 + e->size(); - bool need_dynamic = line_size >= MAX_LOG_BUF; - - // this flushes the existing buffers if either line is longer - // than our buffer, or buffer is too full to fit it - if (m_log_buf_pos 
+ line_size >= MAX_LOG_BUF) { - _flush_logbuf(); - } - if (need_dynamic) { - line = new char[line_size]; - } else { - line = &m_log_buf[m_log_buf_pos]; - } + char* const start = m_log_buf.data(); + char* pos = start + cur; if (crash) { - line_used += snprintf(line, line_size, "%6ld> ", -(--len)); + used += (std::size_t)snprintf(pos + used, allocated - used, "%6ld> ", -(--len)); } - line_used += append_time(e->m_stamp, line + line_used, line_size - line_used); - line_used += snprintf(line + line_used, line_size - line_used, " %lx %2d ", - (unsigned long)e->m_thread, e->m_prio); - - line_used += e->snprintf(line + line_used, line_size - line_used - 1); - ceph_assert(line_used < line_size - 1); + used += (std::size_t)append_time(stamp, pos + used, allocated - used); + used += (std::size_t)snprintf(pos + used, allocated - used, " %lx %2d ", (unsigned long)thread, prio); + memcpy(pos + used, str.data(), str.size()); + used += str.size(); + pos[used] = '\0'; + ceph_assert((used + 1 /* '\n' */) < allocated); if (do_syslog) { - syslog(LOG_USER|LOG_INFO, "%s", line); + syslog(LOG_USER|LOG_INFO, "%s", pos); } if (do_stderr) { - cerr << m_log_stderr_prefix << line << std::endl; + std::cerr << m_log_stderr_prefix << std::string_view(pos, used) << std::endl; } + /* now add newline */ + pos[used++] = '\n'; + if (do_fd) { - line[line_used] = '\n'; - if (need_dynamic) { - _log_safe_write(line, line_used + 1); - m_log_buf_pos = 0; - } else { - m_log_buf_pos += line_used + 1; - } + m_log_buf.resize(cur + used); } else { - m_log_buf_pos = 0; + m_log_buf.resize(0); } - if (need_dynamic) { - delete[] line; + if (m_log_buf.size() > MAX_LOG_BUF) { + _flush_logbuf(); } } @@ -415,10 +298,13 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash) m_graylog->log_entry(e); } + if (requeue) { + m_recent.push_back(std::move(e)); + } } + t.clear(); _flush_logbuf(); - } void Log::_log_message(const char *s, bool crash) @@ -431,35 +317,40 @@ void Log::_log_message(const char *s, bool 
crash) b += '\n'; int r = safe_write(m_fd, b.c_str(), b.size()); if (r < 0) - cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl; + std::cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl; } if ((crash ? m_syslog_crash : m_syslog_log) >= 0) { syslog(LOG_USER|LOG_INFO, "%s", s); } if ((crash ? m_stderr_crash : m_stderr_log) >= 0) { - cerr << s << std::endl; + std::cerr << s << std::endl; } } void Log::dump_recent() { - pthread_mutex_lock(&m_flush_mutex); + std::scoped_lock lock1(m_flush_mutex); m_flush_mutex_holder = pthread_self(); - pthread_mutex_lock(&m_queue_mutex); - m_queue_mutex_holder = pthread_self(); - - EntryQueue t; - t.swap(m_new); + { + std::scoped_lock lock2(m_queue_mutex); + m_queue_mutex_holder = pthread_self(); + assert(m_flush.empty()); + m_flush.swap(m_new); + m_queue_mutex_holder = 0; + } - m_queue_mutex_holder = 0; - pthread_mutex_unlock(&m_queue_mutex); - _flush(&t, &m_recent, false); + _flush(m_flush, true, false); _flush_logbuf(); _log_message("--- begin dump of recent events ---", true); - _flush(&m_recent, nullptr, true); + { + EntryVector t; + t.insert(t.end(), std::make_move_iterator(m_recent.begin()), std::make_move_iterator(m_recent.end())); + m_recent.clear(); + _flush(t, false, true); + } char buf[4096]; _log_message("--- logging levels ---", true); @@ -472,9 +363,9 @@ void Log::dump_recent() _log_message(buf, true); sprintf(buf, " %2d/%2d (stderr threshold)", m_stderr_log, m_stderr_crash); _log_message(buf, true); - sprintf(buf, " max_recent %9d", m_max_recent); + sprintf(buf, " max_recent %9zu", m_max_recent); _log_message(buf, true); - sprintf(buf, " max_new %9d", m_max_new); + sprintf(buf, " max_new %9zu", m_max_new); _log_message(buf, true); sprintf(buf, " log_file %s", m_log_file.c_str()); _log_message(buf, true); @@ -484,48 +375,50 @@ void Log::dump_recent() _flush_logbuf(); m_flush_mutex_holder = 0; - pthread_mutex_unlock(&m_flush_mutex); } void Log::start() { 
ceph_assert(!is_started()); - pthread_mutex_lock(&m_queue_mutex); - m_stop = false; - pthread_mutex_unlock(&m_queue_mutex); + { + std::scoped_lock lock(m_queue_mutex); + m_stop = false; + } create("log"); } void Log::stop() { if (is_started()) { - pthread_mutex_lock(&m_queue_mutex); - m_stop = true; - pthread_cond_signal(&m_cond_flusher); - pthread_cond_broadcast(&m_cond_loggers); - pthread_mutex_unlock(&m_queue_mutex); + { + std::scoped_lock lock(m_queue_mutex); + m_stop = true; + m_cond_flusher.notify_one(); + m_cond_loggers.notify_all(); + } join(); } } void *Log::entry() { - pthread_mutex_lock(&m_queue_mutex); - m_queue_mutex_holder = pthread_self(); - while (!m_stop) { - if (!m_new.empty()) { - m_queue_mutex_holder = 0; - pthread_mutex_unlock(&m_queue_mutex); - flush(); - pthread_mutex_lock(&m_queue_mutex); - m_queue_mutex_holder = pthread_self(); - continue; - } + { + std::unique_lock lock(m_queue_mutex); + m_queue_mutex_holder = pthread_self(); + while (!m_stop) { + if (!m_new.empty()) { + m_queue_mutex_holder = 0; + lock.unlock(); + flush(); + lock.lock(); + m_queue_mutex_holder = pthread_self(); + continue; + } - pthread_cond_wait(&m_cond_flusher, &m_queue_mutex); + m_cond_flusher.wait(lock); + } + m_queue_mutex_holder = 0; } - m_queue_mutex_holder = 0; - pthread_mutex_unlock(&m_queue_mutex); flush(); return NULL; } diff --git a/src/log/Log.h b/src/log/Log.h index 07505e6de25..ac10cf9047d 100644 --- a/src/log/Log.h +++ b/src/log/Log.h @@ -4,66 +4,80 @@ #ifndef __CEPH_LOG_LOG_H #define __CEPH_LOG_LOG_H +#include <boost/circular_buffer.hpp> + +#include <condition_variable> #include <memory> +#include <mutex> +#include <queue> +#include <string> +#include <string_view> #include "common/Thread.h" +#include "common/likely.h" -#include "EntryQueue.h" +#include "log/Entry.h" namespace ceph { namespace logging { class Graylog; class SubsystemMap; -class Entry; class Log : private Thread { + using EntryRing = boost::circular_buffer<ConcreteEntry>; + using EntryVector = std::vector<ConcreteEntry>; + + static const std::size_t DEFAULT_MAX_NEW = 100; + static const std::size_t 
DEFAULT_MAX_RECENT = 10000; + Log **m_indirect_this; log_clock clock; const SubsystemMap *m_subs; - pthread_mutex_t m_queue_mutex; - pthread_mutex_t m_flush_mutex; - pthread_cond_t m_cond_loggers; - pthread_cond_t m_cond_flusher; + std::mutex m_queue_mutex; + std::mutex m_flush_mutex; + std::condition_variable m_cond_loggers; + std::condition_variable m_cond_flusher; pthread_t m_queue_mutex_holder; pthread_t m_flush_mutex_holder; - EntryQueue m_new; ///< new entries - EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail + EntryVector m_new; ///< new entries + EntryRing m_recent; ///< recent (less new) entries we've already written at low detail + EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations) std::string m_log_file; - int m_fd; - uid_t m_uid; - gid_t m_gid; + int m_fd = -1; + uid_t m_uid = 0; + gid_t m_gid = 0; - int m_fd_last_error; ///< last error we say writing to fd (if any) + int m_fd_last_error = 0; ///< last error we say writing to fd (if any) - int m_syslog_log, m_syslog_crash; - int m_stderr_log, m_stderr_crash; - int m_graylog_log, m_graylog_crash; + int m_syslog_log = -2, m_syslog_crash = -2; + int m_stderr_log = -1, m_stderr_crash = -1; + int m_graylog_log = -3, m_graylog_crash = -3; std::string m_log_stderr_prefix; std::shared_ptr<Graylog> m_graylog; - char* m_log_buf; ///< coalescing buffer - int m_log_buf_pos; ///< where we're at within coalescing buffer + std::vector<char> m_log_buf; - bool m_stop; + bool m_stop = false; - int m_max_new, m_max_recent; + std::size_t m_max_new = DEFAULT_MAX_NEW; + std::size_t m_max_recent = DEFAULT_MAX_RECENT; - bool m_inject_segv; + bool m_inject_segv = false; void *entry() override; - void _log_safe_write(const char* what, size_t write_len); + void _log_safe_write(std::string_view sv); void _flush_logbuf(); - void _flush(EntryQueue *q, EntryQueue *requeue, bool crash); + void _flush(EntryVector& q, bool requeue, bool crash); void _log_message(const char *s, 
bool crash); @@ -74,12 +88,12 @@ public: void set_flush_on_exit(); void set_coarse_timestamps(bool coarse); - void set_max_new(int n); - void set_max_recent(int n); - void set_log_file(std::string fn); + void set_max_new(std::size_t n); + void set_max_recent(std::size_t n); + void set_log_file(std::string_view fn); void reopen_log_file(); void chown_log_file(uid_t uid, gid_t gid); - void set_log_stderr_prefix(const std::string& p); + void set_log_stderr_prefix(std::string_view p); void flush(); @@ -94,9 +108,7 @@ public: std::shared_ptr<Graylog> graylog() { return m_graylog; } - Entry *create_entry(int level, int subsys, const char* msg = nullptr); - Entry *create_entry(int level, int subsys, size_t* expected_size); - void submit_entry(Entry *e); + void submit_entry(Entry&& e); void start(); void stop(); diff --git a/src/log/test.cc b/src/log/test.cc index fae67e964fc..d127064bdb0 100644 --- a/src/log/test.cc +++ b/src/log/test.cc @@ -41,8 +41,8 @@ TEST(Log, Simple) int sys = i % 4; int l = 5 + (i%4); if (subs.should_gather(sys, l)) { - Entry *e = log.create_entry(l, sys, "hello world"); - log.submit_entry(e); + MutableEntry e(l, sys); + log.submit_entry(std::move(e)); } } @@ -65,18 +65,18 @@ TEST(Log, ReuseBad) const int l = 0; { - auto e = log.create_entry(l, 1); - auto& out = e->get_ostream(); + MutableEntry e(l, 1); + auto& out = e.get_ostream(); out << (std::streambuf*)nullptr; EXPECT_TRUE(out.bad()); // writing nullptr to a stream sets its badbit - log.submit_entry(e); + log.submit_entry(std::move(e)); } { - auto e = log.create_entry(l, 1); - auto& out = e->get_ostream(); + MutableEntry e(l, 1); + auto& out = e.get_ostream(); EXPECT_FALSE(out.bad()); // should not see failures from previous log entry out << "hello world"; - log.submit_entry(e); + log.submit_entry(std::move(e)); } log.flush(); @@ -97,7 +97,7 @@ TEST(Log, ManyNoGather) for (int i=0; iset_str(oss.str()); - log.submit_entry(e); - } - } - log.flush(); - log.stop(); -} -TEST(Log, 
ManyGatherLogStringAssignWithReserve) -{ - SubsystemMap subs; - subs.set_log_level(1, 20); - subs.set_gather_level(1, 10); - Log log(&subs); - log.start(); - log.set_log_file("/tmp/big"); - log.reopen_log_file(); - for (int i=0; iset_str(oss.str()); - log.submit_entry(e); + MutableEntry e(l, 1); + e.get_ostream() << "this is a long string asdf asdf asdf asdf asdf asdf asd fasd fasdf "; + log.submit_entry(std::move(e)); } } log.flush(); log.stop(); } -TEST(Log, ManyGatherLogPrebuf) +TEST(Log, ManyGatherLogStringAssign) { SubsystemMap subs; subs.set_log_level(1, 20); @@ -181,19 +137,16 @@ TEST(Log, ManyGatherLogPrebuf) for (int i=0; im_static_buf, sizeof(e->m_static_buf)); - ostream oss(&psb); - oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"; - //e->m_str = oss.str(); - log.submit_entry(e); + MutableEntry e(l, 1); + e.get_ostream() << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"; + log.submit_entry(std::move(e)); } } log.flush(); log.stop(); } -TEST(Log, ManyGatherLogPrebufOverflow) +TEST(Log, ManyGatherLogStackSpillover) { SubsystemMap subs; subs.set_log_level(1, 20); @@ -205,13 +158,11 @@ TEST(Log, ManyGatherLogPrebufOverflow) for (int i=0; im_static_buf, sizeof(e->m_static_buf)); - ostream oss(&psb); - oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd" - << std::string(sizeof(e->m_static_buf) * 2, '-') ; - //e->m_str = oss.str(); - log.submit_entry(e); + MutableEntry e(l, 1); + auto& s = e.get_ostream(); + s << "foo"; + s << std::string(sizeof(e) * 2, '-'); + log.submit_entry(std::move(e)); } } log.flush(); @@ -230,7 +181,7 @@ TEST(Log, ManyGather) for (int i=0; iset_str(msg); - log.submit_entry(e); + { + MutableEntry e(l, 1); + std::string msg(10000000, 'a'); + e.get_ostream() << msg; + log.submit_entry(std::move(e)); + } log.flush(); log.stop(); } @@ -295,8 +247,9 @@ TEST(Log, TimeSwitch) int l = 10; bool coarse = true; for (auto i = 0U; i < 300; ++i) { - 
log.submit_entry( - log.create_entry(l, 1, "SQUID THEFT! PUNISHABLE BY DEATH!")); + MutableEntry e(l, 1); + e.get_ostream() << "SQUID THEFT! PUNISHABLE BY DEATH!"; + log.submit_entry(std::move(e)); if (i % 50) log.set_coarse_timestamps(coarse = !coarse); } -- 2.39.5