Each log Entry now exists on the stack and uses a large (4k) buffer for its log
stream. This Entry is std::move'd to the queues (std::vector and
boost::circular_buffer) in the Log, involving only memory copies in the general
case. There are two memory copies (std::move) for any given Entry, once in
Log::submit_entry and again in Log::_flush
In practice, this eliminates 100% of allocations outside of startup
allocations
I've run a simple experiment with the MDS that copies /usr/bin to CephFS. I got
measurements for the number of allocations from the heap profiler and the
profile of CPU usage in the MDS.
** Before this patch **
== Heap profile: ==
$ google-pprof --alloc_objects --text bin/ceph-mds out/mds.a.profile.0001.heap
Total:
1105048 objects
433329 39.2% 39.2% 433329 39.2% ceph::logging::Log::create_entry
209311 18.9% 58.2% 209311 18.9% __gnu_cxx::__aligned_membuf::_M_addr (inline)
192963 17.5% 75.6% 192963 17.5% __gnu_cxx::new_allocator::allocate (inline)
61774 5.6% 81.2% 61774 5.6% std::__cxx11::basic_string::_M_mutate
37689 3.4% 84.6% 37689 3.4% ceph::buffer::raw_combined::create (inline)
22773 2.1% 86.7% 22773 2.1% mempool::pool_allocator::allocate (inline)
17761 1.6% 88.3% 20523 1.9% std::pair::pair (inline)
15795 1.4% 89.7% 15797 1.4% std::swap (inline)
11011 1.0% 90.7% 130061 11.8% std::__cxx11::list::_M_insert (inline)
10822 1.0% 91.7% 10822 1.0% std::__cxx11::basic_string::reserve
9108 0.8% 92.5% 32721 3.0% __gnu_cxx::new_allocator::construct (inline)
8608 0.8% 93.3% 8610 0.8% std::_Deque_base::_M_initialize_map (inline)
7694 0.7% 94.0% 7694 0.7% std::__cxx11::basic_string::_M_capacity (inline)
6160 0.6% 94.5% 6160 0.6% Journaler::wrap_finisher
6084 0.6% 95.1% 70892 6.4% std::map::operator[] (inline)
5347 0.5% 95.6% 5347 0.5% MutationImpl::add_projected_fnode
4381 0.4% 96.0% 7706 0.7% mempool::pool_allocator::construct (inline)
3588 0.3% 96.3% 182966 16.6% Locker::_do_cap_update
3049 0.3% 96.6% 5280 0.5% std::__shared_count::__shared_count (inline)
3043 0.3% 96.9% 3043 0.3% MDSLogContextBase::MDSLogContextBase (inline)
3038 0.3% 97.1% 14763 1.3% std::__uninitialized_copy::__uninit_copy (inline)
So approximately 430k heap allocations for Entry were created! The
basic_string::_M_mutate is also another allocation via PrebufferedStreambuf
== Profile data ==
Selecting interesting functions
Samples: 798K of event 'cycles:pp', Event count (approx
Children Self Com Shared Object Symbol
+ 1.04% 1.04% log libceph-common.so.0 [.] ceph::logging::Log::_flush
+ 0.05% 0.05% log libceph-common.so.0 [.] ceph::logging::Log::flush
+ 0.00% 0.00% log libceph-common.so.0 [.] ceph::logging::Log::_log_safe_write
+ 0.00% 0.00% log libceph-common.so.0 [.] ceph::logging::Log::entry
+ 0.00% 0.00% log libceph-common.so.0 [.] ceph::logging::Log::_flush_logbuf
...
Children Self Command Shared Object Symbol
+ 3.69% 0.00% safe_timer libceph-common.so.0 [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf
+ 0.53% 0.00% ms_dispatch libceph-common.so.0 [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf
+ 0.13% 0.00% fn_anonymous libceph-common.so.0 [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf
+ 0.00% 0.00% ms_dispatch libceph-common.so.0 [.] CachedPrebufferedStreambuf::create
+ 0.00% 0.00% fn_anonymous libceph-common.so.0 [.] CachedPrebufferedStreambuf::create
Children Self Command Shared Object Symbol
+ 0.07% 0.07% fn_anonymous libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% ms_dispatch libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% ms_dispatch ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+ 0.00% 0.00% md_submit libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% fn_anonymous ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+ 0.00% 0.00% safe_timer libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% mds_rank_progr libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% mds_rank_progr ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+ 0.00% 0.00% msgr-worker-0 libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% msgr-worker-2 libceph-common.so.0 [.] ceph::logging::Log::create_entry
+ 0.00% 0.00% md_submit ceph-mds [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+ 0.00% 0.00% msgr-worker-1 libceph-common.so.0 [.] ceph::logging::Log::create_entry
Children Self Command Shared Object Symbol
+ 8.29% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] virtual thunk to std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+ 7.55% 1.46% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::_M_insert<long>
+ 3.87% 0.00% fn_anonymous libstdc++.so.6.0.24 [.] std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+ 2.92% 0.00% md_submit libstdc++.so.6.0.24 [.] virtual thunk to std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+ 2.38% 0.00% fn_anonymous libstdc++.so.6.0.24 [.] virtual thunk to std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+ 2.22% 2.22% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::sentry::sentry
+ 1.89% 0.13% fn_anonymous libstdc++.so.6.0.24 [.] std::__ostream_insert<char, std::char_traits<char> >
+ 0.71% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+ 0.39% 0.06% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::_M_insert<long>
+ 0.29% 0.21% ms_dispatch libstdc++.so.6.0.24 [.] std::__ostream_insert<char, std::char_traits<char> >
+ 0.27% 0.27% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::~sentry
+ 0.27% 0.27% fn_anonymous libstdc++.so.6.0.24 [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<long>
+ 0.22% 0.22% ms_dispatch libstdc++.so.6.0.24 [.] std::basic_streambuf<char, std::char_traits<char> >::xsputn
+ 0.20% 0.20% ms_dispatch libstdc++.so.6.0.24 [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<long>
+ 0.15% 0.15% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::sentry::~sentry
+ 0.14% 0.14% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::sentry
+ 0.13% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::_M_insert<unsigned long>
+ 0.13% 0.13% fn_anonymous libstdc++.so.6.0.24 [.] std::basic_streambuf<char, std::char_traits<char> >::xsputn
+ 0.00% 0.00% fn_anonymous libstdc++.so.6.0.24 [.] std::ostream::_M_insert<unsigned long>
+ 0.00% 0.00% ms_dispatch libstdc++.so.6.0.24 [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<unsigned long>
And the unittest_log time:
$ bin/unittest_log
[==========] Running 15 tests from 1 test case
[----------] Global test environment set-up
[----------] 15 tests from Log
[ RUN ] Log.Simple
[ OK ] Log.Simple (0 ms)
[ RUN ] Log.ReuseBad
[ OK ] Log.ReuseBad (1 ms)
[ RUN ] Log.ManyNoGather
[ OK ] Log.ManyNoGather (0 ms)
[ RUN ] Log.ManyGatherLog
[ OK ] Log.ManyGatherLog (12 ms)
[ RUN ] Log.ManyGatherLogStringAssign
[ OK ] Log.ManyGatherLogStringAssign (27 ms)
[ RUN ] Log.ManyGatherLogStringAssignWithReserve
[ OK ] Log.ManyGatherLogStringAssignWithReserve (27 ms)
[ RUN ] Log.ManyGatherLogPrebuf
[ OK ] Log.ManyGatherLogPrebuf (15 ms)
[ RUN ] Log.ManyGatherLogPrebufOverflow
[ OK ] Log.ManyGatherLogPrebufOverflow (15 ms)
[ RUN ] Log.ManyGather
[ OK ] Log.ManyGather (8 ms)
[ RUN ] Log.InternalSegv
[WARNING] /home/pdonnell/cephfs-shell/src/googletest/googletest/src/gtest-death-test.cc:836:: Death tests use fork(), which is unsafe particularly in a threaded context. For this test, Google Test detected 3 threads
[ OK ] Log.InternalSegv (8 ms)
[ RUN ] Log.LargeLog
[ OK ] Log.LargeLog (43 ms)
[ RUN ] Log.TimeSwitch
[ OK ] Log.TimeSwitch (1 ms)
[ RUN ] Log.TimeFormat
[ OK ] Log.TimeFormat (0 ms)
[ RUN ] Log.Speed_gather
[ OK ] Log.Speed_gather (1779 ms)
[ RUN ] Log.Speed_nogather
[ OK ] Log.Speed_nogather (64 ms)
[----------] 15 tests from Log (2000 ms total)
[----------] Global test environment tear-down
[==========] 15 tests from 1 test case ran. (2000 ms total)
[ PASSED ] 15 tests
** After Patch **
The StackStreamBuf uses 4k for its default buffer. This appears to be more than
reasonable for preventing allocations for logging
== Heap profile: ==
$ google-pprof --alloc_objects --text bin/ceph-mds out/mds.a.profile.0001.heap
Total:
1052127 objects
384957 36.6% 36.6% 384957 36.6% __gnu_cxx::new_allocator::allocate (inline)
274720 26.1% 62.7% 274720 26.1% __gnu_cxx::__aligned_membuf::_M_addr (inline)
68496 6.5% 69.2% 68496 6.5% std::__cxx11::basic_string::_M_mutate
44140 4.2% 73.4% 51828 4.9% std::pair::pair (inline)
43091 4.1% 77.5% 43091 4.1% ceph::buffer::raw_combined::create (inline)
27706 2.6% 80.1% 236407 22.5% std::__cxx11::list::_M_insert (inline)
25699 2.4% 82.6% 25699 2.4% std::__cxx11::basic_string::reserve
23183 2.2% 84.8% 23183 2.2% mempool::pool_allocator::allocate (inline)
19466 1.9% 86.6% 81716 7.8% __gnu_cxx::new_allocator::construct (inline)
17606 1.7% 88.3% 17606 1.7% std::__cxx11::basic_string::_M_capacity (inline)
16879 1.6% 89.9% 16881 1.6% std::swap (inline)
8572 0.8% 90.7% 8574 0.8% std::_Deque_base::_M_initialize_map (inline)
8477 0.8% 91.5% 11808 1.1% mempool::pool_allocator::construct (inline)
6166 0.6% 92.1% 6166 0.6% Journaler::wrap_finisher
6080 0.6% 92.7% 134975 12.8% std::map::operator[] (inline)
6079 0.6% 93.3% 6079 0.6% MutationImpl::add_projected_fnode
== Profile data ==
Samples: 62K of event 'cycles:u', Event count (approx.)
Overhead Command Shared Object Symbol
+ 5.91% log libc-2.23.so [.] vfprintf
+ 5.75% ms_dispatch libstdc++.so.6.0.24 [.] std::__ostream_insert<char, std::char_traits<char> >
+ 4.59% ms_dispatch ceph-mds [.] StackStringBuf<4096ul>::xsputn
+ 4.26% ms_dispatch libc-2.23.so [.] __memmove_ssse3_back
+ 4.07% log libceph-common.so.0 [.] ceph::logging::Log::_flush
+ 2.48% ms_dispatch libstdc++.so.6.0.24 [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<l
+ 2.13% fn_anonymous libstdc++.so.6.0.24 [.] std::__ostream_insert<char, std::char_traits<char> >
+ 2.09% ms_dispatch ceph-mds [.] CDir::check_rstats
+ 2.06% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::sentry
+ 1.98% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::sentry::~sentry
+ 1.87% log libc-2.23.so [.] __strcpy_sse2_unaligned
+ 1.60% fn_anonymous ceph-mds [.] StackStringBuf<4096ul>::xsputn
+ 1.46% log libc-2.23.so [.] _IO_default_xsputn
+ 1.45% log libc-2.23.so [.] _itoa_word
+ 1.43% fn_anonymous libc-2.23.so [.] __memmove_ssse3_back
+ 1.40% ms_dispatch libstdc++.so.6.0.24 [.] std::ostream::_M_insert<long>
+ 0.98% fn_anonymous libstdc++.so.6.0.24 [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<l
+ 0.89% ms_dispatch libstdc++.so.6.0.24 [.] 0x
+ 0.88% ms_dispatch libstdc++.so.6.0.24 [.] std::_Rb_tree_increment
And the unittest_log time:
$ bin/unittest_log
[==========] Running 13 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 13 tests from Log
[ RUN ] Log.Simple
[ OK ] Log.Simple (1 ms)
[ RUN ] Log.ReuseBad
[ OK ] Log.ReuseBad (0 ms)
[ RUN ] Log.ManyNoGather
[ OK ] Log.ManyNoGather (0 ms)
[ RUN ] Log.ManyGatherLog
[ OK ] Log.ManyGatherLog (83 ms)
[ RUN ] Log.ManyGatherLogStringAssign
[ OK ] Log.ManyGatherLogStringAssign (79 ms)
[ RUN ] Log.ManyGatherLogStackSpillover
[ OK ] Log.ManyGatherLogStackSpillover (81 ms)
[ RUN ] Log.ManyGather
[ OK ] Log.ManyGather (80 ms)
[ RUN ] Log.InternalSegv
[WARNING] /home/pdonnell/ceph/src/googletest/googletest/src/gtest-death-test.cc:836:: Death tests use fork(), which is unsafe particularly in a threaded context. For this test, Google Test detected 3 threads.
[ OK ] Log.InternalSegv (7 ms)
[ RUN ] Log.LargeLog
[ OK ] Log.LargeLog (55 ms)
[ RUN ] Log.TimeSwitch
[ OK ] Log.TimeSwitch (4 ms)
[ RUN ] Log.TimeFormat
[ OK ] Log.TimeFormat (1 ms)
[ RUN ] Log.Speed_gather
[ OK ] Log.Speed_gather (1441 ms)
[ RUN ] Log.Speed_nogather
[ OK ] Log.Speed_nogather (63 ms)
[----------] 13 tests from Log (1895 ms total)
[----------] Global test environment tear-down
[==========] 13 tests from 1 test case ran. (1895 ms total)
[ PASSED ] 13 tests.
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
m_fsid = std::string(&buf[0]);
}
-void Graylog::log_entry(Entry const * const e)
+void Graylog::log_entry(const Entry& e)
{
if (m_log_dst_valid) {
- std::string s = e->get_str();
+ auto s = e.strv();
m_formatter->open_object_section("");
m_formatter->dump_string("version", "1.1");
m_formatter->dump_string("host", m_hostname);
m_formatter->dump_string("short_message", s);
m_formatter->dump_string("_app", "ceph");
- auto t = ceph::logging::log_clock::to_timeval(e->m_stamp);
+ auto t = ceph::logging::log_clock::to_timeval(e.m_stamp);
m_formatter->dump_float("timestamp", t.tv_sec + (t.tv_usec / 1000000.0));
- m_formatter->dump_unsigned("_thread", (uint64_t)e->m_thread);
- m_formatter->dump_int("_level", e->m_prio);
+ m_formatter->dump_unsigned("_thread", (uint64_t)e.m_thread);
+ m_formatter->dump_int("_level", e.m_prio);
if (m_subs != NULL)
- m_formatter->dump_string("_subsys_name", m_subs->get_name(e->m_subsys));
- m_formatter->dump_int("_subsys_id", e->m_subsys);
+ m_formatter->dump_string("_subsys_name", m_subs->get_name(e.m_subsys));
+ m_formatter->dump_int("_subsys_id", e.m_subsys);
m_formatter->dump_string("_fsid", m_fsid);
m_formatter->dump_string("_logger", m_logger);
m_formatter->close_section();
namespace logging {
-struct Entry;
+class Entry;
class SubsystemMap;
// Graylog logging backend: Convert log datastructures (LogEntry, Entry) to
void set_destination(const std::string& host, int port);
- void log_entry(Entry const * const e);
+ void log_entry(const Entry& e);
void log_log_entry(LogEntry const * const e);
typedef std::shared_ptr<Graylog> Ref;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef COMMON_STACKSTRINGSTREAM_H
+#define COMMON_STACKSTRINGSTREAM_H
+
+#include <boost/container/small_vector.hpp>
+
+#include <algorithm>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <ostream>
+#include <sstream>
+#include <string_view>
+#include <vector>
+
+template<std::size_t SIZE>
+class StackStringBuf : public std::basic_streambuf<char>
+{
+public:
+ StackStringBuf() = default;
+ StackStringBuf(const StackStringBuf&) = delete;
+ StackStringBuf& operator=(const StackStringBuf&) = delete;
+ StackStringBuf(StackStringBuf&& o) = delete;
+ StackStringBuf& operator=(StackStringBuf&& o) = delete;
+ ~StackStringBuf() override = default;
+
+ void push(std::string_view sv)
+ {
+ vec.reserve(vec.size() + sv.size());
+ vec.insert(vec.end(), sv.begin(), sv.end());
+ }
+
+ void clear()
+ {
+ vec.clear();
+ }
+
+ std::string_view strv() const
+ {
+ return std::string_view(vec.data(), vec.size());
+ }
+
+protected:
+ std::streamsize xsputn(const char *s, std::streamsize n)
+ {
+ push(std::string_view(s, n));
+ return n;
+ }
+
+ int overflow(int c)
+ {
+ if (traits_type::not_eof(c)) {
+ char str = traits_type::to_char_type(c);
+ push(std::string_view(&str, 1));
+ return c;
+ }
+ return EOF;
+ }
+
+private:
+
+ boost::container::small_vector<char, SIZE> vec;
+};
+
+template<std::size_t SIZE>
+class StackStringStream : public std::basic_ostream<char>
+{
+public:
+ StackStringStream() : basic_ostream<char>(&ssb) {}
+ StackStringStream(const StackStringStream& o) = delete;
+ StackStringStream& operator=(const StackStringStream& o) = delete;
+ StackStringStream(StackStringStream&& o) = delete;
+ StackStringStream& operator=(StackStringStream&& o) = delete;
+ ~StackStringStream() override = default;
+
+ void clear() {
+ basic_ostream<char>::clear();
+ ssb.clear();
+ }
+
+ std::string_view strv() const {
+ return ssb.strv();
+ }
+
+private:
+ StackStringBuf<SIZE> ssb;
+};
+
+/* In an ideal world, we could use StackStringStream indiscriminately, but alas
+ * it's very expensive to construct/destruct. So, we cache them in a
+ * thread_local vector. DO NOT share these with other threads. The copy/move
+ * constructors are deliberately restrictive to make this more difficult to
+ * accidentally do.
+ */
+class CachedStackStringStream {
+public:
+ using sss = StackStringStream<4096>;
+ using osptr = std::unique_ptr<sss>;
+
+ CachedStackStringStream() {
+ if (cache.destructed || cache.c.empty()) {
+ osp = std::make_unique<sss>();
+ } else {
+ osp = std::move(cache.c.back());
+ cache.c.pop_back();
+ osp->clear();
+ }
+ }
+ CachedStackStringStream(const CachedStackStringStream&) = delete;
+ CachedStackStringStream& operator=(const CachedStackStringStream&) = delete;
+ CachedStackStringStream(CachedStackStringStream&&) = delete;
+ CachedStackStringStream& operator=(CachedStackStringStream&&) = delete;
+ ~CachedStackStringStream() {
+ if (!cache.destructed && cache.c.size() < max_elems) {
+ cache.c.emplace_back(std::move(osp));
+ }
+ }
+
+ sss& get_stream() {
+ return *osp;
+ }
+ const sss& get_stream() const {
+ return *osp;
+ }
+
+private:
+ static constexpr std::size_t max_elems = 8;
+
+ /* The thread_local cache may be destructed before other static structures.
+ * If those destructors try to create a CachedStackStringStream (e.g. for
+ * logging) and access this cache, that access will be undefined. So note if
+ * the cache has been destructed and check before use.
+ */
+ struct Cache {
+ using container = std::vector<osptr>;
+
+ Cache() {}
+ ~Cache() { destructed = true; }
+
+ container c;
+ bool destructed = false;
+ };
+
+ inline static thread_local Cache cache;
+ osptr osp;
+};
+
+#endif
}(cct); \
\
if (should_gather) { \
- static size_t _log_exp_length = 80; \
- ceph::logging::Entry *_dout_e = \
- cct->_log->create_entry(v, sub, &_log_exp_length); \
+ ceph::logging::MutableEntry _dout_e(v, sub); \
static_assert(std::is_convertible<decltype(&*cct), \
CephContext* >::value, \
"provided cct must be compatible with CephContext*"); \
auto _dout_cct = cct; \
- std::ostream* _dout = &_dout_e->get_ostream();
+ std::ostream* _dout = &_dout_e.get_ostream();
-#define dendl_impl std::flush; \
- _dout_cct->_log->submit_entry(_dout_e); \
- } \
+#define dendl_impl std::flush; \
+ _dout_cct->_log->submit_entry(std::move(_dout_e)); \
+ } \
} while (0)
#endif // WITH_SEASTAR
${PROJECT_SOURCE_DIR}/src/common/utf8.c
${PROJECT_SOURCE_DIR}/src/common/version.cc
${PROJECT_SOURCE_DIR}/src/common/BackTrace.cc
- ${PROJECT_SOURCE_DIR}/src/common/CachedPrebufferedStreambuf.cc
${PROJECT_SOURCE_DIR}/src/common/ConfUtils.cc
${PROJECT_SOURCE_DIR}/src/common/DecayCounter.cc
${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc
#ifndef __CEPH_LOG_ENTRY_H
#define __CEPH_LOG_ENTRY_H
-#include "common/CachedPrebufferedStreambuf.h"
-#include <pthread.h>
-#include <string>
#include "log/LogClock.h"
+#include "common/StackStringStream.h"
-namespace ceph {
-namespace logging {
+#include "boost/container/small_vector.hpp"
-struct Entry {
- log_time m_stamp;
- pthread_t m_thread;
- short m_prio, m_subsys;
- Entry *m_next;
+#include <pthread.h>
- size_t m_buf_len;
- size_t* m_exp_len;
- char m_static_buf[1];
+#include <string_view>
- prebuffered_data m_data;
- CachedPrebufferedStreambuf* m_streambuf;
+namespace ceph {
+namespace logging {
- Entry()
- : Entry(log_time{}, 0, 0, 0, nullptr)
- {}
- Entry(log_time s, pthread_t t, short pr, short sub,
- const char *msg = NULL)
- : Entry(s, t, pr, sub, m_static_buf, sizeof(m_static_buf), nullptr,
- msg)
+class Entry {
+public:
+ using time = log_time;
+
+ Entry() = delete;
+ Entry(short pr, short sub) :
+ m_stamp(clock().now()),
+ m_thread(pthread_self()),
+ m_prio(pr),
+ m_subsys(sub)
{}
- Entry(log_time s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len,
- const char *msg = NULL)
- : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
- m_next(NULL),
- m_buf_len(buf_len),
- m_exp_len(exp_len),
- m_data(buf, buf_len),
- m_streambuf(CachedPrebufferedStreambuf::create(&m_data))
- {
- if (msg) {
- get_ostream() << msg;
- }
- }
+ Entry(const Entry &) = default;
+ Entry& operator=(const Entry &) = default;
+ Entry(Entry &&e) = default;
+ Entry& operator=(Entry &&e) = default;
+ virtual ~Entry() = default;
+
+ virtual std::string_view strv() const = 0;
+ virtual std::size_t size() const = 0;
+
+ time m_stamp;
+ pthread_t m_thread;
+ short m_prio, m_subsys;
private:
- ~Entry() = default;
+ static log_clock& clock() {
+ static log_clock clock;
+ return clock;
+ }
+};
+/* This should never be moved to the heap! Only allocate this on the stack. See
+ * CachedStackStringStream for rationale.
+ */
+class MutableEntry : public Entry {
public:
+ MutableEntry() = delete;
+ MutableEntry(short pr, short sub) : Entry(pr, sub) {}
+ MutableEntry(const MutableEntry&) = delete;
+ MutableEntry& operator=(const MutableEntry&) = delete;
+ MutableEntry(MutableEntry&&) = delete;
+ MutableEntry& operator=(MutableEntry&&) = delete;
+ ~MutableEntry() override = default;
+
std::ostream& get_ostream() {
- return m_streambuf->get_ostream();
+ return cos.get_stream();
}
- // function improves estimate for expected size of message
- void hint_size() {
- if (m_exp_len != NULL) {
- size_t size = m_data.size();
- if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) {
- //log larger then expected, just expand
- __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED);
- }
- else {
- //asymptotically adapt expected size to message size
- __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED);
- }
- }
+ std::string_view strv() const override {
+ return cos.get_stream().strv();
}
-
- void set_str(const std::string &s) {
- get_ostream() << s;
+ std::size_t size() const override {
+ return cos.get_stream().strv().size();
}
- std::string get_str() const {
- return m_data.get_str();
- }
+private:
+ CachedStackStringStream cos;
+};
- // returns current size of content
- size_t size() const {
- return m_data.size();
+class ConcreteEntry : public Entry {
+public:
+ ConcreteEntry() = delete;
+ ConcreteEntry(const Entry& e) : Entry(e) {
+ auto strv = e.strv();
+ str.reserve(strv.size());
+ str.insert(str.end(), strv.begin(), strv.end());
}
-
- // extracts up to avail chars of content
- int snprintf(char* dst, size_t avail) const {
- return m_data.snprintf(dst, avail);
+ ConcreteEntry& operator=(const Entry& e) {
+ Entry::operator=(e);
+ auto strv = e.strv();
+ str.reserve(strv.size());
+ str.assign(strv.begin(), strv.end());
+ return *this;
}
-
- void finish() {
- m_streambuf->finish();
+ ConcreteEntry(ConcreteEntry&& e) : Entry(e), str(std::move(e.str)) {}
+ ConcreteEntry& operator=(ConcreteEntry&& e) {
+ Entry::operator=(e);
+ str = std::move(e.str);
+ return *this;
}
+ ~ConcreteEntry() override = default;
- void destroy() {
- if (m_exp_len != NULL) {
- this->~Entry();
- ::operator delete(this);
- } else {
- delete(this);
- }
+ std::string_view strv() const override {
+ return std::string_view(str.data(), str.size());
}
+ std::size_t size() const override {
+ return str.size();
+ }
+
+private:
+ boost::container::small_vector<char, 4096> str;
};
}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef __CEPH_LOG_ENTRYQUEUE_H
-#define __CEPH_LOG_ENTRYQUEUE_H
-
-#include "Entry.h"
-
-namespace ceph {
-namespace logging {
-
-struct EntryQueue {
- int m_len;
- struct Entry *m_head, *m_tail;
-
- bool empty() const {
- return m_len == 0;
- }
-
- void swap(EntryQueue& other) {
- int len = m_len;
- struct Entry *h = m_head, *t = m_tail;
- m_len = other.m_len;
- m_head = other.m_head;
- m_tail = other.m_tail;
- other.m_len = len;
- other.m_head = h;
- other.m_tail = t;
- }
-
- void enqueue(Entry *e) {
- if (m_tail) {
- m_tail->m_next = e;
- m_tail = e;
- } else {
- m_head = m_tail = e;
- }
- m_len++;
- }
-
- Entry *dequeue() {
- if (!m_head)
- return NULL;
- Entry *e = m_head;
- m_head = m_head->m_next;
- if (!m_head)
- m_tail = NULL;
- m_len--;
- e->m_next = NULL;
- return e;
- }
-
- EntryQueue()
- : m_len(0),
- m_head(NULL),
- m_tail(NULL)
- {}
- ~EntryQueue() {
- Entry *t;
- while (m_head) {
- t = m_head->m_next;
- m_head->destroy();
- m_head = t;
- }
- }
-};
-
-}
-}
-
-#endif
#include "Log.h"
-#include <errno.h>
-#include <syslog.h>
-
#include "common/errno.h"
#include "common/safe_io.h"
-#include "common/Clock.h"
#include "common/Graylog.h"
#include "common/valgrind.h"
#include "LogClock.h"
#include "SubsystemMap.h"
-#define DEFAULT_MAX_NEW 100
-#define DEFAULT_MAX_RECENT 10000
+#include <errno.h>
+#include <syslog.h>
+
+#include <iostream>
-#define PREALLOC 1000000
#define MAX_LOG_BUF 65536
namespace ceph {
}
Log::Log(const SubsystemMap *s)
- : m_indirect_this(NULL),
+ : m_indirect_this(nullptr),
m_subs(s),
- m_queue_mutex_holder(0),
- m_flush_mutex_holder(0),
- m_new(), m_recent(),
- m_fd(-1),
- m_uid(0),
- m_gid(0),
- m_fd_last_error(0),
- m_syslog_log(-2), m_syslog_crash(-2),
- m_stderr_log(1), m_stderr_crash(-1),
- m_graylog_log(-3), m_graylog_crash(-3),
- m_log_buf(nullptr), m_log_buf_pos(0),
- m_stop(false),
- m_max_new(DEFAULT_MAX_NEW),
- m_max_recent(DEFAULT_MAX_RECENT),
- m_inject_segv(false)
+ m_recent(DEFAULT_MAX_RECENT)
{
- int ret;
-
- ret = pthread_mutex_init(&m_flush_mutex, NULL);
- ceph_assert(ret == 0);
-
- ret = pthread_mutex_init(&m_queue_mutex, NULL);
- ceph_assert(ret == 0);
-
- ret = pthread_cond_init(&m_cond_loggers, NULL);
- ceph_assert(ret == 0);
-
- ret = pthread_cond_init(&m_cond_flusher, NULL);
- ceph_assert(ret == 0);
-
- m_log_buf = (char*)malloc(MAX_LOG_BUF);
-
- // kludge for prealloc testing
- if (false)
- for (int i=0; i < PREALLOC; i++)
- m_recent.enqueue(new Entry);
+ m_log_buf.reserve(MAX_LOG_BUF);
}
Log::~Log()
{
if (m_indirect_this) {
- *m_indirect_this = NULL;
+ *m_indirect_this = nullptr;
}
ceph_assert(!is_started());
if (m_fd >= 0)
VOID_TEMP_FAILURE_RETRY(::close(m_fd));
- free(m_log_buf);
-
- pthread_mutex_destroy(&m_queue_mutex);
- pthread_mutex_destroy(&m_flush_mutex);
- pthread_cond_destroy(&m_cond_loggers);
- pthread_cond_destroy(&m_cond_flusher);
}
///
void Log::set_coarse_timestamps(bool coarse) {
+ std::scoped_lock lock(m_flush_mutex);
if (coarse)
clock.coarsen();
else
void Log::set_flush_on_exit()
{
+ std::scoped_lock lock(m_flush_mutex);
// Make sure we flush on shutdown. We do this by deliberately
// leaking an indirect pointer to ourselves (on_exit() can't
// unregister a callback). This is not racy only becuase we
}
}
-void Log::set_max_new(int n)
+void Log::set_max_new(std::size_t n)
{
+ std::scoped_lock lock(m_queue_mutex);
m_max_new = n;
}
-void Log::set_max_recent(int n)
+void Log::set_max_recent(std::size_t n)
{
- pthread_mutex_lock(&m_flush_mutex);
- m_flush_mutex_holder = pthread_self();
+ std::scoped_lock lock(m_flush_mutex);
m_max_recent = n;
- m_flush_mutex_holder = 0;
- pthread_mutex_unlock(&m_flush_mutex);
}
-void Log::set_log_file(string fn)
+void Log::set_log_file(std::string_view fn)
{
+ std::scoped_lock lock(m_flush_mutex);
m_log_file = fn;
}
-void Log::set_log_stderr_prefix(const std::string& p)
+void Log::set_log_stderr_prefix(std::string_view p)
{
+ std::scoped_lock lock(m_flush_mutex);
m_log_stderr_prefix = p;
}
void Log::reopen_log_file()
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
m_flush_mutex_holder = pthread_self();
if (m_fd >= 0)
VOID_TEMP_FAILURE_RETRY(::close(m_fd));
int r = ::fchown(m_fd, m_uid, m_gid);
if (r < 0) {
r = -errno;
- cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
+ std::cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
<< std::endl;
}
}
m_fd = -1;
}
m_flush_mutex_holder = 0;
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::chown_log_file(uid_t uid, gid_t gid)
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
if (m_fd >= 0) {
int r = ::fchown(m_fd, uid, gid);
if (r < 0) {
r = -errno;
- cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
+ std::cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
<< std::endl;
}
}
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::set_syslog_level(int log, int crash)
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
m_syslog_log = log;
m_syslog_crash = crash;
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::set_stderr_level(int log, int crash)
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
m_stderr_log = log;
m_stderr_crash = crash;
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::set_graylog_level(int log, int crash)
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
m_graylog_log = log;
m_graylog_crash = crash;
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::start_graylog()
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
if (! m_graylog.get())
m_graylog = std::make_shared<Graylog>(m_subs, "dlog");
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::stop_graylog()
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock(m_flush_mutex);
m_graylog.reset();
- pthread_mutex_unlock(&m_flush_mutex);
}
-void Log::submit_entry(Entry *e)
+void Log::submit_entry(Entry&& e)
{
- e->finish();
-
- pthread_mutex_lock(&m_queue_mutex);
+ std::unique_lock lock(m_queue_mutex);
m_queue_mutex_holder = pthread_self();
- if (m_inject_segv)
+ if (unlikely(m_inject_segv))
*(volatile int *)(0) = 0xdead;
// wait for flush to catch up
- while (m_new.m_len > m_max_new)
- pthread_cond_wait(&m_cond_loggers, &m_queue_mutex);
-
- m_new.enqueue(e);
- pthread_cond_signal(&m_cond_flusher);
- m_queue_mutex_holder = 0;
- pthread_mutex_unlock(&m_queue_mutex);
-}
-
-
-Entry *Log::create_entry(int level, int subsys, const char* msg)
-{
- if (true) {
- return new Entry(clock.now(),
- pthread_self(),
- level, subsys, msg);
- } else {
- // kludge for perf testing
- Entry *e = m_recent.dequeue();
- e->m_stamp = clock.now();
- e->m_thread = pthread_self();
- e->m_prio = level;
- e->m_subsys = subsys;
- return e;
+ while (m_new.size() > m_max_new) {
+ if (m_stop) break; // force addition
+ m_cond_loggers.wait(lock);
}
-}
-Entry *Log::create_entry(int level, int subsys, size_t* expected_size)
-{
- if (true) {
- ANNOTATE_BENIGN_RACE_SIZED(expected_size, sizeof(*expected_size),
- "Log hint");
- size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED);
- void *ptr = ::operator new(sizeof(Entry) + size);
- return new(ptr) Entry(clock.now(),
- pthread_self(), level, subsys,
- reinterpret_cast<char*>(ptr) + sizeof(Entry), size, expected_size);
- } else {
- // kludge for perf testing
- Entry *e = m_recent.dequeue();
- e->m_stamp = clock.now();
- e->m_thread = pthread_self();
- e->m_prio = level;
- e->m_subsys = subsys;
- return e;
- }
+ m_new.emplace_back(std::move(e));
+ m_cond_flusher.notify_all();
+ m_queue_mutex_holder = 0;
}
void Log::flush()
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock1(m_flush_mutex);
m_flush_mutex_holder = pthread_self();
- pthread_mutex_lock(&m_queue_mutex);
- m_queue_mutex_holder = pthread_self();
- EntryQueue t;
- t.swap(m_new);
- pthread_cond_broadcast(&m_cond_loggers);
- m_queue_mutex_holder = 0;
- pthread_mutex_unlock(&m_queue_mutex);
- _flush(&t, &m_recent, false);
- // trim
- while (m_recent.m_len > m_max_recent) {
- m_recent.dequeue()->destroy();
+ {
+ std::scoped_lock lock2(m_queue_mutex);
+ m_queue_mutex_holder = pthread_self();
+ assert(m_flush.empty());
+ m_flush.swap(m_new);
+ m_cond_loggers.notify_all();
+ m_queue_mutex_holder = 0;
}
+ _flush(m_flush, true, false);
m_flush_mutex_holder = 0;
- pthread_mutex_unlock(&m_flush_mutex);
}
-void Log::_log_safe_write(const char* what, size_t write_len)
+void Log::_log_safe_write(std::string_view sv)
{
if (m_fd < 0)
return;
- int r = safe_write(m_fd, what, write_len);
+ int r = safe_write(m_fd, sv.data(), sv.size());
if (r != m_fd_last_error) {
if (r < 0)
- cerr << "problem writing to " << m_log_file
+ std::cerr << "problem writing to " << m_log_file
<< ": " << cpp_strerror(r)
<< std::endl;
m_fd_last_error = r;
void Log::_flush_logbuf()
{
- if (m_log_buf_pos) {
- _log_safe_write(m_log_buf, m_log_buf_pos);
- m_log_buf_pos = 0;
+ if (m_log_buf.size()) {
+ _log_safe_write(std::string_view(m_log_buf.data(), m_log_buf.size()));
+ m_log_buf.resize(0);
}
}
-void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
+void Log::_flush(EntryVector& t, bool requeue, bool crash)
{
- Entry *e = nullptr;
long len = 0;
if (crash) {
- len = t->m_len;
+ len = t.size();
}
- if (!requeue) {
- e = t->m_head;
- if (!e) {
- return;
- }
+ if (!requeue && t.empty()) {
+ return;
}
- while (true) {
- if (requeue) {
- e = t->dequeue();
- if (!e) {
- break;
- }
- requeue->enqueue(e);
- } else {
- e = e->m_next;
- if (!e) {
- break;
- }
- }
-
- unsigned sub = e->m_subsys;
-
- bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
+ for (auto& e : t) {
+ auto prio = e.m_prio;
+ auto stamp = e.m_stamp;
+ auto sub = e.m_subsys;
+ auto thread = e.m_thread;
+ auto str = e.strv();
+
+ bool should_log = crash || m_subs->get_log_level(sub) >= prio;
bool do_fd = m_fd >= 0 && should_log;
- bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
- bool do_stderr = m_stderr_crash >= e->m_prio && should_log;
- bool do_graylog2 = m_graylog_crash >= e->m_prio && should_log;
+ bool do_syslog = m_syslog_crash >= prio && should_log;
+ bool do_stderr = m_stderr_crash >= prio && should_log;
+ bool do_graylog2 = m_graylog_crash >= prio && should_log;
- e->hint_size();
if (do_fd || do_syslog || do_stderr) {
- size_t line_used = 0;
+ const std::size_t cur = m_log_buf.size();
+ std::size_t used = 0;
+ const std::size_t allocated = e.size() + 80;
+ m_log_buf.resize(cur + allocated);
- char *line;
- size_t line_size = 80 + e->size();
- bool need_dynamic = line_size >= MAX_LOG_BUF;
-
- // this flushes the existing buffers if either line is longer
- // than our buffer, or buffer is too full to fit it
- if (m_log_buf_pos + line_size >= MAX_LOG_BUF) {
- _flush_logbuf();
- }
- if (need_dynamic) {
- line = new char[line_size];
- } else {
- line = &m_log_buf[m_log_buf_pos];
- }
+ char* const start = m_log_buf.data();
+ char* pos = start + cur;
if (crash) {
- line_used += snprintf(line, line_size, "%6ld> ", -(--len));
+ used += (std::size_t)snprintf(pos + used, allocated - used, "%6ld> ", -(--len));
}
- line_used += append_time(e->m_stamp, line + line_used, line_size - line_used);
- line_used += snprintf(line + line_used, line_size - line_used, " %lx %2d ",
- (unsigned long)e->m_thread, e->m_prio);
-
- line_used += e->snprintf(line + line_used, line_size - line_used - 1);
- ceph_assert(line_used < line_size - 1);
+ used += (std::size_t)append_time(stamp, pos + used, allocated - used);
+ used += (std::size_t)snprintf(pos + used, allocated - used, " %lx %2d ", (unsigned long)thread, prio);
+ memcpy(pos + used, str.data(), str.size());
+ used += str.size();
+ pos[used] = '\0';
+ ceph_assert((used + 1 /* '\n' */) < allocated);
if (do_syslog) {
- syslog(LOG_USER|LOG_INFO, "%s", line);
+ syslog(LOG_USER|LOG_INFO, "%s", pos);
}
if (do_stderr) {
- cerr << m_log_stderr_prefix << line << std::endl;
+ std::cerr << m_log_stderr_prefix << std::string_view(pos, used) << std::endl;
}
+ /* now add newline */
+ pos[used++] = '\n';
+
if (do_fd) {
- line[line_used] = '\n';
- if (need_dynamic) {
- _log_safe_write(line, line_used + 1);
- m_log_buf_pos = 0;
- } else {
- m_log_buf_pos += line_used + 1;
- }
+ m_log_buf.resize(cur + used);
} else {
- m_log_buf_pos = 0;
+ m_log_buf.resize(0);
}
- if (need_dynamic) {
- delete[] line;
+ if (m_log_buf.size() > MAX_LOG_BUF) {
+ _flush_logbuf();
}
}
m_graylog->log_entry(e);
}
+ if (requeue) {
+ m_recent.push_back(std::move(e));
+ }
}
+ t.clear();
_flush_logbuf();
-
}
void Log::_log_message(const char *s, bool crash)
b += '\n';
int r = safe_write(m_fd, b.c_str(), b.size());
if (r < 0)
- cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
+ std::cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
}
if ((crash ? m_syslog_crash : m_syslog_log) >= 0) {
syslog(LOG_USER|LOG_INFO, "%s", s);
}
if ((crash ? m_stderr_crash : m_stderr_log) >= 0) {
- cerr << s << std::endl;
+ std::cerr << s << std::endl;
}
}
void Log::dump_recent()
{
- pthread_mutex_lock(&m_flush_mutex);
+ std::scoped_lock lock1(m_flush_mutex);
m_flush_mutex_holder = pthread_self();
- pthread_mutex_lock(&m_queue_mutex);
- m_queue_mutex_holder = pthread_self();
-
- EntryQueue t;
- t.swap(m_new);
+ {
+ std::scoped_lock lock2(m_queue_mutex);
+ m_queue_mutex_holder = pthread_self();
+ assert(m_flush.empty());
+ m_flush.swap(m_new);
+ m_queue_mutex_holder = 0;
+ }
- m_queue_mutex_holder = 0;
- pthread_mutex_unlock(&m_queue_mutex);
- _flush(&t, &m_recent, false);
+ _flush(m_flush, true, false);
_flush_logbuf();
_log_message("--- begin dump of recent events ---", true);
- _flush(&m_recent, nullptr, true);
+ {
+ EntryVector t;
+ t.insert(t.end(), std::make_move_iterator(m_recent.begin()), std::make_move_iterator(m_recent.end()));
+ m_recent.clear();
+ _flush(t, false, true);
+ }
char buf[4096];
_log_message("--- logging levels ---", true);
_log_message(buf, true);
sprintf(buf, " %2d/%2d (stderr threshold)", m_stderr_log, m_stderr_crash);
_log_message(buf, true);
- sprintf(buf, " max_recent %9d", m_max_recent);
+ sprintf(buf, " max_recent %9zu", m_max_recent);
_log_message(buf, true);
- sprintf(buf, " max_new %9d", m_max_new);
+ sprintf(buf, " max_new %9zu", m_max_new);
_log_message(buf, true);
sprintf(buf, " log_file %s", m_log_file.c_str());
_log_message(buf, true);
_flush_logbuf();
m_flush_mutex_holder = 0;
- pthread_mutex_unlock(&m_flush_mutex);
}
void Log::start()
{
ceph_assert(!is_started());
- pthread_mutex_lock(&m_queue_mutex);
- m_stop = false;
- pthread_mutex_unlock(&m_queue_mutex);
+ {
+ std::scoped_lock lock(m_queue_mutex);
+ m_stop = false;
+ }
create("log");
}
void Log::stop()
{
if (is_started()) {
- pthread_mutex_lock(&m_queue_mutex);
- m_stop = true;
- pthread_cond_signal(&m_cond_flusher);
- pthread_cond_broadcast(&m_cond_loggers);
- pthread_mutex_unlock(&m_queue_mutex);
+ {
+ std::scoped_lock lock(m_queue_mutex);
+ m_stop = true;
+ m_cond_flusher.notify_one();
+ m_cond_loggers.notify_all();
+ }
join();
}
}
void *Log::entry()
{
- pthread_mutex_lock(&m_queue_mutex);
- m_queue_mutex_holder = pthread_self();
- while (!m_stop) {
- if (!m_new.empty()) {
- m_queue_mutex_holder = 0;
- pthread_mutex_unlock(&m_queue_mutex);
- flush();
- pthread_mutex_lock(&m_queue_mutex);
- m_queue_mutex_holder = pthread_self();
- continue;
- }
+ {
+ std::unique_lock lock(m_queue_mutex);
+ m_queue_mutex_holder = pthread_self();
+ while (!m_stop) {
+ if (!m_new.empty()) {
+ m_queue_mutex_holder = 0;
+ lock.unlock();
+ flush();
+ lock.lock();
+ m_queue_mutex_holder = pthread_self();
+ continue;
+ }
- pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
+ m_cond_flusher.wait(lock);
+ }
+ m_queue_mutex_holder = 0;
}
- m_queue_mutex_holder = 0;
- pthread_mutex_unlock(&m_queue_mutex);
flush();
return NULL;
}
#ifndef __CEPH_LOG_LOG_H
#define __CEPH_LOG_LOG_H
+#include <boost/circular_buffer.hpp>
+
+#include <condition_variable>
#include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <string_view>
#include "common/Thread.h"
+#include "common/likely.h"
-#include "EntryQueue.h"
+#include "log/Entry.h"
namespace ceph {
namespace logging {
class Graylog;
class SubsystemMap;
-class Entry;
class Log : private Thread
{
+ using EntryRing = boost::circular_buffer<ConcreteEntry>;
+ using EntryVector = std::vector<ConcreteEntry>;
+
+ static const std::size_t DEFAULT_MAX_NEW = 100;
+ static const std::size_t DEFAULT_MAX_RECENT = 10000;
+
Log **m_indirect_this;
log_clock clock;
const SubsystemMap *m_subs;
- pthread_mutex_t m_queue_mutex;
- pthread_mutex_t m_flush_mutex;
- pthread_cond_t m_cond_loggers;
- pthread_cond_t m_cond_flusher;
+ std::mutex m_queue_mutex;
+ std::mutex m_flush_mutex;
+ std::condition_variable m_cond_loggers;
+ std::condition_variable m_cond_flusher;
pthread_t m_queue_mutex_holder;
pthread_t m_flush_mutex_holder;
- EntryQueue m_new; ///< new entries
- EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail
+ EntryVector m_new; ///< new entries
+ EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
+ EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)
std::string m_log_file;
- int m_fd;
- uid_t m_uid;
- gid_t m_gid;
+ int m_fd = -1;
+ uid_t m_uid = 0;
+ gid_t m_gid = 0;
- int m_fd_last_error; ///< last error we say writing to fd (if any)
+ int m_fd_last_error = 0; ///< last error we say writing to fd (if any)
- int m_syslog_log, m_syslog_crash;
- int m_stderr_log, m_stderr_crash;
- int m_graylog_log, m_graylog_crash;
+ int m_syslog_log = -2, m_syslog_crash = -2;
+ int m_stderr_log = -1, m_stderr_crash = -1;
+ int m_graylog_log = -3, m_graylog_crash = -3;
std::string m_log_stderr_prefix;
std::shared_ptr<Graylog> m_graylog;
- char* m_log_buf; ///< coalescing buffer
- int m_log_buf_pos; ///< where we're at within coalescing buffer
+ std::vector<char> m_log_buf;
- bool m_stop;
+ bool m_stop = false;
- int m_max_new, m_max_recent;
+ std::size_t m_max_new = DEFAULT_MAX_NEW;
+ std::size_t m_max_recent = DEFAULT_MAX_RECENT;
- bool m_inject_segv;
+ bool m_inject_segv = false;
void *entry() override;
- void _log_safe_write(const char* what, size_t write_len);
+ void _log_safe_write(std::string_view sv);
void _flush_logbuf();
- void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);
+ void _flush(EntryVector& q, bool requeue, bool crash);
void _log_message(const char *s, bool crash);
void set_flush_on_exit();
void set_coarse_timestamps(bool coarse);
- void set_max_new(int n);
- void set_max_recent(int n);
- void set_log_file(std::string fn);
+ void set_max_new(std::size_t n);
+ void set_max_recent(std::size_t n);
+ void set_log_file(std::string_view fn);
void reopen_log_file();
void chown_log_file(uid_t uid, gid_t gid);
- void set_log_stderr_prefix(const std::string& p);
+ void set_log_stderr_prefix(std::string_view p);
void flush();
std::shared_ptr<Graylog> graylog() { return m_graylog; }
- Entry *create_entry(int level, int subsys, const char* msg = nullptr);
- Entry *create_entry(int level, int subsys, size_t* expected_size);
- void submit_entry(Entry *e);
+ void submit_entry(Entry&& e);
void start();
void stop();
int sys = i % 4;
int l = 5 + (i%4);
if (subs.should_gather(sys, l)) {
- Entry *e = log.create_entry(l, sys, "hello world");
- log.submit_entry(e);
+ MutableEntry e(l, sys);
+ log.submit_entry(std::move(e));
}
}
const int l = 0;
{
- auto e = log.create_entry(l, 1);
- auto& out = e->get_ostream();
+ MutableEntry e(l, 1);
+ auto& out = e.get_ostream();
out << (std::streambuf*)nullptr;
EXPECT_TRUE(out.bad()); // writing nullptr to a stream sets its badbit
- log.submit_entry(e);
+ log.submit_entry(std::move(e));
}
{
- auto e = log.create_entry(l, 1);
- auto& out = e->get_ostream();
+ MutableEntry e(l, 1);
+ auto& out = e.get_ostream();
EXPECT_FALSE(out.bad()); // should not see failures from previous log entry
out << "hello world";
- log.submit_entry(e);
+ log.submit_entry(std::move(e));
}
log.flush();
for (int i=0; i<many; i++) {
int l = 10;
if (subs.should_gather(1, l))
- log.submit_entry(log.create_entry(l, 1));
+ log.submit_entry(MutableEntry(1, 0));
}
log.flush();
log.stop();
TEST(Log, ManyGatherLog)
-{
- SubsystemMap subs;
- subs.set_log_level(1, 20);
- subs.set_gather_level(1, 10);
- Log log(&subs);
- log.start();
- log.set_log_file("/tmp/big");
- log.reopen_log_file();
- for (int i=0; i<many; i++) {
- int l = 10;
- if (subs.should_gather(1, l))
- log.submit_entry(log.create_entry(l, 1,
- "this is a long string asdf asdf asdf asdf asdf asdf asd fasd fasdf "));
- }
- log.flush();
- log.stop();
-}
-
-TEST(Log, ManyGatherLogStringAssign)
{
SubsystemMap subs;
subs.set_log_level(1, 20);
for (int i=0; i<many; i++) {
int l = 10;
if (subs.should_gather(1, l)) {
- Entry *e = log.create_entry(l, 1);
- ostringstream oss;
- oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
- e->set_str(oss.str());
- log.submit_entry(e);
- }
- }
- log.flush();
- log.stop();
-}
-TEST(Log, ManyGatherLogStringAssignWithReserve)
-{
- SubsystemMap subs;
- subs.set_log_level(1, 20);
- subs.set_gather_level(1, 10);
- Log log(&subs);
- log.start();
- log.set_log_file("/tmp/big");
- log.reopen_log_file();
- for (int i=0; i<many; i++) {
- int l = 10;
- if (subs.should_gather(1, l)) {
- Entry *e = log.create_entry(l, 1);
- ostringstream oss;
- oss.str().reserve(80);
- oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
- e->set_str(oss.str());
- log.submit_entry(e);
+ MutableEntry e(l, 1);
+ e.get_ostream() << "this is a long string asdf asdf asdf asdf asdf asdf asd fasd fasdf ";
+ log.submit_entry(std::move(e));
}
}
log.flush();
log.stop();
}
-TEST(Log, ManyGatherLogPrebuf)
+TEST(Log, ManyGatherLogStringAssign)
{
SubsystemMap subs;
subs.set_log_level(1, 20);
for (int i=0; i<many; i++) {
int l = 10;
if (subs.should_gather(1, l)) {
- Entry *e = log.create_entry(l, 1);
- PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf));
- ostream oss(&psb);
- oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
- //e->m_str = oss.str();
- log.submit_entry(e);
+ MutableEntry e(l, 1);
+ e.get_ostream() << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
+ log.submit_entry(std::move(e));
}
}
log.flush();
log.stop();
}
-TEST(Log, ManyGatherLogPrebufOverflow)
+TEST(Log, ManyGatherLogStackSpillover)
{
SubsystemMap subs;
subs.set_log_level(1, 20);
for (int i=0; i<many; i++) {
int l = 10;
if (subs.should_gather(1, l)) {
- Entry *e = log.create_entry(l, 1);
- PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf));
- ostream oss(&psb);
- oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"
- << std::string(sizeof(e->m_static_buf) * 2, '-') ;
- //e->m_str = oss.str();
- log.submit_entry(e);
+ MutableEntry e(l, 1);
+ auto& s = e.get_ostream();
+ s << "foo";
+ s << std::string(sizeof(e) * 2, '-');
+ log.submit_entry(std::move(e));
}
}
log.flush();
for (int i=0; i<many; i++) {
int l = 10;
if (subs.should_gather(1, l))
- log.submit_entry(log.create_entry(l, 1));
+ log.submit_entry(MutableEntry(l, 1));
}
log.flush();
log.stop();
log.reopen_log_file();
log.inject_segv();
- Entry *e = log.create_entry(10, 1);
+ MutableEntry e(10, 1);
{
PrCtl unset_dumpable;
- log.submit_entry(e); // this should segv
+ log.submit_entry(std::move(e)); // this should segv
}
log.flush();
log.set_log_file("/tmp/big");
log.reopen_log_file();
int l = 10;
- Entry *e = log.create_entry(l, 1);
-
- std::string msg(10000000, 0);
- e->set_str(msg);
- log.submit_entry(e);
+ {
+ MutableEntry e(l, 1);
+ std::string msg(10000000, 'a');
+ e.get_ostream() << msg;
+ log.submit_entry(std::move(e));
+ }
log.flush();
log.stop();
}
int l = 10;
bool coarse = true;
for (auto i = 0U; i < 300; ++i) {
- log.submit_entry(
- log.create_entry(l, 1, "SQUID THEFT! PUNISHABLE BY DEATH!"));
+ MutableEntry e(l, 1);
+ e.get_ostream() << "SQUID THEFT! PUNISHABLE BY DEATH!";
+ log.submit_entry(std::move(e));
if (i % 50)
log.set_coarse_timestamps(coarse = !coarse);
}