]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common: enable sending local logs to journald
author胡玮文 <huww98@outlook.com>
Wed, 3 Mar 2021 07:01:56 +0000 (15:01 +0800)
committer胡玮文 <huww98@outlook.com>
Tue, 9 Mar 2021 12:14:00 +0000 (20:14 +0800)
Enable ceph daemons to directly send logs to journald via unix domain socket.

While sending logs, metadata like priority, thread, timestamp is sent as
structured data. And can be queried by journalctl.

Note that I don't use libsystemd because I want the implementation to be as
efficient as possible.

Signed-off-by: 胡玮文 <huww98@outlook.com>
src/common/CMakeLists.txt
src/common/Journald.cc [new file with mode: 0644]
src/common/Journald.h [new file with mode: 0644]
src/common/ceph_context.cc
src/common/options.cc
src/crimson/CMakeLists.txt
src/log/Log.cc
src/log/Log.h

index 498e2dd2de011f5155824cf0fd69bd14ae4d4474..d31c74b273f17eb97521f85057240ba1bc3408b9 100644 (file)
@@ -25,6 +25,7 @@ set(common_srcs
   Graylog.cc
   HTMLFormatter.cc
   HeartbeatMap.cc
+  Journald.cc
   LogClient.cc
   LogEntry.cc
   ostream_temp.cc
diff --git a/src/common/Journald.cc b/src/common/Journald.cc
new file mode 100644 (file)
index 0000000..6962f96
--- /dev/null
@@ -0,0 +1,255 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Journald.h"
+
+#include <endian.h>
+#include <fcntl.h>
+#include <memory>
+#include <string>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <syslog.h>
+#include <unistd.h>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include "include/ceph_assert.h"
+#include "common/LogEntry.h"
+#include "log/Entry.h"
+#include "log/SubsystemMap.h"
+
+
+namespace ceph::logging {
+
+namespace {
+const struct sockaddr_un sockaddr = {
+  AF_UNIX,
+  "/run/systemd/journal/socket",
+};
+
+ssize_t sendmsg_fd(int transport_fd, int fd)
+{
+  constexpr size_t control_len = CMSG_LEN(sizeof(int));
+  char control[control_len];
+  struct msghdr mh = {
+    (struct sockaddr*)&sockaddr, // msg_name
+    sizeof(sockaddr),            // msg_namelen
+    nullptr,                     // msg_iov
+    0,                           // msg_iovlen
+    &control,                    // msg_control
+    control_len,                 // msg_controllen
+  };
+  ceph_assert(transport_fd >= 0);
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&mh);
+  cmsg->cmsg_level = SOL_SOCKET;
+  cmsg->cmsg_type = SCM_RIGHTS;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+  *reinterpret_cast<int *>(CMSG_DATA(cmsg)) = fd;
+
+  return sendmsg(transport_fd, &mh, MSG_NOSIGNAL);
+}
+
+char map_prio(short ceph_prio)
+{
+  if (ceph_prio < 0)
+    return LOG_ERR;
+  if (ceph_prio == 0)
+    return LOG_WARNING;
+  if (ceph_prio < 5)
+    return LOG_NOTICE;
+  if (ceph_prio < 10)
+    return LOG_INFO;
+  return LOG_DEBUG;
+}
+}
+
+namespace detail {
+class EntryEncoderBase {
+ public:
+  EntryEncoderBase():
+    m_msg_vec {
+      {}, {}, { (char *)"\n", 1 },
+    } 
+  {
+  }
+
+  constexpr struct iovec *iovec() { return this->m_msg_vec; }
+  constexpr std::size_t iovec_len()
+  {
+    return sizeof(m_msg_vec) / sizeof(m_msg_vec[0]);
+  }
+
+ protected:
+  fmt::memory_buffer meta_buf;
+  struct iovec m_msg_vec[3];
+};
+
+class EntryEncoder : public EntryEncoderBase {
+ public:
+  void encode(const Entry& e, const SubsystemMap *s)
+  {
+    meta_buf.clear();
+    fmt::format_to(meta_buf,
+      R"(PRIORITY={:d}
+CEPH_SUBSYS={}
+TIMESTAMP={}
+CEPH_PRIO={}
+THREAD={:016x}
+MESSAGE
+)",
+      map_prio(e.m_prio),
+      s->get_name(e.m_subsys),
+      e.m_stamp.time_since_epoch().count().count,
+      e.m_prio,
+      e.m_thread);
+
+    uint64_t msg_len = htole64(e.size());
+    meta_buf.resize(meta_buf.size() + sizeof(msg_len));
+    *(reinterpret_cast<uint64_t*>(meta_buf.end()) - 1) = htole64(e.size());
+
+    m_msg_vec[0].iov_base = meta_buf.data();
+    m_msg_vec[0].iov_len = meta_buf.size();
+
+    m_msg_vec[1].iov_base = (void *)e.strv().data();
+    m_msg_vec[1].iov_len = e.size();
+  }
+};
+
+enum class JournaldClient::MemFileMode {
+  MEMFD_CREATE,
+  OPEN_TMPFILE,
+  OPEN_UNLINK,  
+};
+
+constexpr const char *mem_file_dir = "/dev/shm";
+
+void JournaldClient::detect_mem_file_mode()
+{
+  int memfd = memfd_create("ceph-journald", MFD_ALLOW_SEALING | MFD_CLOEXEC);
+  if (memfd >= 0) {
+    mem_file_mode = MemFileMode::MEMFD_CREATE;
+    close(memfd);
+    return;
+  }
+  memfd = open(mem_file_dir, O_TMPFILE | O_EXCL | O_CLOEXEC, S_IRUSR | S_IWUSR);
+  if (memfd >= 0) {
+    mem_file_mode = MemFileMode::OPEN_TMPFILE;
+    close(memfd);
+    return;
+  }
+  mem_file_mode = MemFileMode::OPEN_UNLINK;
+}
+
+int JournaldClient::open_mem_file()
+{
+  switch (mem_file_mode) {
+  case MemFileMode::MEMFD_CREATE:
+    return memfd_create("ceph-journald", MFD_ALLOW_SEALING | MFD_CLOEXEC);
+  case MemFileMode::OPEN_TMPFILE:
+    return open(mem_file_dir, O_TMPFILE | O_EXCL | O_CLOEXEC, S_IRUSR | S_IWUSR);
+  case MemFileMode::OPEN_UNLINK:
+    char mem_file_template[] = "/dev/shm/ceph-journald-XXXXXX";
+    int fd = mkostemp(mem_file_template, O_CLOEXEC);
+    unlink(mem_file_template);
+    return fd;
+  }
+  ceph_abort("Unexpected mem_file_mode");
+}
+
+JournaldClient::JournaldClient() :
+  m_msghdr({
+    (struct sockaddr*)&sockaddr, // msg_name
+    sizeof(sockaddr),            // msg_namelen
+  })
+{
+  fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0);
+  ceph_assertf(fd > 0, "socket creation failed: %s", strerror(errno));
+
+  int sendbuf = 2 * 1024 * 1024;
+  setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendbuf, sizeof(sendbuf));
+
+  detect_mem_file_mode();
+}
+
+JournaldClient::~JournaldClient()
+{
+  close(fd);
+}
+
+int JournaldClient::send()
+{
+  int ret = sendmsg(fd, &m_msghdr, MSG_NOSIGNAL);
+  if (ret >= 0)
+    return 0;
+
+  /* Fail silently if the journal is not available */
+  if (errno == ENOENT)
+    return -1;
+
+  if (errno != EMSGSIZE && errno != ENOBUFS) {
+    std::cerr << "Failed to send log to journald: " << strerror(errno) << std::endl;
+    return -1;
+  }
+  /* Message doesn't fit... Let's dump the data in a memfd and
+   * just pass a file descriptor of it to the other side.
+   */
+  int buffer_fd = open_mem_file();
+  if (buffer_fd < 0) {
+    std::cerr << "Failed to open buffer_fd while sending log to journald: " << strerror(errno) << std::endl;
+    return -1;
+  }
+
+  ret = writev(buffer_fd, m_msghdr.msg_iov, m_msghdr.msg_iovlen);
+  if (ret < 0) {
+    std::cerr << "Failed to write to buffer_fd while sending log to journald: " << strerror(errno) << std::endl;
+    goto err_close_buffer_fd;
+  }
+
+  if (mem_file_mode == MemFileMode::MEMFD_CREATE) {
+    ret = fcntl(buffer_fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL);
+    if (ret) {
+      std::cerr << "Failed to seal buffer_fd while sending log to journald: " << strerror(errno) << std::endl;
+      goto err_close_buffer_fd;
+    }
+  }
+  
+  ret = sendmsg_fd(fd, buffer_fd);
+  if (ret < 0) {
+    /* Fail silently if the journal is not available */
+    if (errno == ENOENT)
+      goto err_close_buffer_fd;
+
+    std::cerr << "Failed to send fd while sending log to journald: " << strerror(errno) << std::endl;
+    goto err_close_buffer_fd;
+  }
+  close(buffer_fd);
+  return 0;
+
+err_close_buffer_fd:
+  close(buffer_fd);
+  return -1;
+}
+
+} // namespace ceph::logging::detail
+
+JournaldLogger::JournaldLogger(const SubsystemMap *s) :
+  m_entry_encoder(make_unique<detail::EntryEncoder>()),
+  m_subs(s)
+{
+  client.m_msghdr.msg_iov = m_entry_encoder->iovec();
+  client.m_msghdr.msg_iovlen = m_entry_encoder->iovec_len();
+}
+
+JournaldLogger::~JournaldLogger() = default;
+
+int JournaldLogger::log_entry(const Entry& e)
+{
+  m_entry_encoder->encode(e, m_subs);
+  return client.send();
+}
+
+}
diff --git a/src/common/Journald.h b/src/common/Journald.h
new file mode 100644 (file)
index 0000000..388d843
--- /dev/null
@@ -0,0 +1,65 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_COMMON_JOURNALD_H
+#define CEPH_COMMON_JOURNALD_H
+
+#include <memory>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+namespace ceph {
+
+namespace logging {
+
+namespace detail {
+class EntryEncoder;
+
+class JournaldClient {
+ public:
+  JournaldClient();
+  ~JournaldClient();
+  int send();
+  struct msghdr m_msghdr;
+ private:
+  int fd;
+
+  enum class MemFileMode;
+  MemFileMode mem_file_mode;
+
+  void detect_mem_file_mode();
+  int open_mem_file();
+};
+}
+
+class Entry;
+class SubsystemMap;
+
+/**
+ * Logger to send local logs to journald
+ * 
+ * local logs means @code dout(0) << ... @endcode and similars
+ */
+class JournaldLogger {
+ public:
+  JournaldLogger(const SubsystemMap *s);
+  ~JournaldLogger();
+
+  /**
+   * @returns 0 if log entry is successfully sent, -1 otherwise.
+   */
+  int log_entry(const Entry &e);
+
+ private:
+  detail::JournaldClient client;
+
+  std::unique_ptr<detail::EntryEncoder> m_entry_encoder;
+
+  const SubsystemMap * m_subs;
+};
+
+
+}
+}
+
+#endif
index 25e96b0966bbade00160bab4f8d0778c484b320c..93e94dd6dddd9e364a663cf801b8b9704875688f 100644 (file)
@@ -287,6 +287,8 @@ public:
       "err_to_graylog",
       "log_graylog_host",
       "log_graylog_port",
