]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg,mon,common: log when DispatchQueue throttle limit is reached
author Jos Collin <jcollin@redhat.com>
Wed, 4 Nov 2020 13:21:44 +0000 (18:51 +0530)
committer Jos Collin <jcollin@redhat.com>
Mon, 25 Jan 2021 05:48:04 +0000 (11:18 +0530)
Fixes: https://tracker.ceph.com/issues/46226
Signed-off-by: Jos Collin <jcollin@redhat.com>
src/common/options.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/msg/DispatchQueue.h
src/msg/Dispatcher.h
src/msg/Messenger.cc
src/msg/Messenger.h
src/msg/async/Protocol.h
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc
src/msg/msg_types.h

index 8e15babf9e3ce20e0f81312315db8c5dbfe883bf..7f63843ae50d8435121c30889cdca37fe590204c 100644 (file)
@@ -1019,6 +1019,11 @@ std::vector<Option> get_global_options() {
     .set_default(100_M)
     .set_description("Limit messages that are read off the network but still being processed"),
 
+    Option("ms_dispatch_throttle_log_interval", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+    .set_default(30)
+    .set_min(0)
+    .set_description("Interval in seconds for high verbosity debug log message when the dispatch throttle limit are hit"),
+
     Option("ms_bind_ipv4", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
     .set_default(true)
     .set_description("Bind servers to IPv4 address(es)")
index a1c67ad44990329f42addd021362584887efc06f..1e9087aff82e6582f778b1dfcdd550a3af117a13 100644 (file)
@@ -508,6 +508,7 @@ int MonClient::init()
 
   initialized = true;
 
+  cct->_conf.add_observer(this);
   messenger->set_auth_client(this);
   messenger->add_dispatcher_head(this);
 
@@ -546,6 +547,7 @@ void MonClient::shutdown()
   if (initialized) {
     initialized = false;
   }
+  cct->_conf.remove_observer(this);
   monc_lock.lock();
   timer.shutdown();
   stopping = false;
@@ -854,6 +856,29 @@ bool MonClient::ms_handle_reset(Connection *con)
   }
 }
 
+/**
+ * Dispatcher callback invoked when a messenger throttle limit is hit.
+ *
+ * Only ms_throttle_t::DISPATCH_QUEUE is acted on: a warning is written to
+ * the cluster log channel created from log_client.
+ *
+ * @param ttype which throttle reported hitting its limit
+ * @return true if a cluster log warning was emitted; false if the throttle
+ *         type is not handled here or no log_client is available
+ */
+bool MonClient::ms_handle_throttle(ms_throttle_t ttype) {
+  switch (ttype) {
+  case ms_throttle_t::MESSAGE:
+  case ms_throttle_t::BYTES:
+    // TODO: not implemented yet. Report "not handled" rather than claiming
+    // the event, since nothing is done for these throttle types here.
+    return false;
+  case ms_throttle_t::DISPATCH_QUEUE:
+    {
+      // cluster log a warning that the Dispatch Queue Throttle Limit was hit
+      if (!log_client) {
+        return false; // cannot handle if the daemon didn't set up a log_client for me
+      }
+      LogChannelRef clog = log_client->create_channel(CLOG_CHANNEL_CLUSTER);
+      clog->warn() << "Throttler Limit has been hit. "
+                   << "Some message processing may be significantly delayed.";
+      return true;
+    }
+  default:
+    return false;
+  }
+}
+
 bool MonClient::_opened() const
 {
   ceph_assert(ceph_mutex_is_locked(monc_lock));
@@ -1615,6 +1640,24 @@ int MonClient::handle_auth_request(
   return -EACCES;
 }
 
+/**
+ * md_config_obs_t hook: the configuration keys this client observes.
+ *
+ * @return a nullptr-terminated array of option names with static storage
+ */
+const char** MonClient::get_tracked_conf_keys() const {
+  static const char* KEYS[] = {
+    "ms_dispatch_throttle_bytes",
+    "ms_dispatch_throttle_log_interval",
+    nullptr  // end-of-list sentinel
+  };
+  return KEYS;
+}
+
+/**
+ * md_config_obs_t hook: copy the dispatch-throttle settings into the messenger.
+ *
+ * Does nothing unless one of the two tracked throttle options changed and a
+ * messenger is set. When it acts, both values are re-read from cct->_conf,
+ * whichever of the two keys triggered the call.
+ *
+ * @param conf    configuration proxy supplied by the caller (not read here)
+ * @param changed names of the options that changed
+ */
+void MonClient::handle_conf_change(const ConfigProxy& conf, const std::set<std::string> &changed) {
+  const bool bytes_changed = changed.count("ms_dispatch_throttle_bytes") > 0;
+  const bool interval_changed = changed.count("ms_dispatch_throttle_log_interval") > 0;
+  if (!bytes_changed && !interval_changed) {
+    return;
+  }
+  if (!messenger) {
+    return;
+  }
+  messenger->dispatch_throttle_bytes =
+    cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
+  messenger->dispatch_throttle_log_interval =
+    cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
+}
+
 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
   std::lock_guard l(monc_lock);
   if (auth) {
index cc7805d0675e9bb852979ed18ea26de7f5dc1265..7231d1a655495da45a330cc00b9a21d8f7602d00 100644 (file)
@@ -268,8 +268,9 @@ inline boost::system::error_condition make_error_condition(monc_errc e) noexcept
 const boost::system::error_category& monc_category() noexcept;
 
 class MonClient : public Dispatcher,
-                 public AuthClient,
-                 public AuthServer /* for mgr, osd, mds */ {
+                  public AuthClient,
+                  public AuthServer, /* for mgr, osd, mds */
+                  public md_config_obs_t {
   static constexpr auto dout_subsys = ceph_subsys_monc;
 public:
   // Error, Newest, Oldest
@@ -309,6 +310,7 @@ private:
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override { return false; }
+  bool ms_handle_throttle(ms_throttle_t ttype) override;
 
   void handle_monmap(MMonMap *m);
   void handle_config(MConfig *m);
@@ -402,6 +404,11 @@ public:
     uint32_t auth_method,
     const ceph::buffer::list& bl,
     ceph::buffer::list *reply) override;
+  // md_config_obs_t (config observer)
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(
+    const ConfigProxy& conf,
+    const std::set<std::string> &changed) override;
 
   void set_entity_name(EntityName name) { entity_name = name; }
   void set_handle_authentication_dispatcher(Dispatcher *d) {
index de0cb7d1a0833dde7260d0eedf89f5a339b5167c..c951df5370dfc346f9ebadc1ba6f827eb6b65ab0 100644 (file)
@@ -212,6 +212,11 @@ class DispatchQueue {
   uint64_t get_id() {
     return next_id++;
   }
+
+  /// Return the Messenger pointer stored in msgr.
+  Messenger* get_messenger() const {
+    return msgr;
+  }
+
   void start();
   void entry();
   void wait();
index 5e025437b53570ad78844cb38e7e6c3c6bfeab74..36141571202a27e421c76bb28266d633d9f0b0d7 100644 (file)
@@ -214,6 +214,16 @@ public:
     return 0;
   }
 
+  /**
+   * Called when a throttle limit has been hit, so that the dispatcher
+   * can handle it and cluster log it.
+   *
+   * This default implementation handles nothing.
+   *
+   * @param ttype the type of throttle whose limit was hit
+   * @return true if handled, false if not handled
+   */
+  virtual bool ms_handle_throttle(ms_throttle_t ttype) {
+    return false;
+  }
+
   /**
    * @} //Authentication
    */
index 8064a10a0d9b8d4b4fdbc217a8886c67f4616ea1..b701d17f912427f0a01c54050c478db062b23dc3 100644 (file)
@@ -66,6 +66,8 @@ Messenger::Messenger(CephContext *cct_, entity_name_t w)
     auth_registry(cct)
 {
   auth_registry.refresh_config();
+  dispatch_throttle_bytes = cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
+  dispatch_throttle_log_interval = cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
 }
 
 void Messenger::set_endpoint_addr(const entity_addr_t& a,
index e87f3196b1c9b58e2ab8c2f3a1c93c9b12b23d94..7dcaeb75ba482dfb281b5f21dc42c7c0766b414a 100644 (file)
@@ -112,6 +112,8 @@ protected:
 public:
   AuthClient *auth_client = 0;
   AuthServer *auth_server = 0;
+  uint64_t dispatch_throttle_bytes;
+  std::chrono::seconds dispatch_throttle_log_interval;
 
 #ifdef UNIT_TESTS_BUILT
   Interceptor *interceptor = nullptr;
@@ -813,6 +815,18 @@ public:
   void set_require_authorizer(bool b) {
     require_authorizer = b;
   }
+  /**
+   * Offer a "Throttle Limit has been hit" event to the Dispatchers in
+   * order, stopping at the first one that reports it as handled. Call
+   * this function whenever the connections are getting throttled.
+   *
+   * @param ttype Throttle type
+   */
+  void ms_deliver_throttle(ms_throttle_t ttype) {
+    for (const auto &d : dispatchers) {
+      const bool handled = d->ms_handle_throttle(ttype);
+      if (handled) {
+        break;  // first taker wins; later dispatchers are not notified
+      }
+    }
+  }
 
   /**
    * @} // Dispatcher Interfacing
index 10436307ebf8362c76932328c16ee55f61893652..d9f3db7a778cd1a05f62084b18f66dcedaf1b291 100644 (file)
@@ -106,6 +106,7 @@ protected:
   AsyncConnection *connection;
   AsyncMessenger *messenger;
   CephContext *cct;
+  ceph::mono_time throttle_prev = ceph::mono_clock::zero();
 public:
   std::shared_ptr<AuthConnectionMeta> auth_meta;
 
index 43363371bc35d05eb342a4113881ee9936e8dc07..3f2100968dd4b53e13edd726c2aacebc6cb880e2 100644 (file)
@@ -718,6 +718,10 @@ CtPtr ProtocolV1::throttle_dispatch_queue() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (cur_msg_size) {
+    Messenger* msgr = connection->dispatch_queue->get_messenger();
+    //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
+    connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes);
+
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
             cur_msg_size)) {
       ldout(cct, 10)
@@ -726,6 +730,16 @@ CtPtr ProtocolV1::throttle_dispatch_queue() {
           << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
           << connection->dispatch_queue->dispatch_throttler.get_max()
           << " failed, just wait." << dendl;
+      ceph::mono_time throttle_now = ceph::mono_clock::now();
+      auto duration = std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev);
+      if (duration >= msgr->dispatch_throttle_log_interval) {
+        ldout(cct, 1) << __func__ << " Throttler Limit has been hit. "
+                      << "Some message processing may be significantly delayed." << dendl;
+        throttle_prev = throttle_now;
+
+        //Cluster logging that throttling is occurring.
+        msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE);
+      }
       // following thread pool deal with th full message queue isn't a
       // short time, so we can wait a ms.
       if (connection->register_time_events.empty()) {
index 855006447f7964e7ea745b78e99b621774228f37..2de83a042380c1caef2efdbe97e8eb74f652ee72 100644 (file)
@@ -1572,6 +1572,10 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
 
   const size_t cur_msg_size = get_current_msg_size();
   if (cur_msg_size) {
+    Messenger* msgr = connection->dispatch_queue->get_messenger();
+    //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
+    connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes);
+
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
             cur_msg_size)) {
       ldout(cct, 10)
@@ -1580,6 +1584,16 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
           << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
           << connection->dispatch_queue->dispatch_throttler.get_max()
           << " failed, just wait." << dendl;
+      ceph::mono_time throttle_now = ceph::mono_clock::now();
+      auto duration = std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev);
+      if (duration >= msgr->dispatch_throttle_log_interval) {
+        ldout(cct, 1) << __func__ << " Throttler Limit has been hit. "
+                      << "Some message processing may be significantly delayed." << dendl;
+        throttle_prev = throttle_now;
+
+        //Cluster logging that throttling is occurring.
+        msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE);
+      }
       // following thread pool deal with th full message queue isn't a
       // short time, so we can wait a ms.
       if (connection->register_time_events.empty()) {
index 76a1c1ac2a7665a465f41d2610c755a8b3993b14..acde32f6ecfe83ed9aaac69a67fb707a6721da76 100644 (file)
@@ -811,4 +811,10 @@ inline std::ostream& operator<<(std::ostream& out, const ceph_entity_inst &i)
   return out << n;
 }
 
+// Kinds of throttle passed to the ms_handle_throttle() /
+// ms_deliver_throttle() notifications.
+enum class ms_throttle_t {
+    MESSAGE,        // NOTE(review): presumably a message-count throttle; no visible caller raises it - confirm
+    BYTES,          // NOTE(review): presumably a byte-count throttle; no visible caller raises it - confirm
+    DISPATCH_QUEUE  // raised when the DispatchQueue's dispatch_throttler refuses a message
+};
+
 #endif