]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimosn/osd: Implementing cluster logging 43344/head
authorMatan Breizman <Matan.Brz@gmail.com>
Sat, 25 Sep 2021 13:17:37 +0000 (13:17 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Mon, 15 Nov 2021 16:29:15 +0000 (16:29 +0000)
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
12 files changed:
src/common/LogClient.h
src/common/Throttle.h
src/crimson/CMakeLists.txt
src/crimson/common/logclient.cc [new file with mode: 0644]
src/crimson/common/logclient.h [new file with mode: 0644]
src/crimson/common/perf_counters_collection.cc
src/crimson/common/perf_counters_collection.h
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/msg/compressor_registry.h

index c20fde4f157d67b6d34681742e099680d68940b3..2e650f1ec0a77f01f445183ab061daca9d2ffb3a 100644 (file)
@@ -194,7 +194,8 @@ public:
   };
 
   LogClient(CephContext *cct, Messenger *m, MonMap *mm,
-           enum logclient_flag_t flags);
+          logclient_flag_t flags);
+
   virtual ~LogClient() {
     channels.clear();
   }
index eeaf8cb98a37368fc2976c5d8fd01aa3aa0ee812..e190b946c458a8c89619a3a5e84d21c8893895a3 100644 (file)
 #include "common/ThrottleInterface.h"
 #include "common/Timer.h"
 #include "common/convenience.h"
+#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
+#include "crimson/common/perf_counters_collection.h"
+#else
 #include "common/perf_counters_collection.h"
