]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
log: avoid heap allocations for most log entries
authorPatrick Donnelly <pdonnell@redhat.com>
Thu, 23 Aug 2018 17:40:20 +0000 (10:40 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Sat, 15 Sep 2018 02:46:08 +0000 (19:46 -0700)
Each log Entry now exists on the stack and uses a large (4k) buffer for its log
stream. This Entry is std::move'd to the queues (std::vector and
boost::circular_buffer) in the Log, involving only memory copies in the general
case. There are two memory copies (std::move) for any given Entry, once in
Log::submit_entry and again in Log::_flush

In practice, this eliminates 100% of allocations outside of startup
allocations

I've run a simple experiment with the MDS that copies /usr/bin to CephFS. I got
measurements for the number of allocations from the heap profiler and the
profile of CPU usage in the MDS.

** Before this patch **

    == Heap profile: ==

    $ google-pprof --alloc_objects --text bin/ceph-mds out/mds.a.profile.0001.heap
    Total: 1105048 objects
      433329  39.2%  39.2%   433329  39.2% ceph::logging::Log::create_entry
      209311  18.9%  58.2%   209311  18.9% __gnu_cxx::__aligned_membuf::_M_addr (inline)
      192963  17.5%  75.6%   192963  17.5% __gnu_cxx::new_allocator::allocate (inline)
       61774   5.6%  81.2%    61774   5.6% std::__cxx11::basic_string::_M_mutate
       37689   3.4%  84.6%    37689   3.4% ceph::buffer::raw_combined::create (inline)
       22773   2.1%  86.7%    22773   2.1% mempool::pool_allocator::allocate (inline)
       17761   1.6%  88.3%    20523   1.9% std::pair::pair (inline)
       15795   1.4%  89.7%    15797   1.4% std::swap (inline)
       11011   1.0%  90.7%   130061  11.8% std::__cxx11::list::_M_insert (inline)
       10822   1.0%  91.7%    10822   1.0% std::__cxx11::basic_string::reserve
        9108   0.8%  92.5%    32721   3.0% __gnu_cxx::new_allocator::construct (inline)
        8608   0.8%  93.3%     8610   0.8% std::_Deque_base::_M_initialize_map (inline)
        7694   0.7%  94.0%     7694   0.7% std::__cxx11::basic_string::_M_capacity (inline)
        6160   0.6%  94.5%     6160   0.6% Journaler::wrap_finisher
        6084   0.6%  95.1%    70892   6.4% std::map::operator[] (inline)
        5347   0.5%  95.6%     5347   0.5% MutationImpl::add_projected_fnode
        4381   0.4%  96.0%     7706   0.7% mempool::pool_allocator::construct (inline)
        3588   0.3%  96.3%   182966  16.6% Locker::_do_cap_update
        3049   0.3%  96.6%     5280   0.5% std::__shared_count::__shared_count (inline)
        3043   0.3%  96.9%     3043   0.3% MDSLogContextBase::MDSLogContextBase (inline)
        3038   0.3%  97.1%    14763   1.3% std::__uninitialized_copy::__uninit_copy (inline)

   So approximately 430k heap allocations for Entry were created! The
   basic_string::_M_mutate is also another allocation via PrebufferedStreambuf

== Profile data ==

Selecting interesting functions

