From: 胡玮文 Date: Wed, 3 Mar 2021 07:01:56 +0000 (+0800) Subject: common: enable sending local logs to journald X-Git-Tag: v17.1.0~2374^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f7a4b8c5944e5575f5eaeeed771483d78bd0c343;p=ceph.git common: enable sending local logs to journald Enable ceph daemons to directly send logs to journald via a unix domain socket. While sending logs, metadata such as priority, thread, and timestamp is sent as structured data and can be queried with journalctl. Note that I don't use libsystemd because I want the implementation to be as efficient as possible. Signed-off-by: 胡玮文 --- diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 498e2dd2de0..d31c74b273f 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -25,6 +25,7 @@ set(common_srcs Graylog.cc HTMLFormatter.cc HeartbeatMap.cc + Journald.cc LogClient.cc LogEntry.cc ostream_temp.cc diff --git a/src/common/Journald.cc b/src/common/Journald.cc new file mode 100644 index 00000000000..6962f9630e0 --- /dev/null +++ b/src/common/Journald.cc @@ -0,0 +1,255 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Journald.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "include/ceph_assert.h" +#include "common/LogEntry.h" +#include "log/Entry.h" +#include "log/SubsystemMap.h" + + +namespace ceph::logging { + +namespace { +const struct sockaddr_un sockaddr = { + AF_UNIX, + "/run/systemd/journal/socket", +}; + +ssize_t sendmsg_fd(int transport_fd, int fd) +{ + constexpr size_t control_len = CMSG_LEN(sizeof(int)); + char control[control_len]; + struct msghdr mh = { + (struct sockaddr*)&sockaddr, // msg_name + sizeof(sockaddr), // msg_namelen + nullptr, // msg_iov + 0, // msg_iovlen + &control, // msg_control + control_len, // msg_controllen + }; + ceph_assert(transport_fd >= 0); + 
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&mh); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + *reinterpret_cast(CMSG_DATA(cmsg)) = fd; + + return sendmsg(transport_fd, &mh, MSG_NOSIGNAL); +} + +char map_prio(short ceph_prio) +{ + if (ceph_prio < 0) + return LOG_ERR; + if (ceph_prio == 0) + return LOG_WARNING; + if (ceph_prio < 5) + return LOG_NOTICE; + if (ceph_prio < 10) + return LOG_INFO; + return LOG_DEBUG; +} +} + +namespace detail { +class EntryEncoderBase { + public: + EntryEncoderBase(): + m_msg_vec { + {}, {}, { (char *)"\n", 1 }, + } + { + } + + constexpr struct iovec *iovec() { return this->m_msg_vec; } + constexpr std::size_t iovec_len() + { + return sizeof(m_msg_vec) / sizeof(m_msg_vec[0]); + } + + protected: + fmt::memory_buffer meta_buf; + struct iovec m_msg_vec[3]; +}; + +class EntryEncoder : public EntryEncoderBase { + public: + void encode(const Entry& e, const SubsystemMap *s) + { + meta_buf.clear(); + fmt::format_to(meta_buf, + R"(PRIORITY={:d} +CEPH_SUBSYS={} +TIMESTAMP={} +CEPH_PRIO={} +THREAD={:016x} +MESSAGE +)", + map_prio(e.m_prio), + s->get_name(e.m_subsys), + e.m_stamp.time_since_epoch().count().count, + e.m_prio, + e.m_thread); + + uint64_t msg_len = htole64(e.size()); + meta_buf.resize(meta_buf.size() + sizeof(msg_len)); + *(reinterpret_cast(meta_buf.end()) - 1) = htole64(e.size()); + + m_msg_vec[0].iov_base = meta_buf.data(); + m_msg_vec[0].iov_len = meta_buf.size(); + + m_msg_vec[1].iov_base = (void *)e.strv().data(); + m_msg_vec[1].iov_len = e.size(); + } +}; + +enum class JournaldClient::MemFileMode { + MEMFD_CREATE, + OPEN_TMPFILE, + OPEN_UNLINK, +}; + +constexpr const char *mem_file_dir = "/dev/shm"; + +void JournaldClient::detect_mem_file_mode() +{ + int memfd = memfd_create("ceph-journald", MFD_ALLOW_SEALING | MFD_CLOEXEC); + if (memfd >= 0) { + mem_file_mode = MemFileMode::MEMFD_CREATE; + close(memfd); + return; + } + memfd = open(mem_file_dir, O_TMPFILE | O_EXCL | 
O_CLOEXEC, S_IRUSR | S_IWUSR); + if (memfd >= 0) { + mem_file_mode = MemFileMode::OPEN_TMPFILE; + close(memfd); + return; + } + mem_file_mode = MemFileMode::OPEN_UNLINK; +} + +int JournaldClient::open_mem_file() +{ + switch (mem_file_mode) { + case MemFileMode::MEMFD_CREATE: + return memfd_create("ceph-journald", MFD_ALLOW_SEALING | MFD_CLOEXEC); + case MemFileMode::OPEN_TMPFILE: + return open(mem_file_dir, O_TMPFILE | O_EXCL | O_CLOEXEC, S_IRUSR | S_IWUSR); + case MemFileMode::OPEN_UNLINK: + char mem_file_template[] = "/dev/shm/ceph-journald-XXXXXX"; + int fd = mkostemp(mem_file_template, O_CLOEXEC); + unlink(mem_file_template); + return fd; + } + ceph_abort("Unexpected mem_file_mode"); +} + +JournaldClient::JournaldClient() : + m_msghdr({ + (struct sockaddr*)&sockaddr, // msg_name + sizeof(sockaddr), // msg_namelen + }) +{ + fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0); + ceph_assertf(fd > 0, "socket creation failed: %s", strerror(errno)); + + int sendbuf = 2 * 1024 * 1024; + setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendbuf, sizeof(sendbuf)); + + detect_mem_file_mode(); +} + +JournaldClient::~JournaldClient() +{ + close(fd); +} + +int JournaldClient::send() +{ + int ret = sendmsg(fd, &m_msghdr, MSG_NOSIGNAL); + if (ret >= 0) + return 0; + + /* Fail silently if the journal is not available */ + if (errno == ENOENT) + return -1; + + if (errno != EMSGSIZE && errno != ENOBUFS) { + std::cerr << "Failed to send log to journald: " << strerror(errno) << std::endl; + return -1; + } + /* Message doesn't fit... Let's dump the data in a memfd and + * just pass a file descriptor of it to the other side. 
+ */ + int buffer_fd = open_mem_file(); + if (buffer_fd < 0) { + std::cerr << "Failed to open buffer_fd while sending log to journald: " << strerror(errno) << std::endl; + return -1; + } + + ret = writev(buffer_fd, m_msghdr.msg_iov, m_msghdr.msg_iovlen); + if (ret < 0) { + std::cerr << "Failed to write to buffer_fd while sending log to journald: " << strerror(errno) << std::endl; + goto err_close_buffer_fd; + } + + if (mem_file_mode == MemFileMode::MEMFD_CREATE) { + ret = fcntl(buffer_fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL); + if (ret) { + std::cerr << "Failed to seal buffer_fd while sending log to journald: " << strerror(errno) << std::endl; + goto err_close_buffer_fd; + } + } + + ret = sendmsg_fd(fd, buffer_fd); + if (ret < 0) { + /* Fail silently if the journal is not available */ + if (errno == ENOENT) + goto err_close_buffer_fd; + + std::cerr << "Failed to send fd while sending log to journald: " << strerror(errno) << std::endl; + goto err_close_buffer_fd; + } + close(buffer_fd); + return 0; + +err_close_buffer_fd: + close(buffer_fd); + return -1; +} + +} // namespace ceph::logging::detail + +JournaldLogger::JournaldLogger(const SubsystemMap *s) : + m_entry_encoder(make_unique()), + m_subs(s) +{ + client.m_msghdr.msg_iov = m_entry_encoder->iovec(); + client.m_msghdr.msg_iovlen = m_entry_encoder->iovec_len(); +} + +JournaldLogger::~JournaldLogger() = default; + +int JournaldLogger::log_entry(const Entry& e) +{ + m_entry_encoder->encode(e, m_subs); + return client.send(); +} + +} diff --git a/src/common/Journald.h b/src/common/Journald.h new file mode 100644 index 00000000000..388d843a03d --- /dev/null +++ b/src/common/Journald.h @@ -0,0 +1,65 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_COMMON_JOURNALD_H +#define CEPH_COMMON_JOURNALD_H + +#include +#include +#include + +namespace ceph { + +namespace logging { + +namespace detail { +class EntryEncoder; + 
+class JournaldClient { + public: + JournaldClient(); + ~JournaldClient(); + int send(); + struct msghdr m_msghdr; + private: + int fd; + + enum class MemFileMode; + MemFileMode mem_file_mode; + + void detect_mem_file_mode(); + int open_mem_file(); +}; +} + +class Entry; +class SubsystemMap; + +/** + * Logger to send local logs to journald + * + * local logs means @code dout(0) << ... @endcode and similars + */ +class JournaldLogger { + public: + JournaldLogger(const SubsystemMap *s); + ~JournaldLogger(); + + /** + * @returns 0 if log entry is successfully sent, -1 otherwise. + */ + int log_entry(const Entry &e); + + private: + detail::JournaldClient client; + + std::unique_ptr m_entry_encoder; + + const SubsystemMap * m_subs; +}; + + +} +} + +#endif diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc index 25e96b0966b..93e94dd6ddd 100644 --- a/src/common/ceph_context.cc +++ b/src/common/ceph_context.cc @@ -287,6 +287,8 @@ public: "err_to_graylog", "log_graylog_host", "log_graylog_port", + "log_to_journald", + "err_to_journald", "log_coarse_timestamps", "fsid", "host", @@ -350,6 +352,18 @@ public: log->graylog()->set_destination(conf->log_graylog_host, conf->log_graylog_port); } + // journald + if (changed.count("log_to_journald") || changed.count("err_to_journald")) { + int l = conf.get_val("log_to_journald") ? 99 : (conf.get_val("err_to_journald") ? -1 : -2); + log->set_journald_level(l, l); + + if (l > -2) { + log->start_journald_logger(); + } else { + log->stop_journald_logger(); + } + } + if (changed.find("log_coarse_timestamps") != changed.end()) { log->set_coarse_timestamps(conf.get_val("log_coarse_timestamps")); } diff --git a/src/common/options.cc b/src/common/options.cc index 8673abc4cf0..d86306e9b8c 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -665,6 +665,16 @@ std::vector