From: Jos Collin Date: Wed, 4 Nov 2020 13:21:44 +0000 (+0530) Subject: msg,mon,mgr,osd,common: log when DispatchQueue throttle limit is reached X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9406b492a2541fba63ef12f3166e9ed7f7129659;p=ceph-ci.git msg,mon,mgr,osd,common: log when DispatchQueue throttle limit is reached * Log when DispatchQueue throttle limit is reached. * Display a HEALTH_WARN in the ceph status. Fixes: https://tracker.ceph.com/issues/46226 Signed-off-by: Jos Collin --- diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 17f5d4c5e44..f27583c84d3 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -197,6 +197,7 @@ bool Throttle::get_or_fail(int64_t c) std::lock_guard l(lock); if (_should_wait(c) || !conds.empty()) { ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl; + failed++; result = false; } else { ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() @@ -256,6 +257,7 @@ void Throttle::reset() if (!conds.empty()) conds.front().notify_one(); count = 0; + failed = 0; if (logger) { logger->set(l_throttle_val, 0); } diff --git a/src/common/Throttle.h b/src/common/Throttle.h index 980cb85356d..aabaf8901ff 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -33,7 +33,7 @@ class Throttle final : public ThrottleInterface { CephContext *cct; const std::string name; PerfCountersRef logger; - std::atomic count = { 0 }, max = { 0 }; + std::atomic count = { 0 }, max = { 0 }, failed = { 0 }; std::mutex lock; std::list conds; const bool use_perf; @@ -116,6 +116,15 @@ public: * @returns number of requests being hold after this */ int64_t put(int64_t c = 1) override; + + /** + * gets the number of (current) slot request failures. + * @returns the number of slot request failures + */ + int64_t get_failed() const { + return failed.load(); + } + /** * reset the zero to the stock */ diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 222dc89076e..e407f421943 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -1189,6 +1189,14 @@ options: fmt_desc: Throttles total size of messages waiting to be dispatched. default: 100_M with_legacy: true +- name: ms_dispatch_throttle_log_interval + type: secs + level: advanced + desc: Interval in seconds to show the cluster warning and health warning (in ceph status) + when the dispatch throttle limit is exceeded. Setting it to 0 disables the + cluster warning and health warning. + default: 30 + min: 0 - name: ms_bind_ipv4 type: bool level: advanced diff --git a/src/mgr/DaemonHealthMetric.h b/src/mgr/DaemonHealthMetric.h index 6a2eb3371c7..9e89eb7be13 100644 --- a/src/mgr/DaemonHealthMetric.h +++ b/src/mgr/DaemonHealthMetric.h @@ -12,6 +12,7 @@ enum class daemon_metric : uint8_t { SLOW_OPS, PENDING_CREATING_PGS, + DISPATCH_QUEUE_THROTTLE, NONE, }; @@ -19,6 +20,7 @@ static inline const char *daemon_metric_name(daemon_metric t) { switch (t) { case daemon_metric::SLOW_OPS: return "SLOW_OPS"; case daemon_metric::PENDING_CREATING_PGS: return "PENDING_CREATING_PGS"; + case daemon_metric::DISPATCH_QUEUE_THROTTLE: return "DISPATCH_QUEUE_THROTTLE"; case daemon_metric::NONE: return "NONE"; default: return "???"; } diff --git a/src/mgr/DaemonHealthMetricCollector.cc b/src/mgr/DaemonHealthMetricCollector.cc index cf1aab2a219..b64a0bb5d47 100644 --- a/src/mgr/DaemonHealthMetricCollector.cc +++ b/src/mgr/DaemonHealthMetricCollector.cc @@ -93,6 +93,31 @@ class PendingPGs final : public DaemonHealthMetricCollector { vector osds; }; +class DispatchQueueThrottle final : public DaemonHealthMetricCollector { + bool _is_relevant(daemon_metric type) const override { + return type == daemon_metric::DISPATCH_QUEUE_THROTTLE; + } + health_check_t& _get_check(health_check_map_t& cm) const override { + return cm.get_or_add("DISPATCH_QUEUE_THROTTLE", HEALTH_WARN, "", 1); + } + bool _update(const DaemonKey& daemon, const DaemonHealthMetric& metric) override { + value.n = metric.get_n(); + if (metric.get_n()) { + daemons.push_back(daemon); + return true; + } else { + return false; + } + } + void _summarize(health_check_t& check) const override { + if (daemons.empty()) { + return; + } + check.summary = fmt::format("Dispatch Queue Throttling, {} messages throttled.", value.n); + } + vector daemons; +}; + } // anonymous namespace unique_ptr @@ -103,6 +128,8 @@ DaemonHealthMetricCollector::create(daemon_metric m) return std::make_unique(); case daemon_metric::PENDING_CREATING_PGS: return std::make_unique(); + case daemon_metric::DISPATCH_QUEUE_THROTTLE: + return std::make_unique(); default: return {}; } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 2d486b0d11a..a22ba5afd6b 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -83,10 +83,13 @@ MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) : cct_->_conf.get_val("mon_client_hunt_interval_min_multiple")), last_mon_command_tid(0), version_req_id(0) -{} +{ + cct->_conf.add_observer(this); +} MonClient::~MonClient() { + cct->_conf.remove_observer(this); } int MonClient::build_initial_monmap() @@ -903,6 +906,34 @@ bool MonClient::ms_handle_reset(Connection *con) } } +bool MonClient::ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) { + switch (ttype) { + case ms_throttle_t::MESSAGE: + break; // TODO + case ms_throttle_t::BYTES: + break; // TODO + case ms_throttle_t::DISPATCH_QUEUE: + { + //cluster log a warning that Dispatch Queue Throttle Limit hit + if (!log_client) { + return false; //cannot handle if the daemon didn't setup a log_client for me + } + LogChannelRef clog = log_client->create_channel(CLOG_CHANNEL_CLUSTER); + clog->warn() << "Throttler Limit has been hit. " + << "Some message processing may be significantly delayed. " + << "Additional info: " << tinfo.takenslots << "/" + << tinfo.maxslots << " bytes used, " + << tinfo.failedrequests << " messages throttled."; + } + break; + case ms_throttle_t::NONE: + break; + default: + return false; + } + return true; +} + bool MonClient::_opened() const { ceph_assert(ceph_mutex_is_locked(monc_lock)); @@ -1702,6 +1733,30 @@ int MonClient::handle_auth_request( return -EACCES; } +std::vector MonClient::get_tracked_keys() const noexcept { + return { + "ms_dispatch_throttle_bytes"s, + "ms_dispatch_throttle_log_interval"s + }; +} + +void MonClient::handle_conf_change(const ConfigProxy& conf, + const std::set &changed) { + ldout(cct, 10) << __func__ << " " << changed << dendl; + if (changed.count("ms_dispatch_throttle_bytes")) { + if (messenger) { + messenger->dispatch_throttle_bytes = + cct->_conf.get_val("ms_dispatch_throttle_bytes"); + } + } + if (changed.count("ms_dispatch_throttle_log_interval")) { + if (messenger) { + messenger->dispatch_throttle_log_interval = + cct->_conf.get_val("ms_dispatch_throttle_log_interval"); + } + } +} + AuthAuthorizer* MonClient::build_authorizer(int service_id) const { std::lock_guard l(monc_lock); if (auth) { diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 3b78ea62ae2..99e993d6763 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -279,9 +279,10 @@ inline boost::system::error_condition make_error_condition(monc_errc e) noexcept const boost::system::error_category& monc_category() noexcept; class MonClient : public Dispatcher, - public AuthClient, - public AuthServer, /* for mgr, osd, mds */ - public AdminSocketHook { + public AuthClient, + public AuthServer, /* for mgr, osd, mds */ + public AdminSocketHook, + public md_config_obs_t { static constexpr auto dout_subsys = ceph_subsys_monc; public: // Error, Newest, Oldest @@ -322,6 +323,7 @@ private: bool ms_handle_reset(Connection *con) override; void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override { return false; } + bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) override; void handle_monmap(MMonMap *m); void handle_config(MConfig *m); @@ -422,7 +424,10 @@ public: uint32_t auth_method, const ceph::buffer::list& bl, ceph::buffer::list *reply) override; - + // md_config_obs_t (config observer) + std::vector get_tracked_keys() const noexcept override; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override; void set_entity_name(EntityName name) { entity_name = name; } void set_handle_authentication_dispatcher(Dispatcher *d) { handle_authentication_dispatcher = d; diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 1c1893a2814..c53626be71c 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -213,6 +213,11 @@ class DispatchQueue { uint64_t get_id() { return next_id++; } + + Messenger* get_messenger() const { + return msgr; + } + void start(); void entry(); void wait(); diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 25014e869a8..6f2eee35f56 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -22,6 +22,7 @@ #include "include/ceph_assert.h" #include "include/common_fwd.h" #include "msg/MessageRef.h" +#include "msg/msg_types.h" #include @@ -32,6 +33,12 @@ class KeyStore; class Dispatcher { public: + typedef struct { + uint64_t takenslots; + uint64_t maxslots; + uint64_t failedrequests; + } ThrottleInfo; + /* Ordering of dispatch for a list of Dispatchers. */ using priority_t = uint32_t; static constexpr priority_t PRIORITY_HIGH = std::numeric_limits::max() / 4; @@ -245,6 +252,16 @@ public: return false; } + /** + * handle throttle limit hit and cluster log it. + * + * return true if handled + * return false if not handled + */ + virtual bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) { + return false; + } + /** * @} //Authentication */ diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index 509784abca6..213d2e190ea 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -58,6 +58,8 @@ Messenger::Messenger(CephContext *cct_, entity_name_t w) { auth_registry.refresh_config(); comp_registry.refresh_config(); + dispatch_throttle_bytes.store(cct->_conf.get_val("ms_dispatch_throttle_bytes")); + dispatch_throttle_log_interval.store(cct->_conf.get_val("ms_dispatch_throttle_log_interval")); } void Messenger::set_endpoint_addr(const entity_addr_t& a, diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 8dc49e2483c..1fc2c5f406b 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -140,6 +140,8 @@ protected: public: AuthClient *auth_client = 0; AuthServer *auth_server = 0; + std::atomic dispatch_throttle_bytes; + std::atomic dispatch_throttle_log_interval; #ifdef UNIT_TESTS_BUILT Interceptor *interceptor = nullptr; @@ -857,6 +859,18 @@ public: void set_require_authorizer(bool b) { require_authorizer = b; } + /** + * Notify each Dispatcher that the Throttle Limit has been hit. Call + * this function whenever the connections are getting throttled. + * + * @param ttype Throttle type + * @param tinfo Throttle info + */ + void ms_deliver_throttle(ms_throttle_t ttype, const Dispatcher::ThrottleInfo& tinfo) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + dispatcher->ms_handle_throttle(ttype, tinfo); + } + } /** * @} // Dispatcher Interfacing diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h index e2a0fd55958..c665085fdff 100644 --- a/src/msg/async/Protocol.h +++ b/src/msg/async/Protocol.h @@ -106,6 +106,8 @@ protected: AsyncConnection *connection; AsyncMessenger *messenger; CephContext *cct; + ceph::coarse_mono_time throttle_prev_log {ceph::coarse_mono_clock::zero()}; + const std::chrono::seconds THROTTLE_DELIVER_INTERVAL {std::chrono::seconds(120)}; public: std::shared_ptr auth_meta; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 00fab0a910e..b5f486ff270 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -742,6 +742,10 @@ CtPtr ProtocolV1::throttle_dispatch_queue() { ldout(cct, 20) << __func__ << dendl; if (cur_msg_size) { + Messenger* msgr = connection->dispatch_queue->get_messenger(); + //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes. + connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes.load()); + if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( cur_msg_size)) { ldout(cct, 1) @@ -750,6 +754,20 @@ CtPtr ProtocolV1::throttle_dispatch_queue() { << connection->dispatch_queue->dispatch_throttler.get_current() << "/" << connection->dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl; + ceph::coarse_mono_time throttle_now = ceph::coarse_mono_clock::now(); + std::chrono::seconds configured_interval = msgr->dispatch_throttle_log_interval.load(); + if (configured_interval.count()) { + if (std::chrono::duration_cast(throttle_now - throttle_prev_log) >= + configured_interval) { + //Cluster logging that throttling is occurring. + Dispatcher::ThrottleInfo tinfo; + tinfo.takenslots = connection->dispatch_queue->dispatch_throttler.get_current(); + tinfo.maxslots = connection->dispatch_queue->dispatch_throttler.get_max(); + tinfo.failedrequests = connection->dispatch_queue->dispatch_throttler.get_failed(); + msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE, tinfo); + throttle_prev_log = throttle_now; + } + } // following thread pool deal with th full message queue isn't a // short time, so we can wait a ms. if (connection->register_time_events.empty()) { @@ -759,6 +777,15 @@ CtPtr ProtocolV1::throttle_dispatch_queue() { } return nullptr; } + else { + //Don't deliver ms_throttle_t::NONE forever. Limit it for THROTTLE_DELIVER_INTERVAL seconds + //since the last ms_throttle_t::DISPATCH_QUEUE delivery. + if (std::chrono::duration_cast + (ceph::coarse_mono_clock::now() - throttle_prev_log) <= THROTTLE_DELIVER_INTERVAL) { + Dispatcher::ThrottleInfo tinfo = {0, 0, 0}; + msgr->ms_deliver_throttle(ms_throttle_t::NONE, tinfo); + } + } } throttle_stamp = ceph_clock_now(); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 53221aa9cf1..fcef44525bd 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -1643,6 +1643,10 @@ CtPtr ProtocolV2::throttle_dispatch_queue() { const size_t cur_msg_size = get_current_msg_size(); if (cur_msg_size) { + Messenger* msgr = connection->dispatch_queue->get_messenger(); + //update max if it's changed in the conf. Expecting qa tests would change ms_dispatch_throttle_bytes. + connection->dispatch_queue->dispatch_throttler.reset_max(msgr->dispatch_throttle_bytes.load()); + if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( cur_msg_size)) { ldout(cct, 1) @@ -1651,6 +1655,20 @@ CtPtr ProtocolV2::throttle_dispatch_queue() { << connection->dispatch_queue->dispatch_throttler.get_current() << "/" << connection->dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl; + ceph::coarse_mono_time throttle_now = ceph::coarse_mono_clock::now(); + std::chrono::seconds configured_interval = msgr->dispatch_throttle_log_interval.load(); + if (configured_interval.count()) { + if (std::chrono::duration_cast(throttle_now - throttle_prev_log) >= + configured_interval) { + //Cluster logging that throttling is occurring. + Dispatcher::ThrottleInfo tinfo; + tinfo.takenslots = connection->dispatch_queue->dispatch_throttler.get_current(); + tinfo.maxslots = connection->dispatch_queue->dispatch_throttler.get_max(); + tinfo.failedrequests = connection->dispatch_queue->dispatch_throttler.get_failed(); + msgr->ms_deliver_throttle(ms_throttle_t::DISPATCH_QUEUE, tinfo); + throttle_prev_log = throttle_now; + } + } // following thread pool deal with th full message queue isn't a // short time, so we can wait a ms. if (connection->register_time_events.empty()) { @@ -1660,6 +1678,15 @@ CtPtr ProtocolV2::throttle_dispatch_queue() { } return nullptr; } + else { + //Don't deliver ms_throttle_t::NONE forever. Limit it for THROTTLE_DELIVER_INTERVAL seconds + //since the last ms_throttle_t::DISPATCH_QUEUE delivery. + if (std::chrono::duration_cast + (ceph::coarse_mono_clock::now() - throttle_prev_log) <= THROTTLE_DELIVER_INTERVAL) { + Dispatcher::ThrottleInfo tinfo = {0, 0, 0}; + msgr->ms_deliver_throttle(ms_throttle_t::NONE, tinfo); + } + } } throttle_stamp = ceph_clock_now(); diff --git a/src/msg/msg_types.h b/src/msg/msg_types.h index f823ad7e87c..05017ef1a94 100644 --- a/src/msg/msg_types.h +++ b/src/msg/msg_types.h @@ -842,4 +842,11 @@ inline std::ostream& operator<<(std::ostream& out, const ceph_entity_inst &i) return out << n; } +enum class ms_throttle_t { + MESSAGE, + BYTES, + DISPATCH_QUEUE, + NONE +}; + #endif diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5e8ef8927cd..fe5caee0b2d 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -6832,6 +6832,41 @@ bool OSD::ms_handle_refused(Connection *con) return true; } +bool OSD::ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) { + switch (ttype) { + case ms_throttle_t::MESSAGE: + break; // TODO + case ms_throttle_t::BYTES: + break; // TODO + case ms_throttle_t::DISPATCH_QUEUE: + { + //save the latest throttled time, save the number of messages throttled. + std::lock_guard l(dispatch_queue_throttle_lock); + last_throttled.store(ceph::coarse_mono_clock::now()); + messages_throttled.store(tinfo.failedrequests); + } + break; + case ms_throttle_t::NONE: + { + //No Throttling + std::lock_guard l(dispatch_queue_throttle_lock); + if (last_throttled.load() != ceph::coarse_mono_clock::zero()) { + //Don't be hurry to reset last_throttled. Give get_health_metrics() + //THROTTLE_STATUS_INTERVAL seconds to read and display the previous status. + if (std::chrono::duration_cast + (ceph::coarse_mono_clock::now() - last_throttled) >= THROTTLE_STATUS_INTERVAL) { + last_throttled.store(ceph::coarse_mono_clock::zero()); + messages_throttled.store(0); + } + } + } + break; + default: + return false; + } + return true; +} + struct CB_OSD_GetVersion { OSD *osd; explicit CB_OSD_GetVersion(OSD *o) : osd(o) {} @@ -8024,6 +8059,16 @@ vector OSD::get_health_metrics() } metrics.emplace_back(daemon_metric::PENDING_CREATING_PGS, n_primaries); } + { + std::lock_guard l(dispatch_queue_throttle_lock); + if (messages_throttled.load() > 0) { + stringstream ss; + ss << "Dispatch Queue Throttling, " << messages_throttled.load() << " messages throttled."; + lgeneric_subdout(cct,osd,1) << ss.str() << dendl; + clog->warn() << ss.str(); + metrics.emplace_back(daemon_metric::DISPATCH_QUEUE_THROTTLE, messages_throttled.load()); + } + } return metrics; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 3b3e7092650..cbf18889aef 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1425,6 +1425,9 @@ public: private: std::atomic state{STATE_INITIALIZING}; + std::atomic last_throttled {ceph::coarse_mono_clock::zero()}; + std::atomic messages_throttled {0}; + const std::chrono::seconds THROTTLE_STATUS_INTERVAL {std::chrono::seconds(10)}; public: int get_state() const { @@ -1953,6 +1956,7 @@ protected: std::atomic num_pgs = {0}; std::mutex pending_creates_lock; + std::mutex dispatch_queue_throttle_lock; using create_from_osd_t = std::pair; std::set pending_creates_from_osd; unsigned pending_creates_from_mon = 0; @@ -2150,6 +2154,7 @@ private: bool ms_handle_reset(Connection *con) override; void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override; + bool ms_handle_throttle(ms_throttle_t ttype, const ThrottleInfo& tinfo) override; public: /* internal and external can point to the same messenger, they will still