Samples: 798K of event 'cycles:pp', Event count (approx
  Children      Self  Com  Shared Object        Symbol
+    1.04%     1.04%  log  libceph-common.so.0  [.] ceph::logging::Log::_flush
+    0.05%     0.05%  log  libceph-common.so.0  [.] ceph::logging::Log::flush
+    0.00%     0.00%  log  libceph-common.so.0  [.] ceph::logging::Log::_log_safe_write
+    0.00%     0.00%  log  libceph-common.so.0  [.] ceph::logging::Log::entry
+    0.00%     0.00%  log  libceph-common.so.0  [.] ceph::logging::Log::_flush_logbuf
...

  Children      Self  Command         Shared Object        Symbol
+    3.69%     0.00%  safe_timer      libceph-common.so.0  [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf
+    0.53%     0.00%  ms_dispatch     libceph-common.so.0  [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf
+    0.13%     0.00%  fn_anonymous    libceph-common.so.0  [.] CachedPrebufferedStreambuf::~CachedPrebufferedStreambuf
+    0.00%     0.00%  ms_dispatch     libceph-common.so.0  [.] CachedPrebufferedStreambuf::create
+    0.00%     0.00%  fn_anonymous    libceph-common.so.0  [.] CachedPrebufferedStreambuf::create

  Children      Self  Command         Shared Object        Symbol
+    0.07%     0.07%  fn_anonymous    libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  ms_dispatch     libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  ms_dispatch     ceph-mds             [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+    0.00%     0.00%  md_submit       libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  fn_anonymous    ceph-mds             [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+    0.00%     0.00%  safe_timer      libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  mds_rank_progr  libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  mds_rank_progr  ceph-mds             [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+    0.00%     0.00%  msgr-worker-0   libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  msgr-worker-2   libceph-common.so.0  [.] ceph::logging::Log::create_entry
+    0.00%     0.00%  md_submit       ceph-mds             [.] _ZN4ceph7logging3Log12create_entryEiiPm@plt
+    0.00%     0.00%  msgr-worker-1   libceph-common.so.0  [.] ceph::logging::Log::create_entry

  Children      Self  Command         Shared Object        Symbol
+    8.29%     0.00%  ms_dispatch     libstdc++.so.6.0.24  [.] virtual thunk to std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+    7.55%     1.46%  ms_dispatch     libstdc++.so.6.0.24  [.] std::ostream::_M_insert<long>
+    3.87%     0.00%  fn_anonymous    libstdc++.so.6.0.24  [.] std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+    2.92%     0.00%  md_submit       libstdc++.so.6.0.24  [.] virtual thunk to std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+    2.38%     0.00%  fn_anonymous    libstdc++.so.6.0.24  [.] virtual thunk to std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+    2.22%     2.22%  fn_anonymous    libstdc++.so.6.0.24  [.] std::ostream::sentry::sentry
+    1.89%     0.13%  fn_anonymous    libstdc++.so.6.0.24  [.] std::__ostream_insert<char, std::char_traits<char> >
+    0.71%     0.00%  ms_dispatch     libstdc++.so.6.0.24  [.] std::basic_ostream<char, std::char_traits<char> >::~basic_ostream
+    0.39%     0.06%  fn_anonymous    libstdc++.so.6.0.24  [.] std::ostream::_M_insert<long>
+    0.29%     0.21%  ms_dispatch     libstdc++.so.6.0.24  [.] std::__ostream_insert<char, std::char_traits<char> >
+    0.27%     0.27%  ms_dispatch     libstdc++.so.6.0.24  [.] std::ostream::sentry::~sentry
+    0.27%     0.27%  fn_anonymous    libstdc++.so.6.0.24  [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<long>
+    0.22%     0.22%  ms_dispatch     libstdc++.so.6.0.24  [.] std::basic_streambuf<char, std::char_traits<char> >::xsputn
+    0.20%     0.20%  ms_dispatch     libstdc++.so.6.0.24  [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<long>
+    0.15%     0.15%  fn_anonymous    libstdc++.so.6.0.24  [.] std::ostream::sentry::~sentry
+    0.14%     0.14%  ms_dispatch     libstdc++.so.6.0.24  [.] std::ostream::sentry::sentry
+    0.13%     0.00%  ms_dispatch     libstdc++.so.6.0.24  [.] std::ostream::_M_insert<unsigned long>
+    0.13%     0.13%  fn_anonymous    libstdc++.so.6.0.24  [.] std::basic_streambuf<char, std::char_traits<char> >::xsputn
+    0.00%     0.00%  fn_anonymous    libstdc++.so.6.0.24  [.] std::ostream::_M_insert<unsigned long>
+    0.00%     0.00%  ms_dispatch     libstdc++.so.6.0.24  [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<unsigned long>

    And the unittest_log time:

    $ bin/unittest_log
    [==========] Running 15 tests from 1 test case
    [----------] Global test environment set-up
    [----------] 15 tests from Log
    [ RUN      ] Log.Simple
    [       OK ] Log.Simple (0 ms)
    [ RUN      ] Log.ReuseBad
    [       OK ] Log.ReuseBad (1 ms)
    [ RUN      ] Log.ManyNoGather
    [       OK ] Log.ManyNoGather (0 ms)
    [ RUN      ] Log.ManyGatherLog
    [       OK ] Log.ManyGatherLog (12 ms)
    [ RUN      ] Log.ManyGatherLogStringAssign
    [       OK ] Log.ManyGatherLogStringAssign (27 ms)
    [ RUN      ] Log.ManyGatherLogStringAssignWithReserve
    [       OK ] Log.ManyGatherLogStringAssignWithReserve (27 ms)
    [ RUN      ] Log.ManyGatherLogPrebuf
    [       OK ] Log.ManyGatherLogPrebuf (15 ms)
    [ RUN      ] Log.ManyGatherLogPrebufOverflow
    [       OK ] Log.ManyGatherLogPrebufOverflow (15 ms)
    [ RUN      ] Log.ManyGather
    [       OK ] Log.ManyGather (8 ms)
    [ RUN      ] Log.InternalSegv

    [WARNING] /home/pdonnell/cephfs-shell/src/googletest/googletest/src/gtest-death-test.cc:836:: Death tests use fork(), which is unsafe particularly in a threaded context. For this test, Google Test detected 3 threads
    [       OK ] Log.InternalSegv (8 ms)
    [ RUN      ] Log.LargeLog
    [       OK ] Log.LargeLog (43 ms)
    [ RUN      ] Log.TimeSwitch
    [       OK ] Log.TimeSwitch (1 ms)
    [ RUN      ] Log.TimeFormat
    [       OK ] Log.TimeFormat (0 ms)
    [ RUN      ] Log.Speed_gather
    [       OK ] Log.Speed_gather (1779 ms)
    [ RUN      ] Log.Speed_nogather
    [       OK ] Log.Speed_nogather (64 ms)
    [----------] 15 tests from Log (2000 ms total)

    [----------] Global test environment tear-down
    [==========] 15 tests from 1 test case ran. (2000 ms total)
    [  PASSED  ] 15 tests

** After Patch **

The StackStreamBuf uses 4k for its default buffer. This appears to be more than
reasonable for preventing allocations for logging

    == Heap profile: ==

$ google-pprof --alloc_objects --text bin/ceph-mds out/mds.a.profile.0001.heap
Total: 1052127 objects
      384957  36.6%  36.6%   384957  36.6% __gnu_cxx::new_allocator::allocate (inline)
      274720  26.1%  62.7%   274720  26.1% __gnu_cxx::__aligned_membuf::_M_addr (inline)
       68496   6.5%  69.2%    68496   6.5% std::__cxx11::basic_string::_M_mutate
       44140   4.2%  73.4%    51828   4.9% std::pair::pair (inline)
       43091   4.1%  77.5%    43091   4.1% ceph::buffer::raw_combined::create (inline)
       27706   2.6%  80.1%   236407  22.5% std::__cxx11::list::_M_insert (inline)
       25699   2.4%  82.6%    25699   2.4% std::__cxx11::basic_string::reserve
       23183   2.2%  84.8%    23183   2.2% mempool::pool_allocator::allocate (inline)
       19466   1.9%  86.6%    81716   7.8% __gnu_cxx::new_allocator::construct (inline)
       17606   1.7%  88.3%    17606   1.7% std::__cxx11::basic_string::_M_capacity (inline)
       16879   1.6%  89.9%    16881   1.6% std::swap (inline)
        8572   0.8%  90.7%     8574   0.8% std::_Deque_base::_M_initialize_map (inline)
        8477   0.8%  91.5%    11808   1.1% mempool::pool_allocator::construct (inline)
        6166   0.6%  92.1%     6166   0.6% Journaler::wrap_finisher
        6080   0.6%  92.7%   134975  12.8% std::map::operator[] (inline)
        6079   0.6%  93.3%     6079   0.6% MutationImpl::add_projected_fnode

== Profile data ==

    Samples: 62K of event 'cycles:u', Event count (approx.)
      Overhead  Command         Shared Object         Symbol
    +    5.91%  log             libc-2.23.so          [.] vfprintf
    +    5.75%  ms_dispatch     libstdc++.so.6.0.24   [.] std::__ostream_insert<char, std::char_traits<char> >
    +    4.59%  ms_dispatch     ceph-mds              [.] StackStringBuf<4096ul>::xsputn
    +    4.26%  ms_dispatch     libc-2.23.so          [.] __memmove_ssse3_back
    +    4.07%  log             libceph-common.so.0   [.] ceph::logging::Log::_flush
    +    2.48%  ms_dispatch     libstdc++.so.6.0.24   [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<l
    +    2.13%  fn_anonymous    libstdc++.so.6.0.24   [.] std::__ostream_insert<char, std::char_traits<char> >
    +    2.09%  ms_dispatch     ceph-mds              [.] CDir::check_rstats
    +    2.06%  ms_dispatch     libstdc++.so.6.0.24   [.] std::ostream::sentry::sentry
    +    1.98%  ms_dispatch     libstdc++.so.6.0.24   [.] std::ostream::sentry::~sentry
    +    1.87%  log             libc-2.23.so          [.] __strcpy_sse2_unaligned
    +    1.60%  fn_anonymous    ceph-mds              [.] StackStringBuf<4096ul>::xsputn
    +    1.46%  log             libc-2.23.so          [.] _IO_default_xsputn
    +    1.45%  log             libc-2.23.so          [.] _itoa_word
    +    1.43%  fn_anonymous    libc-2.23.so          [.] __memmove_ssse3_back
    +    1.40%  ms_dispatch     libstdc++.so.6.0.24   [.] std::ostream::_M_insert<long>
    +    0.98%  fn_anonymous    libstdc++.so.6.0.24   [.] std::num_put<char, std::ostreambuf_iterator<char, std::char_traits<char> > >::_M_insert_int<l
    +    0.89%  ms_dispatch     libstdc++.so.6.0.24   [.] 0x
    +    0.88%  ms_dispatch     libstdc++.so.6.0.24   [.] std::_Rb_tree_increment

    And the unittest_log time:

    $ bin/unittest_log
    [==========] Running 13 tests from 1 test case.
    [----------] Global test environment set-up.
    [----------] 13 tests from Log
    [ RUN      ] Log.Simple
    [       OK ] Log.Simple (1 ms)
    [ RUN      ] Log.ReuseBad
    [       OK ] Log.ReuseBad (0 ms)
    [ RUN      ] Log.ManyNoGather
    [       OK ] Log.ManyNoGather (0 ms)
    [ RUN      ] Log.ManyGatherLog
    [       OK ] Log.ManyGatherLog (83 ms)
    [ RUN      ] Log.ManyGatherLogStringAssign
    [       OK ] Log.ManyGatherLogStringAssign (79 ms)
    [ RUN      ] Log.ManyGatherLogStackSpillover
    [       OK ] Log.ManyGatherLogStackSpillover (81 ms)
    [ RUN      ] Log.ManyGather
    [       OK ] Log.ManyGather (80 ms)
    [ RUN      ] Log.InternalSegv

    [WARNING] /home/pdonnell/ceph/src/googletest/googletest/src/gtest-death-test.cc:836:: Death tests use fork(), which is unsafe particularly in a threaded context. For this test, Google Test detected 3 threads.
    [       OK ] Log.InternalSegv (7 ms)
    [ RUN      ] Log.LargeLog
    [       OK ] Log.LargeLog (55 ms)
    [ RUN      ] Log.TimeSwitch
    [       OK ] Log.TimeSwitch (4 ms)
    [ RUN      ] Log.TimeFormat
    [       OK ] Log.TimeFormat (1 ms)
    [ RUN      ] Log.Speed_gather
    [       OK ] Log.Speed_gather (1441 ms)
    [ RUN      ] Log.Speed_nogather
    [       OK ] Log.Speed_nogather (63 ms)
    [----------] 13 tests from Log (1895 ms total)

    [----------] Global test environment tear-down
    [==========] 13 tests from 1 test case ran. (1895 ms total)
    [  PASSED  ] 13 tests.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/common/Graylog.cc
src/common/Graylog.h
src/common/StackStringStream.h [new file with mode: 0644]
src/common/dout.h
src/crimson/CMakeLists.txt
src/log/Entry.h
src/log/EntryQueue.h [deleted file]
src/log/Log.cc
src/log/Log.h
src/log/test.cc

index 3ed10434a2d2baea52ce11a97efe5061e5f730dc..79a537eeddeea6e397b4593a209815d485134358 100644 (file)
@@ -67,23 +67,23 @@ void Graylog::set_fsid(const uuid_d& fsid)
   m_fsid = std::string(&buf[0]);
 }
 
-void Graylog::log_entry(Entry const * const e)
+void Graylog::log_entry(const Entry& e)
 {
   if (m_log_dst_valid) {
-    std::string s = e->get_str();
+    auto s = e.strv();
 
     m_formatter->open_object_section("");
     m_formatter->dump_string("version", "1.1");
     m_formatter->dump_string("host", m_hostname);
     m_formatter->dump_string("short_message", s);
     m_formatter->dump_string("_app", "ceph");
-    auto t = ceph::logging::log_clock::to_timeval(e->m_stamp);
+    auto t = ceph::logging::log_clock::to_timeval(e.m_stamp);
     m_formatter->dump_float("timestamp", t.tv_sec + (t.tv_usec / 1000000.0));
-    m_formatter->dump_unsigned("_thread", (uint64_t)e->m_thread);
-    m_formatter->dump_int("_level", e->m_prio);
+    m_formatter->dump_unsigned("_thread", (uint64_t)e.m_thread);
+    m_formatter->dump_int("_level", e.m_prio);
     if (m_subs != NULL)
-    m_formatter->dump_string("_subsys_name", m_subs->get_name(e->m_subsys));
-    m_formatter->dump_int("_subsys_id", e->m_subsys);
+    m_formatter->dump_string("_subsys_name", m_subs->get_name(e.m_subsys));
+    m_formatter->dump_int("_subsys_id", e.m_subsys);
     m_formatter->dump_string("_fsid", m_fsid);
     m_formatter->dump_string("_logger", m_logger);
     m_formatter->close_section();
index 70cd770504ebc4311bbcd8f7b2429b427f60fcd7..f78c936ae9ab39e74e3b10bbe215ec44e0663d6d 100644 (file)
@@ -19,7 +19,7 @@ class Formatter;
 
 namespace logging {
 
-struct Entry;
+class Entry;
 class SubsystemMap;
 
 // Graylog logging backend: Convert log datastructures (LogEntry, Entry) to
@@ -51,7 +51,7 @@ class Graylog
 
   void set_destination(const std::string& host, int port);
 
-  void log_entry(Entry const * const e);
+  void log_entry(const Entry& e);
   void log_log_entry(LogEntry const * const e);
 
   typedef std::shared_ptr<Graylog> Ref;
diff --git a/src/common/StackStringStream.h b/src/common/StackStringStream.h
new file mode 100644 (file)
index 0000000..6d080f3
--- /dev/null
@@ -0,0 +1,161 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef COMMON_STACKSTRINGSTREAM_H
+#define COMMON_STACKSTRINGSTREAM_H
+
+#include <boost/container/small_vector.hpp>
+
+#include <algorithm>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <ostream>
+#include <sstream>
+#include <string_view>
+#include <vector>
+
+template<std::size_t SIZE>
+class StackStringBuf : public std::basic_streambuf<char>
+{
+public:
+  StackStringBuf() = default;
+  StackStringBuf(const StackStringBuf&) = delete;
+  StackStringBuf& operator=(const StackStringBuf&) = delete;
+  StackStringBuf(StackStringBuf&& o) = delete;
+  StackStringBuf& operator=(StackStringBuf&& o) = delete;
+  ~StackStringBuf() override = default;
+
+  void push(std::string_view sv)
+  {
+    vec.reserve(vec.size() + sv.size());
+    vec.insert(vec.end(), sv.begin(), sv.end());
+  }
+
+  void clear()
+  {
+    vec.clear();
+  }
+
+  std::string_view strv() const
+  {
+    return std::string_view(vec.data(), vec.size());
+  }
+
+protected:
+  std::streamsize xsputn(const char *s, std::streamsize n)
+  {
+    push(std::string_view(s, n));
+    return n;
+  }
+
+  int overflow(int c)
+  {
+    if (traits_type::not_eof(c)) {
+      char str = traits_type::to_char_type(c);
+      push(std::string_view(&str, 1));
+      return c;
+    }
+    return EOF;
+  }
+
+private:
+
+  boost::container::small_vector<char, SIZE> vec;
+};
+
+template<std::size_t SIZE>
+class StackStringStream : public std::basic_ostream<char>
+{
+public:
+  StackStringStream() : basic_ostream<char>(&ssb) {}
+  StackStringStream(const StackStringStream& o) = delete;
+  StackStringStream& operator=(const StackStringStream& o) = delete;
+  StackStringStream(StackStringStream&& o) = delete;
+  StackStringStream& operator=(StackStringStream&& o) = delete;
+  ~StackStringStream() override = default;
+
+  void clear() {
+    basic_ostream<char>::clear();
+    ssb.clear();
+  }
+
+  std::string_view strv() const {
+    return ssb.strv();
+  }
+
+private:
+  StackStringBuf<SIZE> ssb;
+};
+
+/* In an ideal world, we could use StackStringStream indiscriminately, but alas
+ * it's very expensive to construct/destruct. So, we cache them in a
+ * thread_local vector. DO NOT share these with other threads. The copy/move
+ * constructors are deliberately restrictive to make this more difficult to
+ * accidentally do.
+ */
+class CachedStackStringStream {
+public:
+  using sss = StackStringStream<4096>;
+  using osptr = std::unique_ptr<sss>;
+
+  CachedStackStringStream() {
+    if (cache.destructed || cache.c.empty()) {
+      osp = std::make_unique<sss>();
+    } else {
+      osp = std::move(cache.c.back());
+      cache.c.pop_back();
+      osp->clear();
+    }
+  }
+  CachedStackStringStream(const CachedStackStringStream&) = delete;
+  CachedStackStringStream& operator=(const CachedStackStringStream&) = delete;
+  CachedStackStringStream(CachedStackStringStream&&) = delete;
+  CachedStackStringStream& operator=(CachedStackStringStream&&) = delete;
+  ~CachedStackStringStream() {
+    if (!cache.destructed && cache.c.size() < max_elems) {
+      cache.c.emplace_back(std::move(osp));
+    }
+  }
+
+  sss& get_stream() {
+    return *osp;
+  }
+  const sss& get_stream() const {
+    return *osp;
+  }
+
+private:
+  static constexpr std::size_t max_elems = 8;
+
+  /* The thread_local cache may be destructed before other static structures.
+   * If those destructors try to create a CachedStackStringStream (e.g. for
+   * logging) and access this cache, that access will be undefined. So note if
+   * the cache has been destructed and check before use.
+   */
+  struct Cache {
+    using container = std::vector<osptr>;
+
+    Cache() {}
+    ~Cache() { destructed = true; }
+
+    container c;
+    bool destructed = false;
+  };
+
+  inline static thread_local Cache cache;
+  osptr osp;
+};
+
+#endif
index a98dcfa3e1ef0c61e8b3e9ea2fd1af417f1b30c8..fba299cc812812491947f0e2556ad43347c5f0cc 100644 (file)
@@ -157,18 +157,16 @@ struct is_dynamic<dynamic_marker_t<T>> : public std::true_type {};
   }(cct);                                                              \
                                                                        \
   if (should_gather) {                                                 \
-    static size_t _log_exp_length = 80;                                \
-    ceph::logging::Entry *_dout_e =                                    \
-      cct->_log->create_entry(v, sub, &_log_exp_length);               \
+    ceph::logging::MutableEntry _dout_e(v, sub);                        \
     static_assert(std::is_convertible<decltype(&*cct),                         \
                                      CephContext* >::value,            \
                  "provided cct must be compatible with CephContext*"); \
     auto _dout_cct = cct;                                              \
-    std::ostream* _dout = &_dout_e->get_ostream();
+    std::ostream* _dout = &_dout_e.get_ostream();
 
-#define dendl_impl std::flush;                         \
-    _dout_cct->_log->submit_entry(_dout_e);            \
-  }                                                    \
+#define dendl_impl std::flush;                                          \
+    _dout_cct->_log->submit_entry(std::move(_dout_e));                  \
+  }                                                                     \
   } while (0)
 #endif // WITH_SEASTAR
 
index 4d6302f41b6773d4b53f1a4ce87bda83bb40ebba..3fffa8b6e372c7bfb70c9f1a91b3deb7b7ee54e6 100644 (file)
@@ -60,7 +60,6 @@ add_library(crimson-common STATIC
   ${PROJECT_SOURCE_DIR}/src/common/utf8.c
   ${PROJECT_SOURCE_DIR}/src/common/version.cc
   ${PROJECT_SOURCE_DIR}/src/common/BackTrace.cc
-  ${PROJECT_SOURCE_DIR}/src/common/CachedPrebufferedStreambuf.cc
   ${PROJECT_SOURCE_DIR}/src/common/ConfUtils.cc
   ${PROJECT_SOURCE_DIR}/src/common/DecayCounter.cc
   ${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc
index e0677ef337ffca60b0368c96e8da6aec07f151f9..076512cbe0c012973036cb4f37ec6f646226805e 100644 (file)
 #ifndef __CEPH_LOG_ENTRY_H
 #define __CEPH_LOG_ENTRY_H
 
-#include "common/CachedPrebufferedStreambuf.h"
-#include <pthread.h>
-#include <string>
 #include "log/LogClock.h"
 
+#include "common/StackStringStream.h"
 
-namespace ceph {
-namespace logging {
+#include "boost/container/small_vector.hpp"
 
-struct Entry {
-  log_time m_stamp;
-  pthread_t m_thread;
-  short m_prio, m_subsys;
-  Entry *m_next;
+#include <pthread.h>
 
-  size_t m_buf_len;
-  size_t* m_exp_len;
-  char m_static_buf[1];
+#include <string_view>
 
-  prebuffered_data m_data;
-  CachedPrebufferedStreambuf* m_streambuf;
+namespace ceph {
+namespace logging {
 
-  Entry()
-    : Entry(log_time{}, 0, 0, 0, nullptr)
-  {}
-  Entry(log_time s, pthread_t t, short pr, short sub,
-       const char *msg = NULL)
-    : Entry(s, t, pr, sub, m_static_buf, sizeof(m_static_buf), nullptr,
-           msg)
+class Entry {
+public:
+  using time = log_time;
+
+  Entry() = delete;
+  Entry(short pr, short sub) :
+    m_stamp(clock().now()),
+    m_thread(pthread_self()),
+    m_prio(pr),
+    m_subsys(sub)
   {}
-  Entry(log_time s, pthread_t t, short pr, short sub, char* buf, size_t buf_len, size_t* exp_len,
-       const char *msg = NULL)
-    : m_stamp(s), m_thread(t), m_prio(pr), m_subsys(sub),
-      m_next(NULL),
-      m_buf_len(buf_len),
-      m_exp_len(exp_len),
-      m_data(buf, buf_len),
-      m_streambuf(CachedPrebufferedStreambuf::create(&m_data))
-  {
-    if (msg) {
-      get_ostream() << msg;
-    }
-  }
+  Entry(const Entry &) = default;
+  Entry& operator=(const Entry &) = default;
+  Entry(Entry &&e) = default;
+  Entry& operator=(Entry &&e) = default;
+  virtual ~Entry() = default;
+
+  virtual std::string_view strv() const = 0;
+  virtual std::size_t size() const = 0;
+
+  time m_stamp;
+  pthread_t m_thread;
+  short m_prio, m_subsys;
 
 private:
-  ~Entry() = default;
+  static log_clock& clock() {
+    static log_clock clock;
+    return clock;
+  }
+};
 
+/* This should never be moved to the heap! Only allocate this on the stack. See
+ * CachedStackStringStream for rationale.
+ */
+class MutableEntry : public Entry {
 public:
+  MutableEntry() = delete;
+  MutableEntry(short pr, short sub) : Entry(pr, sub) {}
+  MutableEntry(const MutableEntry&) = delete;
+  MutableEntry& operator=(const MutableEntry&) = delete;
+  MutableEntry(MutableEntry&&) = delete;
+  MutableEntry& operator=(MutableEntry&&) = delete;
+  ~MutableEntry() override = default;
+
   std::ostream& get_ostream() {
-    return m_streambuf->get_ostream();
+    return cos.get_stream();
   }
 
-  // function improves estimate for expected size of message
-  void hint_size() {
-    if (m_exp_len != NULL) {
-      size_t size = m_data.size();
-      if (size > __atomic_load_n(m_exp_len, __ATOMIC_RELAXED)) {
-        //log larger then expected, just expand
-        __atomic_store_n(m_exp_len, size + 10, __ATOMIC_RELAXED);
-      }
-      else {
-        //asymptotically adapt expected size to message size
-        __atomic_store_n(m_exp_len, (size + 10 + m_buf_len*31) / 32, __ATOMIC_RELAXED);
-      }
-    }
+  std::string_view strv() const override {
+    return cos.get_stream().strv();
   }
-
-  void set_str(const std::string &s) {
-    get_ostream() << s;
+  std::size_t size() const override {
+    return cos.get_stream().strv().size();
   }
 
-  std::string get_str() const {
-    return m_data.get_str();
-  }
+private:
+  CachedStackStringStream cos;
+};
 
-  // returns current size of content
-  size_t size() const {
-    return m_data.size();
+class ConcreteEntry : public Entry {
+public:
+  ConcreteEntry() = delete;
+  ConcreteEntry(const Entry& e) : Entry(e) {
+    auto strv = e.strv();
+    str.reserve(strv.size());
+    str.insert(str.end(), strv.begin(), strv.end());
   }
-
-  // extracts up to avail chars of content
-  int snprintf(char* dst, size_t avail) const {
-    return m_data.snprintf(dst, avail);
+  ConcreteEntry& operator=(const Entry& e) {
+    Entry::operator=(e);
+    auto strv = e.strv();
+    str.reserve(strv.size());
+    str.assign(strv.begin(), strv.end());
+    return *this;
   }
-
-  void finish() {
-    m_streambuf->finish();
+  ConcreteEntry(ConcreteEntry&& e) : Entry(e), str(std::move(e.str)) {}
+  ConcreteEntry& operator=(ConcreteEntry&& e) {
+    Entry::operator=(e);
+    str = std::move(e.str);
+    return *this;
   }
+  ~ConcreteEntry() override = default;
 
-  void destroy() {
-    if (m_exp_len != NULL) {
-      this->~Entry();
-      ::operator delete(this);
-    } else {
-      delete(this);
-    }
+  std::string_view strv() const override {
+    return std::string_view(str.data(), str.size());
   }
+  std::size_t size() const override {
+    return str.size();
+  }
+
+private:
+  boost::container::small_vector<char, 4096> str;
 };
 
 }
diff --git a/src/log/EntryQueue.h b/src/log/EntryQueue.h
deleted file mode 100644 (file)
index 490b6c2..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef __CEPH_LOG_ENTRYQUEUE_H
-#define __CEPH_LOG_ENTRYQUEUE_H
-
-#include "Entry.h"
-
-namespace ceph {
-namespace logging {
-
-struct EntryQueue {
-  int m_len;
-  struct Entry *m_head, *m_tail;
-
-  bool empty() const {
-    return m_len == 0;
-  }
-
-  void swap(EntryQueue& other) {
-    int len = m_len;
-    struct Entry *h = m_head, *t = m_tail;
-    m_len = other.m_len;
-    m_head = other.m_head;
-    m_tail = other.m_tail;
-    other.m_len = len;
-    other.m_head = h;
-    other.m_tail = t;
-  }
-
-  void enqueue(Entry *e) {
-    if (m_tail) {
-      m_tail->m_next = e;
-      m_tail = e;
-    } else {
-      m_head = m_tail = e;
-    }
-    m_len++;
-  }
-
-  Entry *dequeue() {
-    if (!m_head)
-      return NULL;
-    Entry *e = m_head;
-    m_head = m_head->m_next;
-    if (!m_head)
-      m_tail = NULL;
-    m_len--;
-    e->m_next = NULL;
-    return e;
-  }
-
-  EntryQueue()
-    : m_len(0),
-      m_head(NULL),
-      m_tail(NULL)
-  {}
-  ~EntryQueue() {
-    Entry *t;
-    while (m_head) {
-      t = m_head->m_next;
-      m_head->destroy();
-      m_head = t;
-    }      
-  }
-};
-
-}
-}
-
-#endif
index c1f8e76562a4b2f513cf968860ec76af9d85273c..a11f19b048bd2ead7ec317caef986cb1abb4fdad 100644 (file)
@@ -3,12 +3,8 @@
 
 #include "Log.h"
 
-#include <errno.h>
-#include <syslog.h>
-
 #include "common/errno.h"
 #include "common/safe_io.h"
-#include "common/Clock.h"
 #include "common/Graylog.h"
 #include "common/valgrind.h"
 
 #include "LogClock.h"
 #include "SubsystemMap.h"
 
-#define DEFAULT_MAX_NEW    100
-#define DEFAULT_MAX_RECENT 10000
+#include <errno.h>
+#include <syslog.h>
+
+#include <iostream>
 
-#define PREALLOC 1000000
 #define MAX_LOG_BUF 65536
 
 namespace ceph {
@@ -40,66 +37,28 @@ static void log_on_exit(void *p)
 }
 
 Log::Log(const SubsystemMap *s)
-  : m_indirect_this(NULL),
+  : m_indirect_this(nullptr),
     m_subs(s),
-    m_queue_mutex_holder(0),
-    m_flush_mutex_holder(0),
-    m_new(), m_recent(),
-    m_fd(-1),
-    m_uid(0),
-    m_gid(0),
-    m_fd_last_error(0),
-    m_syslog_log(-2), m_syslog_crash(-2),
-    m_stderr_log(1), m_stderr_crash(-1),
-    m_graylog_log(-3), m_graylog_crash(-3),
-    m_log_buf(nullptr), m_log_buf_pos(0),
-    m_stop(false),
-    m_max_new(DEFAULT_MAX_NEW),
-    m_max_recent(DEFAULT_MAX_RECENT),
-    m_inject_segv(false)
+    m_recent(DEFAULT_MAX_RECENT)
 {
-  int ret;
-
-  ret = pthread_mutex_init(&m_flush_mutex, NULL);
-  ceph_assert(ret == 0);
-
-  ret = pthread_mutex_init(&m_queue_mutex, NULL);
-  ceph_assert(ret == 0);
-
-  ret = pthread_cond_init(&m_cond_loggers, NULL);
-  ceph_assert(ret == 0);
-
-  ret = pthread_cond_init(&m_cond_flusher, NULL);
-  ceph_assert(ret == 0);
-
-  m_log_buf = (char*)malloc(MAX_LOG_BUF);
-
-  // kludge for prealloc testing
-  if (false)
-    for (int i=0; i < PREALLOC; i++)
-      m_recent.enqueue(new Entry);
+  m_log_buf.reserve(MAX_LOG_BUF);
 }
 
 Log::~Log()
 {
   if (m_indirect_this) {
-    *m_indirect_this = NULL;
+    *m_indirect_this = nullptr;
   }
 
   ceph_assert(!is_started());
   if (m_fd >= 0)
     VOID_TEMP_FAILURE_RETRY(::close(m_fd));
-  free(m_log_buf);
-
-  pthread_mutex_destroy(&m_queue_mutex);
-  pthread_mutex_destroy(&m_flush_mutex);
-  pthread_cond_destroy(&m_cond_loggers);
-  pthread_cond_destroy(&m_cond_flusher);
 }
 
 
 ///
 void Log::set_coarse_timestamps(bool coarse) {
+  std::scoped_lock lock(m_flush_mutex);
   if (coarse)
     clock.coarsen();
   else
@@ -108,6 +67,7 @@ void Log::set_coarse_timestamps(bool coarse) {
 
 void Log::set_flush_on_exit()
 {
+  std::scoped_lock lock(m_flush_mutex);
   // Make sure we flush on shutdown.  We do this by deliberately
   // leaking an indirect pointer to ourselves (on_exit() can't
   // unregister a callback).  This is not racy only becuase we
@@ -118,33 +78,33 @@ void Log::set_flush_on_exit()
   }
 }
 
-void Log::set_max_new(int n)
+void Log::set_max_new(std::size_t n)
 {
+  std::scoped_lock lock(m_queue_mutex);
   m_max_new = n;
 }
 
-void Log::set_max_recent(int n)
+void Log::set_max_recent(std::size_t n)
 {
-  pthread_mutex_lock(&m_flush_mutex);
-  m_flush_mutex_holder = pthread_self();
+  std::scoped_lock lock(m_flush_mutex);
   m_max_recent = n;
-  m_flush_mutex_holder = 0;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
-void Log::set_log_file(string fn)
+void Log::set_log_file(std::string_view fn)
 {
+  std::scoped_lock lock(m_flush_mutex);
   m_log_file = fn;
 }
 
-void Log::set_log_stderr_prefix(const std::string& p)
+void Log::set_log_stderr_prefix(std::string_view p)
 {
+  std::scoped_lock lock(m_flush_mutex);
   m_log_stderr_prefix = p;
 }
 
 void Log::reopen_log_file()
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   m_flush_mutex_holder = pthread_self();
   if (m_fd >= 0)
     VOID_TEMP_FAILURE_RETRY(::close(m_fd));
@@ -154,7 +114,7 @@ void Log::reopen_log_file()
       int r = ::fchown(m_fd, m_uid, m_gid);
       if (r < 0) {
        r = -errno;
-       cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
+       std::cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
             << std::endl;
       }
     }
@@ -162,152 +122,101 @@ void Log::reopen_log_file()
     m_fd = -1;
   }
   m_flush_mutex_holder = 0;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 void Log::chown_log_file(uid_t uid, gid_t gid)
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   if (m_fd >= 0) {
     int r = ::fchown(m_fd, uid, gid);
     if (r < 0) {
       r = -errno;
-      cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
+      std::cerr << "failed to chown " << m_log_file << ": " << cpp_strerror(r)
           << std::endl;
     }
   }
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 void Log::set_syslog_level(int log, int crash)
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   m_syslog_log = log;
   m_syslog_crash = crash;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 void Log::set_stderr_level(int log, int crash)
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   m_stderr_log = log;
   m_stderr_crash = crash;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 void Log::set_graylog_level(int log, int crash)
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   m_graylog_log = log;
   m_graylog_crash = crash;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 void Log::start_graylog()
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   if (! m_graylog.get())
     m_graylog = std::make_shared<Graylog>(m_subs, "dlog");
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 
 void Log::stop_graylog()
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock(m_flush_mutex);
   m_graylog.reset();
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
-void Log::submit_entry(Entry *e)
+void Log::submit_entry(Entry&& e)
 {
-  e->finish();
-
-  pthread_mutex_lock(&m_queue_mutex);
+  std::unique_lock lock(m_queue_mutex);
   m_queue_mutex_holder = pthread_self();
 
-  if (m_inject_segv)
+  if (unlikely(m_inject_segv))
     *(volatile int *)(0) = 0xdead;
 
   // wait for flush to catch up
-  while (m_new.m_len > m_max_new)
-    pthread_cond_wait(&m_cond_loggers, &m_queue_mutex);
-
-  m_new.enqueue(e);
-  pthread_cond_signal(&m_cond_flusher);
-  m_queue_mutex_holder = 0;
-  pthread_mutex_unlock(&m_queue_mutex);
-}
-
-
-Entry *Log::create_entry(int level, int subsys, const char* msg)
-{
-  if (true) {
-    return new Entry(clock.now(),
-                    pthread_self(),
-                    level, subsys, msg);
-  } else {
-    // kludge for perf testing
-    Entry *e = m_recent.dequeue();
-    e->m_stamp = clock.now();
-    e->m_thread = pthread_self();
-    e->m_prio = level;
-    e->m_subsys = subsys;
-    return e;
+  while (m_new.size() > m_max_new) {
+    if (m_stop) break; // force addition
+    m_cond_loggers.wait(lock);
   }
-}
 
-Entry *Log::create_entry(int level, int subsys, size_t* expected_size)
-{
-  if (true) {
-    ANNOTATE_BENIGN_RACE_SIZED(expected_size, sizeof(*expected_size),
-                               "Log hint");
-    size_t size = __atomic_load_n(expected_size, __ATOMIC_RELAXED);
-    void *ptr = ::operator new(sizeof(Entry) + size);
-    return new(ptr) Entry(clock.now(),
-       pthread_self(), level, subsys,
-       reinterpret_cast<char*>(ptr) + sizeof(Entry), size, expected_size);
-  } else {
-    // kludge for perf testing
-    Entry *e = m_recent.dequeue();
-    e->m_stamp = clock.now();
-    e->m_thread = pthread_self();
-    e->m_prio = level;
-    e->m_subsys = subsys;
-    return e;
-  }
+  m_new.emplace_back(std::move(e));
+  m_cond_flusher.notify_all();
+  m_queue_mutex_holder = 0;
 }
 
 void Log::flush()
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock1(m_flush_mutex);
   m_flush_mutex_holder = pthread_self();
-  pthread_mutex_lock(&m_queue_mutex);
-  m_queue_mutex_holder = pthread_self();
-  EntryQueue t;
-  t.swap(m_new);
-  pthread_cond_broadcast(&m_cond_loggers);
-  m_queue_mutex_holder = 0;
-  pthread_mutex_unlock(&m_queue_mutex);
-  _flush(&t, &m_recent, false);
 
-  // trim
-  while (m_recent.m_len > m_max_recent) {
-    m_recent.dequeue()->destroy();
+  {
+    std::scoped_lock lock2(m_queue_mutex);
+    m_queue_mutex_holder = pthread_self();
+    assert(m_flush.empty());
+    m_flush.swap(m_new);
+    m_cond_loggers.notify_all();
+    m_queue_mutex_holder = 0;
   }
 
+  _flush(m_flush, true, false);
   m_flush_mutex_holder = 0;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
-void Log::_log_safe_write(const char* what, size_t write_len)
+void Log::_log_safe_write(std::string_view sv)
 {
   if (m_fd < 0)
     return;
-  int r = safe_write(m_fd, what, write_len);
+  int r = safe_write(m_fd, sv.data(), sv.size());
   if (r != m_fd_last_error) {
     if (r < 0)
-      cerr << "problem writing to " << m_log_file
+      std::cerr << "problem writing to " << m_log_file
            << ": " << cpp_strerror(r)
            << std::endl;
     m_fd_last_error = r;
@@ -316,98 +225,72 @@ void Log::_log_safe_write(const char* what, size_t write_len)
 
 void Log::_flush_logbuf()
 {
-  if (m_log_buf_pos) {
-    _log_safe_write(m_log_buf, m_log_buf_pos);
-    m_log_buf_pos = 0;
+  if (m_log_buf.size()) {
+    _log_safe_write(std::string_view(m_log_buf.data(), m_log_buf.size()));
+    m_log_buf.resize(0);
   }
 }
 
-void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
+void Log::_flush(EntryVector& t, bool requeue, bool crash)
 {
-  Entry *e = nullptr;
   long len = 0;
   if (crash) {
-    len = t->m_len;
+    len = t.size();
   }
-  if (!requeue) {
-    e = t->m_head;
-    if (!e) {
-      return;
-    }
+  if (!requeue && t.empty()) {
+    return;
   }
-  while (true) {
-    if (requeue) {
-      e = t->dequeue();
-      if (!e) {
-       break;
-      }
-      requeue->enqueue(e);
-    } else {
-      e = e->m_next;
-      if (!e) {
-       break;
-      }
-    }
-
-    unsigned sub = e->m_subsys;
-
-    bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
+  for (auto& e : t) {
+    auto prio = e.m_prio;
+    auto stamp = e.m_stamp;
+    auto sub = e.m_subsys;
+    auto thread = e.m_thread;
+    auto str = e.strv();
+
+    bool should_log = crash || m_subs->get_log_level(sub) >= prio;
     bool do_fd = m_fd >= 0 && should_log;
-    bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
-    bool do_stderr = m_stderr_crash >= e->m_prio && should_log;
-    bool do_graylog2 = m_graylog_crash >= e->m_prio && should_log;
+    bool do_syslog = m_syslog_crash >= prio && should_log;
+    bool do_stderr = m_stderr_crash >= prio && should_log;
+    bool do_graylog2 = m_graylog_crash >= prio && should_log;
 
-    e->hint_size();
     if (do_fd || do_syslog || do_stderr) {
-      size_t line_used = 0;
+      const std::size_t cur = m_log_buf.size();
+      std::size_t used = 0;
+      const std::size_t allocated = e.size() + 80;
+      m_log_buf.resize(cur + allocated);
 
-      char *line;
-      size_t line_size = 80 + e->size();
-      bool need_dynamic = line_size >= MAX_LOG_BUF;
-
-      // this flushes the existing buffers if either line is longer
-      // than our buffer, or buffer is too full to fit it
-      if (m_log_buf_pos + line_size >= MAX_LOG_BUF) {
-       _flush_logbuf();
-      }
-      if (need_dynamic) {
-        line = new char[line_size];
-      } else {
-        line = &m_log_buf[m_log_buf_pos];
-      }
+      char* const start = m_log_buf.data();
+      char* pos = start + cur;
 
       if (crash) {
-        line_used += snprintf(line, line_size, "%6ld> ", -(--len));
+        used += (std::size_t)snprintf(pos + used, allocated - used, "%6ld> ", -(--len));
       }
-      line_used += append_time(e->m_stamp, line + line_used, line_size - line_used);
-      line_used += snprintf(line + line_used, line_size - line_used, " %lx %2d ",
-                       (unsigned long)e->m_thread, e->m_prio);
-
-      line_used += e->snprintf(line + line_used, line_size - line_used - 1);
-      ceph_assert(line_used < line_size - 1);
+      used += (std::size_t)append_time(stamp, pos + used, allocated - used);
+      used += (std::size_t)snprintf(pos + used, allocated - used, " %lx %2d ", (unsigned long)thread, prio);
+      memcpy(pos + used, str.data(), str.size());
+      used += str.size();
+      pos[used] = '\0';
+      ceph_assert((used + 1 /* '\n' */) < allocated);
 
       if (do_syslog) {
-        syslog(LOG_USER|LOG_INFO, "%s", line);
+        syslog(LOG_USER|LOG_INFO, "%s", pos);
       }
 
       if (do_stderr) {
-        cerr << m_log_stderr_prefix << line << std::endl;
+        std::cerr << m_log_stderr_prefix << std::string_view(pos, used) << std::endl;
       }
 
+      /* now add newline */
+      pos[used++] = '\n';
+
       if (do_fd) {
-       line[line_used] = '\n';
-       if (need_dynamic) {
-         _log_safe_write(line, line_used + 1);
-         m_log_buf_pos = 0;
-       } else {
-         m_log_buf_pos += line_used + 1;
-       }
+        m_log_buf.resize(cur + used);
       } else {
-       m_log_buf_pos = 0;
+        m_log_buf.resize(0);
       }
 
-      if (need_dynamic) {
-        delete[] line;
+      if (m_log_buf.size() > MAX_LOG_BUF) {
+        _flush_logbuf();
       }
     }
 
@@ -415,10 +298,13 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
       m_graylog->log_entry(e);
     }
 
+    if (requeue) {
+      m_recent.push_back(std::move(e));
+    }
   }
+  t.clear();
 
   _flush_logbuf();
-
 }
 
 void Log::_log_message(const char *s, bool crash)
@@ -431,35 +317,40 @@ void Log::_log_message(const char *s, bool crash)
     b += '\n';
     int r = safe_write(m_fd, b.c_str(), b.size());
     if (r < 0)
-      cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
+      std::cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
   }
   if ((crash ? m_syslog_crash : m_syslog_log) >= 0) {
     syslog(LOG_USER|LOG_INFO, "%s", s);
   }
 
   if ((crash ? m_stderr_crash : m_stderr_log) >= 0) {
-    cerr << s << std::endl;
+    std::cerr << s << std::endl;
   }
 }
 
 void Log::dump_recent()
 {
-  pthread_mutex_lock(&m_flush_mutex);
+  std::scoped_lock lock1(m_flush_mutex);
   m_flush_mutex_holder = pthread_self();
 
-  pthread_mutex_lock(&m_queue_mutex);
-  m_queue_mutex_holder = pthread_self();
-
-  EntryQueue t;
-  t.swap(m_new);
+  {
+    std::scoped_lock lock2(m_queue_mutex);
+    m_queue_mutex_holder = pthread_self();
+    assert(m_flush.empty());
+    m_flush.swap(m_new);
+    m_queue_mutex_holder = 0;
+  }
 
-  m_queue_mutex_holder = 0;
-  pthread_mutex_unlock(&m_queue_mutex);
-  _flush(&t, &m_recent, false);
+  _flush(m_flush, true, false);
   _flush_logbuf();
 
   _log_message("--- begin dump of recent events ---", true);
-  _flush(&m_recent, nullptr, true);
+  {
+    EntryVector t;
+    t.insert(t.end(), std::make_move_iterator(m_recent.begin()), std::make_move_iterator(m_recent.end()));
+    m_recent.clear();
+    _flush(t, false, true);
+  }
 
   char buf[4096];
   _log_message("--- logging levels ---", true);
@@ -472,9 +363,9 @@ void Log::dump_recent()
   _log_message(buf, true);
   sprintf(buf, "  %2d/%2d (stderr threshold)", m_stderr_log, m_stderr_crash);
   _log_message(buf, true);
-  sprintf(buf, "  max_recent %9d", m_max_recent);
+  sprintf(buf, "  max_recent %9zu", m_max_recent);
   _log_message(buf, true);
-  sprintf(buf, "  max_new    %9d", m_max_new);
+  sprintf(buf, "  max_new    %9zu", m_max_new);
   _log_message(buf, true);
   sprintf(buf, "  log_file %s", m_log_file.c_str());
   _log_message(buf, true);
@@ -484,48 +375,50 @@ void Log::dump_recent()
   _flush_logbuf();
 
   m_flush_mutex_holder = 0;
-  pthread_mutex_unlock(&m_flush_mutex);
 }
 
 void Log::start()
 {
   ceph_assert(!is_started());
-  pthread_mutex_lock(&m_queue_mutex);
-  m_stop = false;
-  pthread_mutex_unlock(&m_queue_mutex);
+  {
+    std::scoped_lock lock(m_queue_mutex);
+    m_stop = false;
+  }
   create("log");
 }
 
 void Log::stop()
 {
   if (is_started()) {
-    pthread_mutex_lock(&m_queue_mutex);
-    m_stop = true;
-    pthread_cond_signal(&m_cond_flusher);
-    pthread_cond_broadcast(&m_cond_loggers);
-    pthread_mutex_unlock(&m_queue_mutex);
+    {
+      std::scoped_lock lock(m_queue_mutex);
+      m_stop = true;
+      m_cond_flusher.notify_one();
+      m_cond_loggers.notify_all();
+    }
     join();
   }
 }
 
 void *Log::entry()
 {
-  pthread_mutex_lock(&m_queue_mutex);
-  m_queue_mutex_holder = pthread_self();
-  while (!m_stop) {
-    if (!m_new.empty()) {
-      m_queue_mutex_holder = 0;
-      pthread_mutex_unlock(&m_queue_mutex);
-      flush();
-      pthread_mutex_lock(&m_queue_mutex);
-      m_queue_mutex_holder = pthread_self();
-      continue;
-    }
+  {
+    std::unique_lock lock(m_queue_mutex);
+    m_queue_mutex_holder = pthread_self();
+    while (!m_stop) {
+      if (!m_new.empty()) {
+        m_queue_mutex_holder = 0;
+        lock.unlock();
+        flush();
+        lock.lock();
+        m_queue_mutex_holder = pthread_self();
+        continue;
+      }
 
-    pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
+      m_cond_flusher.wait(lock);
+    }
+    m_queue_mutex_holder = 0;
   }
-  m_queue_mutex_holder = 0;
-  pthread_mutex_unlock(&m_queue_mutex);
   flush();
   return NULL;
 }
index 07505e6de25836304345f35ec37450d2aaa9cfaf..ac10cf9047d959a3637b1fb2ba2d6db02a63d387 100644 (file)
@@ -4,66 +4,80 @@
 #ifndef __CEPH_LOG_LOG_H
 #define __CEPH_LOG_LOG_H
 
+#include <boost/circular_buffer.hpp>
+
+#include <condition_variable>
 #include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <string_view>
 
 #include "common/Thread.h"
+#include "common/likely.h"
 
-#include "EntryQueue.h"
+#include "log/Entry.h"
 
 namespace ceph {
 namespace logging {
 
 class Graylog;
 class SubsystemMap;
-class Entry;
 
 class Log : private Thread
 {
+  using EntryRing = boost::circular_buffer<ConcreteEntry>;
+  using EntryVector = std::vector<ConcreteEntry>;
+
+  static const std::size_t DEFAULT_MAX_NEW = 100;
+  static const std::size_t DEFAULT_MAX_RECENT = 10000;
+
   Log **m_indirect_this;
   log_clock clock;
 
   const SubsystemMap *m_subs;
 
-  pthread_mutex_t m_queue_mutex;
-  pthread_mutex_t m_flush_mutex;
-  pthread_cond_t m_cond_loggers;
-  pthread_cond_t m_cond_flusher;
+  std::mutex m_queue_mutex;
+  std::mutex m_flush_mutex;
+  std::condition_variable m_cond_loggers;
+  std::condition_variable m_cond_flusher;
 
   pthread_t m_queue_mutex_holder;
   pthread_t m_flush_mutex_holder;
 
-  EntryQueue m_new;    ///< new entries
-  EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail
+  EntryVector m_new;    ///< new entries
+  EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
+  EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)
 
   std::string m_log_file;
-  int m_fd;
-  uid_t m_uid;
-  gid_t m_gid;
+  int m_fd = -1;
+  uid_t m_uid = 0;
+  gid_t m_gid = 0;
 
-  int m_fd_last_error;  ///< last error we say writing to fd (if any)
+  int m_fd_last_error = 0;  ///< last error we say writing to fd (if any)
 
-  int m_syslog_log, m_syslog_crash;
-  int m_stderr_log, m_stderr_crash;
-  int m_graylog_log, m_graylog_crash;
+  int m_syslog_log = -2, m_syslog_crash = -2;
+  int m_stderr_log = -1, m_stderr_crash = -1;
+  int m_graylog_log = -3, m_graylog_crash = -3;
 
   std::string m_log_stderr_prefix;
 
   std::shared_ptr<Graylog> m_graylog;
 
-  char* m_log_buf;   ///< coalescing buffer
-  int m_log_buf_pos; ///< where we're at within coalescing buffer
+  std::vector<char> m_log_buf;
 
-  bool m_stop;
+  bool m_stop = false;
 
-  int m_max_new, m_max_recent;
+  std::size_t m_max_new = DEFAULT_MAX_NEW;
+  std::size_t m_max_recent = DEFAULT_MAX_RECENT;
 
-  bool m_inject_segv;
+  bool m_inject_segv = false;
 
   void *entry() override;
 
-  void _log_safe_write(const char* what, size_t write_len);
+  void _log_safe_write(std::string_view sv);
   void _flush_logbuf();
-  void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);
+  void _flush(EntryVector& q, bool requeue, bool crash);
 
   void _log_message(const char *s, bool crash);
 
@@ -74,12 +88,12 @@ public:
   void set_flush_on_exit();
 
   void set_coarse_timestamps(bool coarse);
-  void set_max_new(int n);
-  void set_max_recent(int n);
-  void set_log_file(std::string fn);
+  void set_max_new(std::size_t n);
+  void set_max_recent(std::size_t n);
+  void set_log_file(std::string_view fn);
   void reopen_log_file();
   void chown_log_file(uid_t uid, gid_t gid);
-  void set_log_stderr_prefix(const std::string& p);
+  void set_log_stderr_prefix(std::string_view p);
 
   void flush();
 
@@ -94,9 +108,7 @@ public:
 
   std::shared_ptr<Graylog> graylog() { return m_graylog; }
 
-  Entry *create_entry(int level, int subsys, const char* msg = nullptr);
-  Entry *create_entry(int level, int subsys, size_t* expected_size);
-  void submit_entry(Entry *e);
+  void submit_entry(Entry&& e);
 
   void start();
   void stop();
index fae67e964fc8efeacdee553e58af862aac8f0646..d127064bdb0b2ccf23f6e3129766c72eedcf0d82 100644 (file)
@@ -41,8 +41,8 @@ TEST(Log, Simple)
     int sys = i % 4;
     int l = 5 + (i%4);
     if (subs.should_gather(sys, l)) {
-      Entry *e = log.create_entry(l, sys, "hello world");
-      log.submit_entry(e);
+      MutableEntry e(l, sys);
+      log.submit_entry(std::move(e));
     }
   }
   
@@ -65,18 +65,18 @@ TEST(Log, ReuseBad)
 
   const int l = 0;
   {
-    auto e = log.create_entry(l, 1);
-    auto& out = e->get_ostream();
+    MutableEntry e(l, 1);
+    auto& out = e.get_ostream();
     out << (std::streambuf*)nullptr;
     EXPECT_TRUE(out.bad()); // writing nullptr to a stream sets its badbit
-    log.submit_entry(e);
+    log.submit_entry(std::move(e));
   }
   {
-    auto e = log.create_entry(l, 1);
-    auto& out = e->get_ostream();
+    MutableEntry e(l, 1);
+    auto& out = e.get_ostream();
     EXPECT_FALSE(out.bad()); // should not see failures from previous log entry
     out << "hello world";
-    log.submit_entry(e);
+    log.submit_entry(std::move(e));
   }
 
   log.flush();
@@ -97,7 +97,7 @@ TEST(Log, ManyNoGather)
   for (int i=0; i<many; i++) {
     int l = 10;
     if (subs.should_gather(1, l))
-      log.submit_entry(log.create_entry(l, 1));
+      log.submit_entry(MutableEntry(1, 0));
   }
   log.flush();
   log.stop();
@@ -105,25 +105,6 @@ TEST(Log, ManyNoGather)
 
 
 TEST(Log, ManyGatherLog)
-{
-  SubsystemMap subs;
-  subs.set_log_level(1, 20);
-  subs.set_gather_level(1, 10);
-  Log log(&subs);
-  log.start();
-  log.set_log_file("/tmp/big");
-  log.reopen_log_file();
-  for (int i=0; i<many; i++) {
-    int l = 10;
-    if (subs.should_gather(1, l))
-      log.submit_entry(log.create_entry(l, 1,
-                                       "this is a long string asdf asdf asdf asdf asdf asdf asd fasd fasdf "));
-  }
-  log.flush();
-  log.stop();
-}
-
-TEST(Log, ManyGatherLogStringAssign)
 {
   SubsystemMap subs;
   subs.set_log_level(1, 20);
@@ -135,41 +116,16 @@ TEST(Log, ManyGatherLogStringAssign)
   for (int i=0; i<many; i++) {
     int l = 10;
     if (subs.should_gather(1, l)) {
-      Entry *e = log.create_entry(l, 1);
-      ostringstream oss;
-      oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
-      e->set_str(oss.str());
-      log.submit_entry(e);
-    }
-  }
-  log.flush();
-  log.stop();
-}
-TEST(Log, ManyGatherLogStringAssignWithReserve)
-{
-  SubsystemMap subs;
-  subs.set_log_level(1, 20);
-  subs.set_gather_level(1, 10);
-  Log log(&subs);
-  log.start();
-  log.set_log_file("/tmp/big");
-  log.reopen_log_file();
-  for (int i=0; i<many; i++) {
-    int l = 10;
-    if (subs.should_gather(1, l)) {
-      Entry *e = log.create_entry(l, 1);
-      ostringstream oss;
-      oss.str().reserve(80);
-      oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
-      e->set_str(oss.str());
-      log.submit_entry(e);
+      MutableEntry e(l, 1);
+      e.get_ostream() << "this is a long string asdf asdf asdf asdf asdf asdf asd fasd fasdf ";
+      log.submit_entry(std::move(e));
     }
   }
   log.flush();
   log.stop();
 }
 
-TEST(Log, ManyGatherLogPrebuf)
+TEST(Log, ManyGatherLogStringAssign)
 {
   SubsystemMap subs;
   subs.set_log_level(1, 20);
@@ -181,19 +137,16 @@ TEST(Log, ManyGatherLogPrebuf)
   for (int i=0; i<many; i++) {
     int l = 10;
     if (subs.should_gather(1, l)) {
-      Entry *e = log.create_entry(l, 1);
-      PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf));
-      ostream oss(&psb);
-      oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
-      //e->m_str = oss.str();
-      log.submit_entry(e);
+      MutableEntry e(l, 1);
+      e.get_ostream() << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd";
+      log.submit_entry(std::move(e));
     }
   }
   log.flush();
   log.stop();
 }
 
-TEST(Log, ManyGatherLogPrebufOverflow)
+TEST(Log, ManyGatherLogStackSpillover)
 {
   SubsystemMap subs;
   subs.set_log_level(1, 20);
@@ -205,13 +158,11 @@ TEST(Log, ManyGatherLogPrebufOverflow)
   for (int i=0; i<many; i++) {
     int l = 10;
     if (subs.should_gather(1, l)) {
-      Entry *e = log.create_entry(l, 1);
-      PrebufferedStreambuf psb(e->m_static_buf, sizeof(e->m_static_buf));
-      ostream oss(&psb);
-      oss << "this i a long stream asdf asdf asdf asdf asdf asdf asdf asdf asdf as fd"
-          << std::string(sizeof(e->m_static_buf) * 2, '-') ;
-      //e->m_str = oss.str();
-      log.submit_entry(e);
+      MutableEntry e(l, 1);
+      auto& s = e.get_ostream();
+      s << "foo";
+      s << std::string(sizeof(e) * 2, '-');
+      log.submit_entry(std::move(e));
     }
   }
   log.flush();
@@ -230,7 +181,7 @@ TEST(Log, ManyGather)
   for (int i=0; i<many; i++) {
     int l = 10;
     if (subs.should_gather(1, l))
-      log.submit_entry(log.create_entry(l, 1));
+      log.submit_entry(MutableEntry(l, 1));
   }
   log.flush();
   log.stop();
@@ -247,10 +198,10 @@ void do_segv()
   log.reopen_log_file();
 
   log.inject_segv();
-  Entry *e = log.create_entry(10, 1);
+  MutableEntry e(10, 1);
   {
     PrCtl unset_dumpable;
-    log.submit_entry(e);  // this should segv
+    log.submit_entry(std::move(e));  // this should segv
   }
 
   log.flush();
@@ -272,11 +223,12 @@ TEST(Log, LargeLog)
   log.set_log_file("/tmp/big");
   log.reopen_log_file();
   int l = 10;
-  Entry *e = log.create_entry(l, 1);
-
-  std::string msg(10000000, 0);
-  e->set_str(msg);
-  log.submit_entry(e);
+  {
+    MutableEntry e(l, 1);
+    std::string msg(10000000, 'a');
+    e.get_ostream() << msg;
+    log.submit_entry(std::move(e));
+  }
   log.flush();
   log.stop();
 }
@@ -295,8 +247,9 @@ TEST(Log, TimeSwitch)
   int l = 10;
   bool coarse = true;
   for (auto i = 0U; i < 300; ++i) {
-    log.submit_entry(
-      log.create_entry(l, 1, "SQUID THEFT! PUNISHABLE BY DEATH!"));
+    MutableEntry e(l, 1);
+    e.get_ostream() << "SQUID THEFT! PUNISHABLE BY DEATH!";
+    log.submit_entry(std::move(e));
     if (i % 50)
       log.set_coarse_timestamps(coarse = !coarse);
   }