git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg,mon,mgr,osd,common: log when DispatchQueue throttle limit is reached
authorJos Collin <jcollin@redhat.com>
Wed, 4 Nov 2020 13:21:44 +0000 (18:51 +0530)
committerJos Collin <jcollin@redhat.com>
Wed, 17 Dec 2025 10:09:00 +0000 (15:39 +0530)
* Log when DispatchQueue throttle limit is reached.
* Display a HEALTH_WARN in the ceph status.

Fixes: https://tracker.ceph.com/issues/46226
Signed-off-by: Jos Collin <jcollin@redhat.com>
17 files changed:
src/common/Throttle.cc
src/common/Throttle.h
src/common/options/global.yaml.in
src/mgr/DaemonHealthMetric.h
src/mgr/DaemonHealthMetricCollector.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/msg/DispatchQueue.h
src/msg/Dispatcher.h
src/msg/Messenger.cc
src/msg/Messenger.h
src/msg/async/Protocol.h
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc
src/msg/msg_types.h
src/osd/OSD.cc
src/osd/OSD.h

index 17f5d4c5e440a8bcb9a6e0ffa65d09d99dea32de..f27583c84d399b6eff77c9121e9ce88556a4c31e 100644 (file)
@@ -197,6 +197,7 @@ bool Throttle::get_or_fail(int64_t c)
     std::lock_guard l(lock);
     if (_should_wait(c) || !conds.empty()) {
       ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
+      failed++;
       result = false;
     } else {
       ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
@@ -256,6 +257,7 @@ void Throttle::reset()
   if (!conds.empty())
     conds.front().notify_one();
   count = 0;
+  failed = 0;
   if (logger) {
     logger->set(l_throttle_val, 0);
   }
index 980cb85356d004197fea6f3fa61d9dc7398a7204..aabaf8901ffec7218967bc78ceecfa757bc3a767 100644 (file)
@@ -33,7 +33,7 @@ class Throttle final : public ThrottleInterface {
   CephContext *cct;
   const std::string name;
   PerfCountersRef logger;
-  std::atomic<int64_t> count = { 0 }, max = { 0 };
+  std::atomic<int64_t> count = { 0 }, max = { 0 }, failed = { 0 };
   std::mutex lock;
   std::list<std::condition_variable> conds;
   const bool use_perf;
@@ -116,6 +116,15 @@ public:
    * @returns number of requests being hold after this
    */
   int64_t put(int64_t c = 1) override;
+
  /**
   * Get the number of failed slot requests, i.e. how many times
   * get_or_fail() rejected a request since construction or the last
   * reset().
   * @returns the current failure count
   */
  int64_t get_failed() const {
    return failed.load();
  }
+
    /**
    * reset the zero to the stock
    */
index 222dc89076e294c9a0efbf91f7869e5a78ebc514..e407f42194341fbfda0336351592b2d30edeff74 100644 (file)
@@ -1189,6 +1189,14 @@ options:
   fmt_desc: Throttles total size of messages waiting to be dispatched.
   default: 100_M
   with_legacy: true
+- name: ms_dispatch_throttle_log_interval
+  type: secs
+  level: advanced
+  desc: Interval in seconds between successive cluster log and health warnings
+        (shown in ceph status) when the dispatch throttle limit is exceeded.
+        Setting this option to 0 disables both warnings.
+  default: 30
+  min: 0
 - name: ms_bind_ipv4
   type: bool
   level: advanced
index 6a2eb3371c728225d23b96e2587a0b261075262b..9e89eb7be132eceea2c69d88d7626c530e31e637 100644 (file)
@@ -12,6 +12,7 @@
 enum class daemon_metric : uint8_t {
   SLOW_OPS,
   PENDING_CREATING_PGS,
+  DISPATCH_QUEUE_THROTTLE,
   NONE,
 };
 
@@ -19,6 +20,7 @@ static inline const char *daemon_metric_name(daemon_metric t) {
   switch (t) {
   case daemon_metric::SLOW_OPS: return "SLOW_OPS";
   case daemon_metric::PENDING_CREATING_PGS: return "PENDING_CREATING_PGS";
+  case daemon_metric::DISPATCH_QUEUE_THROTTLE: return "DISPATCH_QUEUE_THROTTLE";
   case daemon_metric::NONE: return "NONE";
   default: return "???";
   }
index cf1aab2a219da0725abb73d6bc0685c4997c75fb..b64a0bb5d470eddeec41ef88e388d199de3d78c8 100644 (file)
@@ -93,6 +93,31 @@ class PendingPGs final : public DaemonHealthMetricCollector {
   vector<DaemonKey> osds;
 };
 
+class DispatchQueueThrottle final : public DaemonHealthMetricCollector {
+  bool _is_relevant(daemon_metric type) const override {
+    return type == daemon_metric::DISPATCH_QUEUE_THROTTLE;
+  }
+  health_check_t& _get_check(health_check_map_t& cm) const override {
+    return cm.get_or_add("DISPATCH_QUEUE_THROTTLE", HEALTH_WARN, "", 1);
+  }
+  bool _update(const DaemonKey& daemon, const DaemonHealthMetric& metric) override {
+    value.n = metric.get_n();
+    if (metric.get_n()) {
+      daemons.push_back(daemon);
+      return true;
+    } else {
+      return false;
+    }
+  }
+  void _summarize(health_check_t& check) const override {
+    if (daemons.empty()) {
+      return;
+    }
+    check.summary = fmt::format("Dispatch Queue Throttling, {} messages throttled.", value.n);
+  }
+  vector<DaemonKey> daemons;
+};
+
 } // anonymous namespace
 
 unique_ptr<DaemonHealthMetricCollector>
@@ -103,6 +128,8 @@ DaemonHealthMetricCollector::create(daemon_metric m)
     return std::make_unique<SlowOps>();
   case daemon_metric::PENDING_CREATING_PGS:
     return std::make_unique<PendingPGs>();
+  case daemon_metric::DISPATCH_QUEUE_THROTTLE:
+    return std::make_unique<DispatchQueueThrottle>();
   default:
     return {};
   }
index 2d486b0d11a364b96b9d91de4ff8e62f4c28c829..a22ba5afd6b745cf9ebc8e9c94a889875d52f6f0 100644 (file)
@@ -83,10 +83,13 @@ MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
     cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
   last_mon_command_tid(0),
   version_req_id(0)
-{}
+{
+  cct->_conf.add_observer(this);
+}
 
 MonClient::~MonClient()
 {
+  cct->_conf.remove_observer(this);
 }
 
 int MonClient::build_initial_monmap()
@@ -903,6 +906,34 @@ bool MonClient::ms_handle_reset(Connection *con)
   }
 }
 
