]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/LogMonitor: factor logging to file out of update_from_paxos
authorSage Weil <sage@newdream.net>
Tue, 22 Jun 2021 20:24:35 +0000 (16:24 -0400)
committerSage Weil <sage@newdream.net>
Fri, 2 Jul 2021 12:58:30 +0000 (08:58 -0400)
Move this into a helper.  Keep an fd open for each channel so that we
can more efficiently log individual LogEntry's.

Signed-off-by: Sage Weil <sage@newdream.net>
src/mon/LogMonitor.cc
src/mon/LogMonitor.h
src/mon/Monitor.cc

index 085c32427b84be3a41d7e4877688ee1440566d47..4773a5edfcb1a960304ec74fd1ec9c902396503d 100644 (file)
@@ -246,8 +246,6 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
     return;
   ceph_assert(version >= summary.version);
 
-  map<string,bufferlist> channel_blog;
-
   version_t latest_full = get_version_latest_full();
   dout(10) << __func__ << " latest full " << latest_full << dendl;
   if ((latest_full > 0) && (latest_full > summary.version)) {
@@ -273,69 +271,10 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
     while (!p.end()) {
       LogEntry le;
       le.decode(p);
-      dout(7) << "update_from_paxos applying incremental log " << summary.version+1 <<  " " << le << dendl;
-
-      string channel = le.channel;
-      if (channel.empty()) // keep retrocompatibility
-        channel = CLOG_CHANNEL_CLUSTER;
-
-      if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
-       cerr << channel << " " << le << std::endl;
-      }
-
-      if (channels.do_log_to_syslog(channel)) {
-        string level = channels.get_level(channel);
-        string facility = channels.get_facility(channel);
-        if (level.empty() || facility.empty()) {
-          derr << __func__ << " unable to log to syslog -- level or facility"
-               << " not defined (level: " << level << ", facility: "
-               << facility << ")" << dendl;
-          continue;
-        }
-        le.log_to_syslog(channels.get_level(channel),
-                         channels.get_facility(channel));
-      }
-
-      if (channels.do_log_to_graylog(channel)) {
-       ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
-       if (graylog) {
-         graylog->log_log_entry(&le);
-       }
-       dout(7) << "graylog: " << channel << " " << graylog
-               << " host:" << channels.log_to_graylog_host << dendl;
-      }
-
-      if (channels.do_log_to_journald(channel)) {
-        auto &journald = channels.get_journald();
-        journald.log_log_entry(le);
-        dout(7) << "journald: " << channel << dendl;
-      }
-
-      if (g_conf()->mon_cluster_log_to_file) {
-       string log_file = channels.get_log_file(channel);
-       dout(20) << __func__ << " logging for channel '" << channel
-                << "' to file '" << log_file << "'" << dendl;
-
-       if (!log_file.empty()) {
-         string log_file_level = channels.get_log_file_level(channel);
-         if (log_file_level.empty()) {
-           dout(1) << __func__ << " warning: log file level not defined for"
-                   << " channel '" << channel << "' yet a log file is --"
-                   << " will assume lowest level possible" << dendl;
-         }
-
-         int min = string_to_syslog_level(log_file_level);
-         int l = clog_type_to_syslog_level(le.prio);
-         if (l <= min) {
-           stringstream ss;
-           ss << le << "\n";
-           // init entry if DNE
-           bufferlist &blog = channel_blog[channel];
-           blog.append(ss.str());
-         }
-       }
-      }
+      dout(7) << "update_from_paxos applying incremental log "
+             << summary.version+1 <<  " " << le << dendl;
 
+      log_external(le);
       summary.add(le);
     }
 
@@ -343,36 +282,95 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
     summary.prune(g_conf()->mon_log_max_summary);
   }
 
-  dout(15) << __func__ << " logging for "
-           << channel_blog.size() << " channels" << dendl;
-  for(map<string,bufferlist>::iterator p = channel_blog.begin();
-      p != channel_blog.end(); ++p) {
-    if (!p->second.length()) {
-      dout(15) << __func__ << " channel '" << p->first
-               << "': nothing to log" << dendl;
-      continue;
-    }
+  check_subs();
+}
 
-    dout(15) << __func__ << " channel '" << p->first
-             << "' logging " << p->second.length() << " bytes" << dendl;
-    string log_file = channels.get_log_file(p->first);
+void LogMonitor::log_external(const LogEntry& le)
+{
+  string channel = le.channel;
+  if (channel.empty()) { // keep retrocompatibility
+    channel = CLOG_CHANNEL_CLUSTER;
+  }
+
+  if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
+    cerr << channel << " " << le << std::endl;
+  }
 