+#endif
 
 /**
  * @class Throttle
index 8104227c7e1aa4a2b9cda8e207f71d263901547f..aa9c1b533027997eead0a8f8d88a8171e17104cd 100644 (file)
@@ -19,6 +19,7 @@ set(crimson_common_srcs
   common/formatter.cc
   common/perf_counters_collection.cc
   common/log.cc
+  common/logclient.cc
   common/operation.cc
   common/throttle.cc
   common/tri_mutex.cc)
diff --git a/src/crimson/common/logclient.cc b/src/crimson/common/logclient.cc
new file mode 100644 (file)
index 0000000..0371f51
--- /dev/null
@@ -0,0 +1,363 @@
+#include "crimson/common/logclient.h"
+#include "include/str_map.h"
+#include "messages/MLog.h"
+#include "messages/MLogAck.h"
+#include "messages/MMonGetVersion.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/mon/MonClient.h"
+#include "mon/MonMap.h"
+#include "common/Graylog.h"
+
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::string;
+using crimson::common::local_conf;
+
+namespace {
+  seastar::logger& logger()
+  {
+    return crimson::get_logger(ceph_subsys_monc);
+  }
+}
+
+//TODO: in order to avoid unnecessary maps declarations and moving around,
+//     create a named structure containing the maps and return optional
+//     fit to it.
+int parse_log_client_options(CephContext *cct,
+                            map<string,string> &log_to_monitors,
+                            map<string,string> &log_to_syslog,
+                            map<string,string> &log_channels,
+                            map<string,string> &log_prios,
+                            map<string,string> &log_to_graylog,
+                            map<string,string> &log_to_graylog_host,
+                            map<string,string> &log_to_graylog_port,
+                            uuid_d &fsid,
+                            string &host)
+{
+  ostringstream oss;
+
+  int r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_monitors"), oss,
+    &log_to_monitors, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_monitors'", __func__);
+    return r;
+  }
+
+  r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_syslog"), oss,
+                              &log_to_syslog, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_syslog'", __func__);
+    return r;
+  }
+
+  r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_syslog_facility"), oss,
+    &log_channels, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_syslog_facility'", __func__);
+    return r;
+  }
+
+  r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_syslog_level"), oss,
+    &log_prios, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_syslog_level'", __func__);
+    return r;
+  }
+
+  r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_graylog"), oss,
+    &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_graylog'", __func__);
+    return r;
+  }
+
+  r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_graylog_host"), oss,
+    &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_graylog_host'", __func__);
+    return r;
+  }
+
+  r = get_conf_str_map_helper(
+    cct->_conf.get_val<string>("clog_to_graylog_port"), oss,
+    &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY);
+  if (r < 0) {
+    logger().error("{} error parsing 'clog_to_graylog_port'", __func__);
+    return r;
+  }
+
+  fsid = cct->_conf.get_val<uuid_d>("fsid");
+  host = cct->_conf->host;
+  return 0;
+}
+
+LogChannel::LogChannel(LogClient *lc, const string &channel)
+  : parent(lc), log_channel(channel), log_to_syslog(false),
+    log_to_monitors(false)
+{
+}
+
+LogChannel::LogChannel(LogClient *lc, const string &channel,
+                       const string &facility, const string &prio)
+  : parent(lc), log_channel(channel), log_prio(prio),
+    syslog_facility(facility), log_to_syslog(false),
+    log_to_monitors(false)
+{
+}
+
+LogClient::LogClient(crimson::net::Messenger *m,
+                    logclient_flag_t flags)
+  : messenger(m), is_mon(flags & FLAG_MON),
+    last_log_sent(0), last_log(0)
+{
+}
+
+void LogChannel::set_log_to_monitors(bool v)
+{
+  if (log_to_monitors != v) {
+    parent->reset();
+    log_to_monitors = v;
+  }
+}
+
+void LogChannel::update_config(map<string,string> &log_to_monitors,
+                              map<string,string> &log_to_syslog,
+                              map<string,string> &log_channels,
+                              map<string,string> &log_prios,
+                              map<string,string> &log_to_graylog,
+                              map<string,string> &log_to_graylog_host,
+                              map<string,string> &log_to_graylog_port,
+                              uuid_d &fsid,
+                              string &host)
+{
+  logger().debug(
+    "{} log_to_monitors {} log_to_syslog {} log_channels {} log_prios {}",
+    __func__, log_to_monitors, log_to_syslog, log_channels, log_prios);
+  bool to_monitors = (get_str_map_key(log_to_monitors, log_channel,
+                                      &CLOG_CONFIG_DEFAULT_KEY) == "true");
+  bool to_syslog = (get_str_map_key(log_to_syslog, log_channel,
+                                    &CLOG_CONFIG_DEFAULT_KEY) == "true");
+  string syslog_facility = get_str_map_key(log_channels, log_channel,
+                                          &CLOG_CONFIG_DEFAULT_KEY);
+  string prio = get_str_map_key(log_prios, log_channel,
+                               &CLOG_CONFIG_DEFAULT_KEY);
+  bool to_graylog = (get_str_map_key(log_to_graylog, log_channel,
+                                    &CLOG_CONFIG_DEFAULT_KEY) == "true");
+  string graylog_host = get_str_map_key(log_to_graylog_host, log_channel,
+                                      &CLOG_CONFIG_DEFAULT_KEY);
+  string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel,
+                                           &CLOG_CONFIG_DEFAULT_KEY);
+  int graylog_port = atoi(graylog_port_str.c_str());
+
+  set_log_to_monitors(to_monitors);
+  set_log_to_syslog(to_syslog);
+  set_syslog_facility(syslog_facility);
+  set_log_prio(prio);
+
+  if (to_graylog && !graylog) { /* should but isn't */
+    graylog = seastar::make_shared<ceph::logging::Graylog>("clog");
+  } else if (!to_graylog && graylog) { /* shouldn't but is */
+    graylog = nullptr;
+  }
+
+  if (to_graylog && graylog) {
+    graylog->set_fsid(fsid);
+    graylog->set_hostname(host);
+  }
+
+  if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) {
+    graylog->set_destination(graylog_host, graylog_port);
+  }
+
+  logger().debug("{} to_monitors: {} to_syslog: {}"
+         "syslog_facility: {} prio: {} to_graylog: {} graylog_host: {}"
+         "graylog_port: {}", __func__, (to_monitors ? "true" : "false"),
+         (to_syslog ? "true" : "false"), syslog_facility, prio,
+         (to_graylog ? "true" : "false"), graylog_host, graylog_port);
+}
+
+void LogChannel::do_log(clog_type prio, std::stringstream& ss)
+{
+  while (!ss.eof()) {
+    string s;
+    getline(ss, s);
+    if (!s.empty()) {
+      do_log(prio, s);
+    }
+  }
+}
+
+void LogChannel::do_log(clog_type prio, const std::string& s)
+{
+  if (CLOG_ERROR == prio) {
+    logger().error("log {} : {}", prio, s);
+  } else {
+    logger().warn("log {} : {}", prio, s);
+  }
+  LogEntry e;
+  e.stamp = ceph_clock_now();
+  e.addrs = parent->get_myaddrs();
+  e.name = parent->get_myname();
+  e.rank = parent->get_myrank();
+  e.prio = prio;
+  e.msg = s;
+  e.channel = get_log_channel();
+
+  // seq and who should be set for syslog/graylog/log_to_mon
+  // log to monitor?
+  if (log_to_monitors) {
+    e.seq = parent->queue(e);
+  } else {
+    e.seq = parent->get_next_seq();
+  }
+
+  // log to syslog?
+  if (do_log_to_syslog()) {
+    logger().warn("{} log to syslog", __func__);
+    e.log_to_syslog(get_log_prio(), get_syslog_facility());
+  }
+
+  // log to graylog?
+  if (do_log_to_graylog()) {
+    logger().warn("{} log to graylog", __func__);
+    graylog->log_log_entry(&e);
+  }
+}
+
+MessageURef LogClient::get_mon_log_message(log_flushing_t flush_flag)
+{
+  if (flush_flag == log_flushing_t::FLUSH) {
+    if (log_queue.empty()) {
+      return {};
+    }
+    // reset session
+    last_log_sent = log_queue.front().seq;
+  }
+  return _get_mon_log_message();
+}
+
+bool LogClient::are_pending() const
+{
+  return last_log > last_log_sent;
+}
+
+MessageURef LogClient::_get_mon_log_message()
+{
+  if (log_queue.empty()) {
+    return {};
+  }
+
+  // only send entries that haven't been sent yet during this mon
+  // session!  monclient needs to call reset_session() on mon session
+  // reset for this to work right.
+
+  if (last_log_sent == last_log) {
+    return {};
+  }
+
+  // limit entries per message
+  const int64_t num_unsent = last_log - last_log_sent;
+  int64_t num_to_send;
+  if (local_conf()->mon_client_max_log_entries_per_message > 0) {
+    num_to_send = std::min(num_unsent,
+                local_conf()->mon_client_max_log_entries_per_message);
+  } else {
+    num_to_send = num_unsent;
+  }
+
+  logger().debug("log_queue is {} last_log {} sent {} num {} unsent {}"
+               " sending {}", log_queue.size(), last_log,
+               last_log_sent, log_queue.size(), num_unsent, num_to_send);
+  ceph_assert((unsigned)num_unsent <= log_queue.size());
+  auto log_iter = log_queue.begin();
+  std::deque<LogEntry> out_log_queue; /* will send the logs contained here */
+  while (log_iter->seq <= last_log_sent) {
+    ++log_iter;
+    ceph_assert(log_iter != log_queue.end());
+  }
+  while (num_to_send--) {
+    ceph_assert(log_iter != log_queue.end());
+    out_log_queue.push_back(*log_iter);
+    last_log_sent = log_iter->seq;
+    logger().debug(" will send {}", *log_iter);
+    ++log_iter;
+  }
+  
+  return crimson::make_message<MLog>(m_fsid,
+                                 std::move(out_log_queue));
+}
+
+version_t LogClient::queue(LogEntry &entry)
+{
+  entry.seq = ++last_log;
+  log_queue.push_back(entry);
+
+  return entry.seq;
+}
+
+void LogClient::reset()
+{
+  if (log_queue.size()) {
+    log_queue.clear();
+  }
+  last_log_sent = last_log;
+}
+
+uint64_t LogClient::get_next_seq()
+{
+  return ++last_log;
+}
+
+entity_addrvec_t LogClient::get_myaddrs() const
+{
+  return messenger->get_myaddrs();
+}
+
+entity_name_t LogClient::get_myrank()
+{
+  return messenger->get_myname();
+}
+
+const EntityName& LogClient::get_myname() const
+{
+  return local_conf()->name;
+}
+
+seastar::future<> LogClient::handle_log_ack(Ref<MLogAck> m)
+{
+  logger().debug("handle_log_ack {}", *m);
+
+  version_t last = m->last;
+
+  auto q = log_queue.begin();
+  while (q != log_queue.end()) {
+    const LogEntry &entry(*q);
+    if (entry.seq > last)
+      break;
+    logger().debug(" logged {}", entry);
+    q = log_queue.erase(q);
+  }
+  return seastar::now();
+}
+
+LogChannelRef LogClient::create_channel(const std::string& name) {
+  auto it = channels.find(name);
+  if (it == channels.end()) {
+    it = channels.insert(it,
+           {name, seastar::make_lw_shared<LogChannel>(this, name)});
+  }
+  return it->second;
+}
+
+seastar::future<> LogClient::set_fsid(const uuid_d& fsid) {
+  m_fsid = fsid;
+  return seastar::now();
+}
+
diff --git a/src/crimson/common/logclient.h b/src/crimson/common/logclient.h
new file mode 100644 (file)
index 0000000..ccf24c2
--- /dev/null
@@ -0,0 +1,232 @@
+#ifndef CEPH_LOGCLIENT_H
+#define CEPH_LOGCLIENT_H
+
+#include "common/LogEntry.h"
+#include "common/ostream_temp.h"
+#include "common/ref.h"
+#include "include/health.h"
+#include "crimson/net/Fwd.h"
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/timer.hh>
+
+class LogClient;
+class MLog;
+class MLogAck;
+class Message;
+struct uuid_d;
+struct Connection;
+
+class LogChannel;
+
+namespace ceph {
+namespace logging {
+  class Graylog;
+}
+}
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+namespace crimson::net {
+  class Messenger;
+}
+
+enum class log_flushing_t {
+  NO_FLUSH,
+  FLUSH
+};
+
+int parse_log_client_options(CephContext *cct,
+                            std::map<std::string,std::string> &log_to_monitors,
+                            std::map<std::string,std::string> &log_to_syslog,
+                            std::map<std::string,std::string> &log_channels,
+                            std::map<std::string,std::string> &log_prios,
+                            std::map<std::string,std::string> &log_to_graylog,
+                            std::map<std::string,std::string> &log_to_graylog_host,
+                            std::map<std::string,std::string> &log_to_graylog_port,
+                            uuid_d &fsid,
+                            std::string &host);
+
+/** Manage where we output to and at which priority
+ *
+ * Not to be confused with the LogClient, which is the almighty coordinator
+ * of channels.  We just deal with the boring part of the logging: send to
+ * syslog, send to file, generate LogEntry and queue it for the LogClient.
+ *
+ * Past queueing the LogEntry, the LogChannel is done with the whole thing.
+ * LogClient will deal with sending and handling of LogEntries.
+ */
+class LogChannel : public OstreamTemp::OstreamTempSink
+{
+public:
+  LogChannel(LogClient *lc, const std::string &channel);
+  LogChannel(LogClient *lc, const std::string &channel,
+             const std::string &facility, const std::string &prio);
+
+  OstreamTemp debug() {
+    return OstreamTemp(CLOG_DEBUG, this);
+  }
+  void debug(std::stringstream &s) {
+    do_log(CLOG_DEBUG, s);
+  }
+  /**
+   * Convenience function mapping health status to
+   * the appropriate cluster log severity.
+   */
+  OstreamTemp health(health_status_t health) {
+    switch(health) {
+      case HEALTH_OK:
+        return info();
+      case HEALTH_WARN:
+        return warn();
+      case HEALTH_ERR:
+        return error();
+      default:
+        // Invalid health_status_t value
+        ceph_abort();
+    }
+  }
+  OstreamTemp info() {
+    return OstreamTemp(CLOG_INFO, this);
+  }
+  void info(std::stringstream &s) {
+    do_log(CLOG_INFO, s);
+  }
+  OstreamTemp warn() {
+    return OstreamTemp(CLOG_WARN, this);
+  }
+  void warn(std::stringstream &s) {
+    do_log(CLOG_WARN, s);
+  }
+  OstreamTemp error() {
+    return OstreamTemp(CLOG_ERROR, this);
+  }
+  void error(std::stringstream &s) {
+    do_log(CLOG_ERROR, s);
+  }
+  OstreamTemp sec() {
+    return OstreamTemp(CLOG_SEC, this);
+  }
+  void sec(std::stringstream &s) {
+    do_log(CLOG_SEC, s);
+  }
+
+  void set_log_to_monitors(bool v);
+  void set_log_to_syslog(bool v) {
+    log_to_syslog = v;
+  }
+  void set_log_channel(const std::string& v) {
+    log_channel = v;
+  }
+  void set_log_prio(const std::string& v) {
+    log_prio = v;
+  }
+  void set_syslog_facility(const std::string& v) {
+    syslog_facility = v;
+  }
+  const std::string& get_log_prio() const { return log_prio; }
+  const std::string& get_log_channel() const { return log_channel; }
+  const std::string& get_syslog_facility() const { return syslog_facility; }
+  bool must_log_to_syslog() const { return log_to_syslog; }
+  /**
+   * Do we want to log to syslog?
+   *
+   * @return true if log_to_syslog is true and both channel and prio
+   *         are not empty; false otherwise.
+   */
+  bool do_log_to_syslog() {
+    return must_log_to_syslog() &&
+          !log_prio.empty() && !log_channel.empty();
+  }
+  bool must_log_to_monitors() { return log_to_monitors; }
+
+  bool do_log_to_graylog() {
+    return (graylog != nullptr);
+  }
+
+  using Ref = seastar::lw_shared_ptr<LogChannel>;
+
+  /**
+   * update config values from parsed k/v std::map for each config option
+   *
+   * Pick out the relevant value based on our channel.
+   */
+  void update_config(std::map<std::string,std::string> &log_to_monitors,
+                    std::map<std::string,std::string> &log_to_syslog,
+                    std::map<std::string,std::string> &log_channels,
+                    std::map<std::string,std::string> &log_prios,
+                    std::map<std::string,std::string> &log_to_graylog,
+                    std::map<std::string,std::string> &log_to_graylog_host,
+                    std::map<std::string,std::string> &log_to_graylog_port,
+                    uuid_d &fsid,
+                    std::string &host);
+
+  void do_log(clog_type prio, std::stringstream& ss);
+  void do_log(clog_type prio, const std::string& s);
+
+private:
+  LogClient *parent;
+  std::string log_channel;
+  std::string log_prio;
+  std::string syslog_facility;
+  bool log_to_syslog;
+  bool log_to_monitors;
+  seastar::shared_ptr<ceph::logging::Graylog> graylog;
+};
+
+using LogChannelRef = LogChannel::Ref;
+
+class LogClient
+{
+public:
+  enum logclient_flag_t {
+    NO_FLAGS = 0,
+    FLAG_MON = 0x1,
+  };
+
+  LogClient(crimson::net::Messenger *m, logclient_flag_t flags);
+
+  virtual ~LogClient() = default;
+
+  seastar::future<> handle_log_ack(Ref<MLogAck> m);
+  MessageURef get_mon_log_message(log_flushing_t flush_flag);
+  bool are_pending() const;
+
+  LogChannelRef create_channel() {
+    return create_channel(CLOG_CHANNEL_DEFAULT);
+  }
+
+  LogChannelRef create_channel(const std::string& name);
+
+  void destroy_channel(const std::string& name) {
+    channels.erase(name);
+  }
+
+  void shutdown() {
+    channels.clear();
+  }
+
+  uint64_t get_next_seq();
+  entity_addrvec_t get_myaddrs() const;
+  const EntityName& get_myname() const;
+  entity_name_t get_myrank();
+  version_t queue(LogEntry &entry);
+  void reset();
+  seastar::future<> set_fsid(const uuid_d& fsid);
+
+private:
+  MessageURef _get_mon_log_message();
+
+  crimson::net::Messenger *messenger;
+  bool is_mon;
+  version_t last_log_sent;
+  version_t last_log;
+  std::deque<LogEntry> log_queue;
+
+  std::map<std::string, LogChannelRef> channels;
+  uuid_d m_fsid;
+};
+#endif
+
index 2eba271dd9f75415c58bd24083b09a2125baa0b4..08501395e7e366cec92eb34c6e61d35c2d54d7ea 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "common/ceph_context.h"
 #include "perf_counters_collection.h"
 
 namespace crimson::common {
@@ -27,6 +28,13 @@ void PerfCountersCollection::dump_formatted(ceph::Formatter *f, bool schema,
 
 PerfCountersCollection::ShardedPerfCountersCollection PerfCountersCollection::sharded_perf_coll;
 
+void PerfCountersDeleter::operator()(PerfCounters* p) noexcept
+{
+  if (cct) {
+    cct->get_perfcounters_collection()->remove(p);
+  }
+  delete p;
 }
 
+}
 
index 9ec5c3eb3742476d129cfca81adec6a9e2730a1a..fac33bce7ee9f4de5a5065c76c47d9f04fe391a5 100644 (file)
@@ -4,6 +4,7 @@
 #pragma once
 
 #include "common/perf_counters.h"
+#include "include/common_fwd.h"
 #include <seastar/core/sharded.hh>
 
 using crimson::common::PerfCountersCollectionImpl;
@@ -35,5 +36,14 @@ inline PerfCountersCollection& local_perf_coll() {
   return PerfCountersCollection::sharded_perf_coll.local();
 }
 
+class PerfCountersDeleter {
+  CephContext* cct;
+
+public:
+  PerfCountersDeleter() noexcept : cct(nullptr) {}
+  PerfCountersDeleter(CephContext* cct) noexcept : cct(cct) {}
+  void operator()(PerfCounters* p) noexcept;
+};
 }