// Dispatcher hook: invoked by the messenger layer when a throttle limit
// is hit (ms_throttle_t::DISPATCH_QUEUE) or when throttling subsides
// (ms_throttle_t::NONE).  For the dispatch-queue case, emit a cluster log
// warning so the condition is visible to operators.
// Returns true if the notification was handled.
bool MonClient::ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) {
  switch (ttype) {
  case ms_throttle_t::MESSAGE:
    break; // TODO -- NOTE(review): falls through to `return true` despite being unhandled; confirm intended
  case ms_throttle_t::BYTES:
    break; // TODO
  case ms_throttle_t::DISPATCH_QUEUE:
    {
      //cluster log a warning that Dispatch Queue Throttle Limit hit
      if (!log_client) {
        return false; //cannot handle if the daemon didn't setup a log_client for me
      }
      // NOTE(review): create_channel() is called on every notification;
      // confirm it is cheap / idempotent, or cache the channel.
      LogChannelRef clog = log_client->create_channel(CLOG_CHANNEL_CLUSTER);
      clog->warn() << "Throttler Limit has been hit. "
                   << "Some message processing may be significantly delayed. "
                   << "Additional info: " << tinfo.takenslots << "/"
                   << tinfo.maxslots << " bytes used, "
                   << tinfo.failedrequests << " messages throttled.";
    }
    break;
  case ms_throttle_t::NONE:
    break; // nothing to do here; OSD uses NONE to age out its saved state
  default:
    return false;
  }
  return true;
}
+
 bool MonClient::_opened() const
 {
   ceph_assert(ceph_mutex_is_locked(monc_lock));
@@ -1702,6 +1733,30 @@ int MonClient::handle_auth_request(
   return -EACCES;
 }
 
+std::vector<std::string> MonClient::get_tracked_keys() const noexcept {
+  return {
+    "ms_dispatch_throttle_bytes"s,
+    "ms_dispatch_throttle_log_interval"s
+  };
+}
+
+void MonClient::handle_conf_change(const ConfigProxy& conf,
+                                      const std::set <std::string> &changed) {
+  ldout(cct, 10) << __func__ << " " << changed << dendl;
+  if (changed.count("ms_dispatch_throttle_bytes")) {
+    if (messenger) {
+      messenger->dispatch_throttle_bytes =
+        cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes");
+    }
+  }
+  if (changed.count("ms_dispatch_throttle_log_interval")) {
+    if (messenger) {
+      messenger->dispatch_throttle_log_interval =
+        cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval");
+    }
+  }
+}
+
 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
   std::lock_guard l(monc_lock);
   if (auth) {
index 3b78ea62ae272a7a00683b1033a9f604e2f45720..99e993d67634aaa44cf62d6312d4ab46b3c92c38 100644 (file)
@@ -279,9 +279,10 @@ inline boost::system::error_condition make_error_condition(monc_errc e) noexcept
 const boost::system::error_category& monc_category() noexcept;
 
 class MonClient : public Dispatcher,
-                 public AuthClient,
-                 public AuthServer, /* for mgr, osd, mds */
-                 public AdminSocketHook {
+                  public AuthClient,
+                  public AuthServer, /* for mgr, osd, mds */
+                  public AdminSocketHook,
+                  public md_config_obs_t {
   static constexpr auto dout_subsys = ceph_subsys_monc;
 public:
   // Error, Newest, Oldest
@@ -322,6 +323,7 @@ private:
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override { return false; }
+  bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) override;
 
   void handle_monmap(MMonMap *m);
   void handle_config(MConfig *m);
@@ -422,7 +424,10 @@ public:
     uint32_t auth_method,
     const ceph::buffer::list& bl,
     ceph::buffer::list *reply) override;
-
+  // md_config_obs_t (config observer)
+  std::vector<std::string> get_tracked_keys() const noexcept override;
+  void handle_conf_change(const ConfigProxy& conf,
+                          const std::set <std::string> &changed) override;
   void set_entity_name(EntityName name) { entity_name = name; }
   void set_handle_authentication_dispatcher(Dispatcher *d) {
     handle_authentication_dispatcher = d;
index 1c1893a28140f164e58e752df44d2fa387e0d8e3..c53626be71c2b17e4a3ad111aeabb5be520a3ba8 100644 (file)
@@ -213,6 +213,11 @@ class DispatchQueue {
   uint64_t get_id() {
     return next_id++;
   }
+
  /// @returns the (non-owning) Messenger this queue dispatches for.
  Messenger* get_messenger() const {
    return msgr;
  }
+
   void start();
   void entry();
   void wait();
index 25014e869a88a76de514b25824752ee531d14672..6f2eee35f5643706df9602e597617f4c9ce3fa56 100644 (file)
@@ -22,6 +22,7 @@
 #include "include/ceph_assert.h"
 #include "include/common_fwd.h"
 #include "msg/MessageRef.h"
+#include "msg/msg_types.h"
 
 #include <variant>
 
@@ -32,6 +33,12 @@ class KeyStore;
 
 class Dispatcher {
 public:
+  typedef struct {
+    uint64_t takenslots;
+    uint64_t maxslots;
+    uint64_t failedrequests;
+  } ThrottleInfo;
+
   /* Ordering of dispatch for a list of Dispatchers. */
   using priority_t = uint32_t;
   static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4;
@@ -245,6 +252,16 @@ public:
     return false;
   }
 
  /**
   * Notification that a messenger throttle limit has been hit (or, with
   * ms_throttle_t::NONE, that throttling is no longer occurring), so the
   * dispatcher can cluster-log or otherwise surface it.  The default
   * implementation ignores the event.
   *
   * @param ttype which throttle fired
   * @param tinfo snapshot of the throttler's state
   * @returns true if handled, false if not handled
   */
  virtual bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) {
    return false;
  }
+
   /**
    * @} //Authentication
    */
index 509784abca6caa5e97d89bacabb92911df546fa9..213d2e190ea6ca0ec482d86e6cedbe436516042a 100644 (file)
@@ -58,6 +58,8 @@ Messenger::Messenger(CephContext *cct_, entity_name_t w)
 {
   auth_registry.refresh_config();
   comp_registry.refresh_config();
+  dispatch_throttle_bytes.store(cct->_conf.get_val<Option::size_t>("ms_dispatch_throttle_bytes"));
+  dispatch_throttle_log_interval.store(cct->_conf.get_val<std::chrono::seconds>("ms_dispatch_throttle_log_interval"));
 }
 
 void Messenger::set_endpoint_addr(const entity_addr_t& a,
index 8dc49e2483cd4fda72568b1e92420fd6277dd9d3..1fc2c5f406b434544504083ad7860a2dbcf1e7c2 100644 (file)
@@ -140,6 +140,8 @@ protected:
 public:
   AuthClient *auth_client = 0;
   AuthServer *auth_server = 0;
+  std::atomic<uint64_t> dispatch_throttle_bytes;
+  std::atomic<std::chrono::seconds> dispatch_throttle_log_interval;
 
 #ifdef UNIT_TESTS_BUILT
   Interceptor *interceptor = nullptr;
@@ -857,6 +859,18 @@ public:
   void set_require_authorizer(bool b) {
     require_authorizer = b;
   }
+  /**
+   * Notify each Dispatcher that the Throttle Limit has been hit. Call
+   * this function whenever the connections are getting throttled.
+   *
+   * @param ttype Throttle type
+   * @param tinfo Throttle info
+   */
+  void ms_deliver_throttle(ms_throttle_t ttype, const Dispatcher::ThrottleInfo& tinfo) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+      dispatcher->ms_handle_throttle(ttype, tinfo);
+    }
+  }
 
   /**
    * @} // Dispatcher Interfacing
index e2a0fd559583c9164e276a314d7b30fc747e2f7a..c665085fdff0f614dab6c45589a1efbed5167ecd 100644 (file)
@@ -106,6 +106,8 @@ protected:
   AsyncConnection *connection;
   AsyncMessenger *messenger;
   CephContext *cct;
+  ceph::coarse_mono_time throttle_prev_log {ceph::coarse_mono_clock::zero()};
+  const std::chrono::seconds THROTTLE_DELIVER_INTERVAL {std::chrono::seconds(120)};
 public:
   std::shared_ptr<AuthConnectionMeta> auth_meta;
 
index 00fab0a910e1c5a45a2df349024c8b0dfc355b67..b5f486ff27009b918127e66a7246df6f90145403 100644 (file)
@@ -742,6 +742,10 @@ CtPtr ProtocolV1::throttle_dispatch_queue() {
   ldout(cct, 20) << __func__ << dendl;
 
   if (cur_msg_size) {
+    Messenger* msgr = connection->dispatch_queue->get_messenger();
+    //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
+    connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes.load());
+
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
             cur_msg_size)) {
       ldout(cct, 1)
@@ -750,6 +754,20 @@ CtPtr ProtocolV1::throttle_dispatch_queue() {
           << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
           << connection->dispatch_queue->dispatch_throttler.get_max()
           << " failed, just wait." << dendl;
+      ceph::coarse_mono_time throttle_now = ceph::coarse_mono_clock::now();
+      std::chrono::seconds configured_interval = msgr->dispatch_throttle_log_interval.load();
+      if (configured_interval.count()) {
+        if (std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev_log) >=
+            configured_interval) {
+          //Cluster logging that throttling is occurring.
+          Dispatcher::ThrottleInfo tinfo;
+          tinfo.takenslots = connection->dispatch_queue->dispatch_throttler.get_current();
+          tinfo.maxslots = connection->dispatch_queue->dispatch_throttler.get_max();
+          tinfo.failedrequests = connection->dispatch_queue->dispatch_throttler.get_failed();
+          msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE, tinfo);
+          throttle_prev_log = throttle_now;
+        }
+      }
       // following thread pool deal with th full message queue isn't a
       // short time, so we can wait a ms.
       if (connection->register_time_events.empty()) {
@@ -759,6 +777,15 @@ CtPtr ProtocolV1::throttle_dispatch_queue() {
       }
       return nullptr;
     }
+    else {
+      //Don't deliver ms_throttle_t::NONE forever. Limit it for THROTTLE_DELIVER_INTERVAL seconds
+      //since the last ms_throttle_t::DISPATCH_QUEUE delivery.
+      if (std::chrono::duration_cast<std::chrono::seconds>
+          (ceph::coarse_mono_clock::now() - throttle_prev_log) <= THROTTLE_DELIVER_INTERVAL) {
+        Dispatcher::ThrottleInfo tinfo = {0, 0, 0};
+        msgr->ms_deliver_throttle(ms_throttle_t::NONE, tinfo);
+      }
+    }
   }
 
   throttle_stamp = ceph_clock_now();
index 53221aa9cf1c428dcf489a069a560addd86d5ae1..fcef44525bd8e1834d50912432ddc2b9d8277f15 100644 (file)
@@ -1643,6 +1643,10 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
 
   const size_t cur_msg_size = get_current_msg_size();
   if (cur_msg_size) {
+    Messenger* msgr = connection->dispatch_queue->get_messenger();
+    //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes.
+    connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes.load());
+
     if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
             cur_msg_size)) {
       ldout(cct, 1)
@@ -1651,6 +1655,20 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
           << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
           << connection->dispatch_queue->dispatch_throttler.get_max()
           << " failed, just wait." << dendl;
+      ceph::coarse_mono_time throttle_now = ceph::coarse_mono_clock::now();
+      std::chrono::seconds configured_interval = msgr->dispatch_throttle_log_interval.load();
+      if (configured_interval.count()) {
+        if (std::chrono::duration_cast<std::chrono::seconds>(throttle_now - throttle_prev_log) >=
+            configured_interval) {
+          //Cluster logging that throttling is occurring.
+          Dispatcher::ThrottleInfo tinfo;
+          tinfo.takenslots = connection->dispatch_queue->dispatch_throttler.get_current();
+          tinfo.maxslots = connection->dispatch_queue->dispatch_throttler.get_max();
+          tinfo.failedrequests = connection->dispatch_queue->dispatch_throttler.get_failed();
+          msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE, tinfo);
+          throttle_prev_log = throttle_now;
+        }
+      }
       // following thread pool deal with th full message queue isn't a
       // short time, so we can wait a ms.
       if (connection->register_time_events.empty()) {
@@ -1660,6 +1678,15 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
       }
       return nullptr;
     }
+    else {
+      //Don't deliver ms_throttle_t::NONE forever. Limit it for THROTTLE_DELIVER_INTERVAL seconds
+      //since the last ms_throttle_t::DISPATCH_QUEUE delivery.
+      if (std::chrono::duration_cast<std::chrono::seconds>
+          (ceph::coarse_mono_clock::now() - throttle_prev_log) <= THROTTLE_DELIVER_INTERVAL) {
+        Dispatcher::ThrottleInfo tinfo = {0, 0, 0};
+        msgr->ms_deliver_throttle(ms_throttle_t::NONE, tinfo);
+      }
+    }
   }
 
   throttle_stamp = ceph_clock_now();
index f823ad7e87c3d9821530e18aa405bb670b7fde35..05017ef1a94f5c7170a9ff185bdaf411b8a54e05 100644 (file)
@@ -842,4 +842,11 @@ inline std::ostream& operator<<(std::ostream& out, const ceph_entity_inst &i)
   return out << n;
 }
 
// Identifies which messenger throttle an ms_handle_throttle()
// notification refers to.
enum class ms_throttle_t {
    MESSAGE,        // per-message throttle -- currently unreported (TODO in handlers)
    BYTES,          // byte throttle -- currently unreported (TODO in handlers)
    DISPATCH_QUEUE, // DispatchQueue throttle (ms_dispatch_throttle_bytes)
    NONE            // no throttling currently occurring
};
+
 #endif
index 5e8ef8927cd12d61c25c4e853f6559217dd9d248..fe5caee0b2d8a82524011aa829e1e35a2c0ff467 100644 (file)
@@ -6832,6 +6832,41 @@ bool OSD::ms_handle_refused(Connection *con)
   return true;
 }
 
+bool OSD::ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) {
+  switch (ttype) {
+  case ms_throttle_t::MESSAGE:
+    break; // TODO
+  case ms_throttle_t::BYTES:
+    break; // TODO
+  case ms_throttle_t::DISPATCH_QUEUE:
+    {
+      //save the latest throttled time, save the number of messages throttled.
+      std::lock_guard l(dispatch_queue_throttle_lock);
+      last_throttled.store(ceph::coarse_mono_clock::now());
+      messages_throttled.store(tinfo.failedrequests);
+    }
+    break;
+  case ms_throttle_t::NONE:
+    {
+      //No Throttling
+      std::lock_guard l(dispatch_queue_throttle_lock);
+      if (last_throttled.load() != ceph::coarse_mono_clock::zero()) {
+        //Don't be hurry to reset last_throttled. Give get_health_metrics()
+        //THROTTLE_STATUS_INTERVAL seconds to read and display the previous status.
+        if (std::chrono::duration_cast<std::chrono::seconds>
+            (ceph::coarse_mono_clock::now() - last_throttled) >= THROTTLE_STATUS_INTERVAL) {
+          last_throttled.store(ceph::coarse_mono_clock::zero());
+          messages_throttled.store(0);
+        }
+      }
+    }
+    break;
+  default:
+    return false;
+  }
+  return true;
+}
+
 struct CB_OSD_GetVersion {
   OSD *osd;
   explicit CB_OSD_GetVersion(OSD *o) : osd(o) {}
@@ -8024,6 +8059,16 @@ vector<DaemonHealthMetric> OSD::get_health_metrics()
     }
     metrics.emplace_back(daemon_metric::PENDING_CREATING_PGS, n_primaries);
   }
+  {
+    std::lock_guard l(dispatch_queue_throttle_lock);
+    if (messages_throttled.load() > 0) {
+      stringstream ss;
+      ss << "Dispatch Queue Throttling, " << messages_throttled.load() << " messages throttled.";
+      lgeneric_subdout(cct,osd,1) << ss.str() << dendl;
+      clog->warn() << ss.str();
+      metrics.emplace_back(daemon_metric::DISPATCH_QUEUE_THROTTLE, messages_throttled.load());
+    }
+  }
   return metrics;
 }
 
index 3b3e7092650d4e3a2be2351fee2aec1a1e78d029..cbf18889aef7b87a7d4961c8f1c15982bed2c91d 100644 (file)
@@ -1425,6 +1425,9 @@ public:
 
 private:
   std::atomic<int> state{STATE_INITIALIZING};
+  std::atomic<ceph::coarse_mono_time> last_throttled {ceph::coarse_mono_clock::zero()};
+  std::atomic<int64_t> messages_throttled {0};
+  const std::chrono::seconds THROTTLE_STATUS_INTERVAL {std::chrono::seconds(10)};
 
 public:
   int get_state() const {
@@ -1953,6 +1956,7 @@ protected:
   std::atomic<size_t> num_pgs = {0};
 
   std::mutex pending_creates_lock;
+  std::mutex dispatch_queue_throttle_lock;
   using create_from_osd_t = std::pair<spg_t, bool /* is primary*/>;
   std::set<create_from_osd_t> pending_creates_from_osd;
   unsigned pending_creates_from_mon = 0;
@@ -2150,6 +2154,7 @@ private:
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override;
+  bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) override;
 
  public:
   /* internal and external can point to the same messenger, they will still