-    int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
-    if (fd < 0) {
-      int err = -errno;
-      dout(1) << "unable to write to '" << log_file << "' for channel '"
-              << p->first << "': " << cpp_strerror(err) << dendl;
+  if (channels.do_log_to_syslog(channel)) {
+    string level = channels.get_level(channel);
+    string facility = channels.get_facility(channel);
+    if (level.empty() || facility.empty()) {
+      derr << __func__ << " unable to log to syslog -- level or facility"
+          << " not defined (level: " << level << ", facility: "
+          << facility << ")" << dendl;
     } else {
-      int err = p->second.write_fd(fd);
+      le.log_to_syslog(channels.get_level(channel),
+                      channels.get_facility(channel));
+    }
+  }
+
+  if (channels.do_log_to_graylog(channel)) {
+    ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
+    if (graylog) {
+      graylog->log_log_entry(&le);
+    }
+    dout(7) << "graylog: " << channel << " " << graylog
+           << " host:" << channels.log_to_graylog_host << dendl;
+  }
+
+  if (channels.do_log_to_journald(channel)) {
+    auto &journald = channels.get_journald();
+    journald.log_log_entry(le);
+    dout(7) << "journald: " << channel << dendl;
+  }
+
+  if (g_conf()->mon_cluster_log_to_file) {
+    auto p = channel_fds.find(channel);
+    int fd;
+    if (p == channel_fds.end()) {
+      string log_file = channels.get_log_file(channel);
+      dout(20) << __func__ << " logging for channel '" << channel
+              << "' to file '" << log_file << "'" << dendl;
+      if (log_file.empty()) {
+       // do not log this channel
+       fd = -1;
+      } else {
+       fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
+       if (fd < 0) {
+         int err = -errno;
+         dout(1) << "unable to write to '" << log_file << "' for channel '"
+                 << p->first << "': " << cpp_strerror(err) << dendl;
+       }
+       channel_fds[channel] = fd;
+      }
+    } else {
+      fd = p->second;
+    }
+
+    if (fd >= 0) {
+      fmt::format_to(file_log_buffer, "{}\n", le);
+      int err = safe_write(fd, file_log_buffer.data(), file_log_buffer.size());
+      file_log_buffer.clear();
       if (err < 0) {
-       dout(1) << "error writing to '" << log_file << "' for channel '"
-                << p->first << ": " << cpp_strerror(err) << dendl;
+       dout(1) << "error writing to '" << channels.get_log_file(channel)
+               << "' for channel '" << channel
+               << ": " << cpp_strerror(err) << dendl;
+       ::close(fd);
+       channel_fds.erase(channel);
       }
-      VOID_TEMP_FAILURE_RETRY(::close(fd));
     }
   }
+}
 
-  check_subs();
+void LogMonitor::log_external_close_fds()
+{
+  for (auto& [channel, fd] : channel_fds) {
+    if (fd >= 0) {
+      dout(10) << __func__ << " closing " << channel << " (" << fd << ")" << dendl;
+      ::close(fd);
+    }
+  }
+  channel_fds.clear();
 }
 
 void LogMonitor::create_pending()
@@ -972,6 +970,7 @@ void LogMonitor::update_log_channels()
   }
 
   channels.expand_channel_meta();
+  log_external_close_fds();
 }
 
 
@@ -986,7 +985,8 @@ void LogMonitor::handle_conf_change(const ConfigProxy& conf,
       changed.count("mon_cluster_log_to_graylog") ||
       changed.count("mon_cluster_log_to_graylog_host") ||
       changed.count("mon_cluster_log_to_graylog_port") ||
-      changed.count("mon_cluster_log_to_journald")) {
+      changed.count("mon_cluster_log_to_journald") ||
+      changed.count("mon_cluster_log_to_file")) {
     update_log_channels();
   }
 }
index 035168848da87cdcf386f9c1e05cf1ed3b1308eb..d60b57b4205b47538255b554f022f4fadec2e4d1 100644 (file)
@@ -18,6 +18,9 @@
 #include <map>
 #include <set>
 
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
 #include "include/types.h"
 #include "PaxosService.h"
 
@@ -42,6 +45,10 @@ private:
   std::multimap<utime_t,LogEntry> pending_log;
   LogSummary pending_summary, summary;
 
+  std::map<std::string, int> channel_fds;
+
+  fmt::memory_buffer file_log_buffer;
+
   struct log_channel_info {
 
     std::map<std::string,std::string> log_to_syslog;
@@ -159,6 +166,9 @@ private:
   void check_subs();
   void check_sub(Subscription *s);
 
+  void log_external_close_fds();
+  void log_external(const LogEntry& le);
+
   /**
    * translate log sub name ('log-info') to integer id
    *
index 645ccdbed949e5cdce257329d0ead45a1fb8b8a2..36647053c9775ef16e4b315334edc24108c8be36 100644 (file)
@@ -611,6 +611,7 @@ const char** Monitor::get_tracked_conf_keys() const
     "clog_to_graylog",
     "clog_to_graylog_host",
     "clog_to_graylog_port",
+    "mon_cluster_log_to_file",
     "host",
     "fsid",
     // periodic health to clog