+using PerfCountersRef = std::unique_ptr<crimson::common::PerfCounters, crimson::common::PerfCountersDeleter>;
 
index 0e8e73a05810259b64e9838446180c32ac5eaf9d..928b36a375d321ada31fcba3aacf5d578b5a2b13 100644 (file)
@@ -18,6 +18,7 @@
 #include "crimson/auth/KeyRing.h"
 #include "crimson/common/config_proxy.h"
 #include "crimson/common/log.h"
+#include "crimson/common/logclient.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Errors.h"
 #include "crimson/net/Messenger.h"
@@ -415,6 +416,7 @@ Client::Client(crimson::net::Messenger& messenger,
               CEPH_ENTITY_TYPE_MGR},
     timer{[this] { tick(); }},
     msgr{messenger},
+    log_client{nullptr},
     auth_registry{&cct},
     auth_handler{auth_handler}
 {}
@@ -457,7 +459,8 @@ void Client::tick()
 {
   gate.dispatch_in_background(__func__, *this, [this] {
     if (active_con) {
-      return seastar::when_all_succeed(active_con->get_conn()->keepalive(),
+      return seastar::when_all_succeed(wait_for_send_log(),
+                                       active_con->get_conn()->keepalive(),
                                        active_con->renew_tickets(),
                                        active_con->renew_rotating_keyring()).then_unpack([] {});
     } else {
@@ -468,6 +471,25 @@ void Client::tick()
   });
 }
 
+seastar::future<> Client::wait_for_send_log() {
+  utime_t now = ceph_clock_now();
+  if (now > last_send_log + cct._conf->mon_client_log_interval) {
+    last_send_log = now;
+    return send_log(log_flushing_t::NO_FLUSH);
+  }
+  return seastar::now();
+}
+
+seastar::future<> Client::send_log(log_flushing_t flush_flag) {
+  if (log_client) {
+    if (auto lm = log_client->get_mon_log_message(flush_flag); lm) {
+      return send_message(std::move(lm));
+    }
+    more_log_pending = log_client->are_pending();
+  }
+  return seastar::now();
+}
+
 bool Client::is_hunting() const {
   return !active_con;
 }
@@ -857,7 +879,15 @@ seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m)
 
 seastar::future<> Client::handle_log_ack(Ref<MLogAck> m)
 {
-  // XXX
+  if (log_client) {
+    return log_client->handle_log_ack(m).then([this] {
+      if (more_log_pending) {
+        return send_log(log_flushing_t::NO_FLUSH);
+      } else {
+        return seastar::now();
+      }
+    });
+  }
   return seastar::now();
 }
 
index 7ad14696c9085acbfe454f248053c92942a672b8..b2198f3470d464d5a0334a1457429d231254d5c7 100644 (file)
@@ -32,6 +32,8 @@ namespace crimson::net {
   class Messenger;
 }
 
+class LogClient;
+
 struct AuthAuthorizeHandler;
 class MAuthReply;
 struct MMonMap;
@@ -42,6 +44,8 @@ struct MMonCommandAck;
 struct MLogAck;
 struct MConfig;
 
+enum class log_flushing_t;
+
 namespace crimson::mon {
 
 class Connection;
@@ -62,6 +66,13 @@ class Client : public crimson::net::Dispatcher,
 
   crimson::net::Messenger& msgr;
 
+  LogClient *log_client;
+  bool more_log_pending = false;
+  utime_t last_send_log;
+
+  seastar::future<> send_log(log_flushing_t flush_flag);
+  seastar::future<> wait_for_send_log();
+
   // commands
   using get_version_t = seastar::future<std::tuple<version_t, version_t>>;
 
@@ -87,6 +98,10 @@ public:
   seastar::future<> start();
   seastar::future<> stop();
 
+  void set_log_client(LogClient *clog) {
+    log_client = clog;
+  }
+
   const uuid_d& get_fsid() const {
     return monmap.fsid;
   }
index f2b4cde54bcc291daf3d8473813bdfa3121f6396..34532b882d733fd31d4fbff63002b28f2173860e 100644 (file)
@@ -100,7 +100,9 @@ OSD::OSD(int id, uint32_t nonce,
       update_stats();
     }},
     asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
-    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
+    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))),
+    log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
+    clog(log_client.create_channel())
 {
   osdmaps[0] = boost::make_local_shared<OSDMap>();
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
@@ -117,6 +119,8 @@ OSD::OSD(int id, uint32_t nonce,
     }
   }
   logger().info("{}: nonce is {}", __func__, nonce);