+      "log_to_journald",
+      "err_to_journald",
       "log_coarse_timestamps",
       "fsid",
       "host",
@@ -350,6 +352,18 @@ public:
       log->graylog()->set_destination(conf->log_graylog_host, conf->log_graylog_port);
     }
 
+    // journald
+    if (changed.count("log_to_journald") || changed.count("err_to_journald")) {
+      int l = conf.get_val<bool>("log_to_journald") ? 99 : (conf.get_val<bool>("err_to_journald") ? -1 : -2);
+      log->set_journald_level(l, l);
+
+      if (l > -2) {
+        log->start_journald_logger();
+      } else {
+        log->stop_journald_logger();
+      }
+    }
+
     if (changed.find("log_coarse_timestamps") != changed.end()) {
       log->set_coarse_timestamps(conf.get_val<bool>("log_coarse_timestamps"));
     }
index 8673abc4cf05d6465e61ed14bf2d994caf6a705e..d86306e9b8cc75d997e2efb4fa3ce7e60d4c61d7 100644 (file)
@@ -665,6 +665,16 @@ std::vector<Option> get_global_options() {
     .set_description("port number for the remote graylog server")
     .add_see_also("log_graylog_host"),
 
+    Option("log_to_journald", Option::TYPE_BOOL, Option::LEVEL_BASIC)
+    .set_default(false)
+    .set_description("send log lines to journald")
+    .add_see_also("err_to_journald"),
+
+    Option("err_to_journald", Option::TYPE_BOOL, Option::LEVEL_BASIC)
+    .set_default(false)
+    .set_description("send critical error log lines to journald")
+    .add_see_also("log_to_journald"),
+
     Option("log_coarse_timestamps", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
     .set_default(true)
     .set_description("timestamp log entries from coarse system clock "
index 1c1f897e7596fe7c23b944ad6671002776c9cd86..89572322f3146415e48d815886e10b0ef8ab6414 100644 (file)
@@ -78,6 +78,7 @@ add_library(crimson-common STATIC
   ${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc
   ${PROJECT_SOURCE_DIR}/src/common/Formatter.cc
   ${PROJECT_SOURCE_DIR}/src/common/Graylog.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Journald.cc
   ${PROJECT_SOURCE_DIR}/src/common/ostream_temp.cc
   ${PROJECT_SOURCE_DIR}/src/common/LogEntry.cc
   ${PROJECT_SOURCE_DIR}/src/common/TextTable.cc
index 28f1dbf21e328f91b6b0c2ec7389a561a7af9271..98c9bfce68efbadcb07a2bc9908778e47926fc3d 100644 (file)
@@ -6,6 +6,7 @@
 #include "common/errno.h"
 #include "common/safe_io.h"
 #include "common/Graylog.h"
+#include "common/Journald.h"
 #include "common/valgrind.h"
 
 #include "include/ceph_assert.h"
@@ -178,6 +179,27 @@ void Log::stop_graylog()
   m_graylog.reset();
 }
 
+void Log::set_journald_level(int log, int crash)
+{
+  std::scoped_lock lock(m_flush_mutex);
+  m_journald_log = log;
+  m_journald_crash = crash;
+}
+
+void Log::start_journald_logger()
+{
+  std::scoped_lock lock(m_flush_mutex);
+  if (!m_journald) {
+    m_journald = std::make_unique<JournaldLogger>(m_subs);
+  }
+}
+
+void Log::stop_journald_logger()
+{
+  std::scoped_lock lock(m_flush_mutex);
+  m_journald.reset();
+}
+
 void Log::submit_entry(Entry&& e)
 {
   std::unique_lock lock(m_queue_mutex);
@@ -260,6 +282,7 @@ void Log::_flush(EntryVector& t, bool crash)
     bool do_syslog = m_syslog_crash >= prio && should_log;
     bool do_stderr = m_stderr_crash >= prio && should_log;
     bool do_graylog2 = m_graylog_crash >= prio && should_log;
+    bool do_journald = m_journald_crash >= prio && should_log;
 
     if (do_fd || do_syslog || do_stderr) {
       const std::size_t cur = m_log_buf.size();
@@ -306,6 +329,10 @@ void Log::_flush(EntryVector& t, bool crash)
       m_graylog->log_entry(e);
     }
 
+    if (do_journald && m_journald) {
+      m_journald->log_entry(e);
+    }
+
     m_recent.push_back(std::move(e));
   }
   t.clear();
index 5eeed410171bf93f1cc24b3de3cc2ea425a8e432..ad3a44b4fee3122ba1cfe9b06b4db91e466b8fc8 100644 (file)
@@ -22,6 +22,7 @@ namespace ceph {
 namespace logging {
 
 class Graylog;
+class JournaldLogger;
 class SubsystemMap;
 
 class Log : private Thread
@@ -58,10 +59,12 @@ class Log : private Thread
   int m_syslog_log = -2, m_syslog_crash = -2;
   int m_stderr_log = -1, m_stderr_crash = -1;
   int m_graylog_log = -3, m_graylog_crash = -3;
+  int m_journald_log = -3, m_journald_crash = -3;
 
   std::string m_log_stderr_prefix;
 
   std::shared_ptr<Graylog> m_graylog;
+  std::unique_ptr<JournaldLogger> m_journald;
 
   std::vector<char> m_log_buf;
 
@@ -107,6 +110,11 @@ public:
   void start_graylog();
   void stop_graylog();
 
+  void set_journald_level(int log, int crash);
+
+  void start_journald_logger();
+  void stop_journald_logger();
+
   std::shared_ptr<Graylog> graylog() { return m_graylog; }
 
   void submit_entry(Entry&& e);