+  monc->set_log_client(&log_client);
+  clog->set_log_to_monitors(true);
 }
 
 OSD::~OSD() = default;
@@ -377,6 +381,8 @@ seastar::future<> OSD::start()
     // create the admin-socket server, and the objects that register
     // to handle incoming commands
     return start_asok_admin();
+  }).then([this] {
+    return log_client.set_fsid(monc->get_fsid());
   }).then([this] {
     return start_boot();
   });
index f51c89c2be6150a0cd19b10775822d70413638bf..a48ee32b9f3b6f3d894c0e39280b27aa8bc5e4bd 100644 (file)
@@ -10,6 +10,7 @@
 #include <seastar/core/shared_future.hh>
 #include <seastar/core/timer.hh>
 
+#include "crimson/common/logclient.h"
 #include "crimson/common/type_helpers.h"
 #include "crimson/common/auth_handler.h"
 #include "crimson/common/gated.h"
@@ -244,6 +245,10 @@ public:
     spg_t pgid);
   Ref<PG> get_pg(spg_t pgid);
   seastar::future<> send_beacon();
+
+private:
+  LogClient log_client;
+  LogChannelRef clog;
 };
 
 inline std::ostream& operator<<(std::ostream& out, const OSD& osd) {
index bda08ca437b4061128ec4a1a3335b8dd4c8ca52f..d6b3f6867d8e01ee81c100cb656240595bb9e4d3 100644 (file)
@@ -25,10 +25,10 @@ public:
   void handle_conf_change(const ConfigProxy& conf,
                           const std::set<std::string>& changed) override;
 
-  Compressor::CompressionAlgorithm pick_method(uint32_t peer_type,
+  TOPNSPC::Compressor::CompressionAlgorithm pick_method(uint32_t peer_type,
                                               const std::vector<uint32_t>& preferred_methods);
 
-  Compressor::CompressionMode get_mode(uint32_t peer_type, bool is_secure);
+  TOPNSPC::Compressor::CompressionMode get_mode(uint32_t peer_type, bool is_secure);
 
   const std::vector<uint32_t> get_methods(uint32_t peer_type) { 
     std::scoped_lock l(lock);